Multi-Agent Systems and Orchestration
Multi-Agent Systems and Orchestration
Complex problems often benefit from specialized agents working together. Multi-agent systems decompose tasks across multiple specialized agents, each with distinct roles and capabilities. This lesson explores how to design, orchestrate, and coordinate multiple agents using frameworks like LangGraph and CrewAI.
Why Multi-Agent Architectures
Single agents face inherent limitations. Multiple agents provide:
- Specialization: Each agent focuses on specific domain expertise
- Parallelization: Multiple agents work simultaneously on different subtasks
- Resilience: If one agent fails, others continue operating
- Scalability: Add agents to handle increased complexity
- Interpretability: Understand system behavior by observing agent interactions
For example, a customer service system might have agents for:
- Intent classification
- Knowledge base retrieval
- Sentiment analysis
- Response generation
- Policy compliance checking
Multi-Agent Communication Patterns
Agent Roles and Communication
Each agent should have a clear role and know how to communicate with others:
from dataclasses import dataclass
from typing import Optional, List
import anthropic
@dataclass
class Agent:
"""Represents an agent in the system."""
name: str
role: str
description: str
system_prompt: str
def __repr__(self):
return f"{self.name} ({self.role}): {self.description}"
# Define specialized agents
research_agent = Agent(
name="Research Agent",
role="researcher",
description="Gathers and synthesizes information from multiple sources",
system_prompt="""You are a research specialist. Your task is to:
1. Ask clarifying questions to understand information needs
2. Gather relevant information from your knowledge base
3. Synthesize findings into clear summaries
4. Identify gaps in information and suggest next steps"""
)
analysis_agent = Agent(
name="Analysis Agent",
role="analyst",
description="Analyzes research findings and identifies patterns",
system_prompt="""You are a data analyst. Your task is to:
1. Receive research findings from the researcher
2. Identify key patterns and trends
3. Calculate relevant statistics
4. Highlight insights and anomalies
5. Prepare data for visualization"""
)
recommendation_agent = Agent(
name="Recommendation Agent",
role="recommender",
description="Provides actionable recommendations based on analysis",
system_prompt="""You are a strategic advisor. Your task is to:
1. Review analytical findings
2. Consider context and constraints
3. Generate actionable recommendations
4. Prioritize recommendations by impact
5. Address potential risks and mitigation strategies"""
)
Message Passing Between Agents
@dataclass
class Message:
"""Message structure for agent communication."""
sender: str
recipient: str
content: str
agent_role: Optional[str] = None
class AgentMessenger:
"""Manages communication between agents."""
def __init__(self):
self.message_history: List[Message] = []
self.client = anthropic.Anthropic()
def send_message(self, from_agent: str, to_agent: str, content: str) -> str:
"""Send message and get response."""
msg = Message(sender=from_agent, recipient=to_agent, content=content)
self.message_history.append(msg)
# Format context from message history
conversation_history = []
for m in self.message_history[-5:]: # Keep last 5 messages for context
if m.recipient == to_agent:
conversation_history.append({
"role": "user",
"content": f"From {m.sender}: {m.content}"
})
# Get response from recipient agent
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
system=recipient_agent.system_prompt,
messages=conversation_history + [
{"role": "user", "content": content}
]
)
return response.content[0].text
messenger = AgentMessenger()
LangGraph for Orchestration
LangGraph provides a framework for defining agent workflows as graphs:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class ResearchState(TypedDict):
"""State for research workflow."""
user_query: str
research_findings: str
analysis_results: str
recommendations: str
conversation_history: Annotated[list, operator.add]
class ResearchGraph:
"""Define multi-agent workflow as a graph."""
def __init__(self):
self.client = anthropic.Anthropic()
self.graph = self._build_graph()
def _build_graph(self):
"""Build LangGraph workflow."""
workflow = StateGraph(ResearchState)
# Add nodes for each agent
workflow.add_node("research", self._research_node)
workflow.add_node("analyze", self._analyze_node)
workflow.add_node("recommend", self._recommend_node)
# Define edges
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "recommend")
workflow.add_edge("recommend", END)
# Set entry point
workflow.set_entry_point("research")
return workflow.compile()
def _research_node(self, state: ResearchState) -> ResearchState:
"""Research agent node."""
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1500,
system=research_agent.system_prompt,
messages=[
{"role": "user", "content": f"Research this topic: {state['user_query']}"}
]
)
findings = response.content[0].text
state["research_findings"] = findings
state["conversation_history"].append(("research", findings))
return state
def _analyze_node(self, state: ResearchState) -> ResearchState:
"""Analysis agent node."""
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1500,
system=analysis_agent.system_prompt,
messages=[
{"role": "user", "content": f"Analyze these findings: {state['research_findings']}"}
]
)
analysis = response.content[0].text
state["analysis_results"] = analysis
state["conversation_history"].append(("analysis", analysis))
return state
def _recommend_node(self, state: ResearchState) -> ResearchState:
"""Recommendation agent node."""
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1500,
system=recommendation_agent.system_prompt,
messages=[
{"role": "user", "content": f"Based on this analysis: {state['analysis_results']}, provide recommendations"}
]
)
recommendations = response.content[0].text
state["recommendations"] = recommendations
state["conversation_history"].append(("recommendation", recommendations))
return state
def run(self, user_query: str) -> dict:
"""Run the research workflow."""
initial_state: ResearchState = {
"user_query": user_query,
"research_findings": "",
"analysis_results": "",
"recommendations": "",
"conversation_history": []
}
final_state = self.graph.invoke(initial_state)
return final_state
# Usage
research_workflow = ResearchGraph()
result = research_workflow.run("What are emerging trends in AI safety?")
print(result["recommendations"])
CrewAI Framework
CrewAI simplifies multi-agent orchestration with an intuitive API:
from crewai import Agent, Task, Crew
# Define agents with specialized roles
planner = Agent(
role="Project Planner",
goal="Create detailed project plans with milestones and dependencies",
backstory="Expert project manager with 10+ years experience",
verbose=True
)
executor = Agent(
role="Task Executor",
goal="Execute planned tasks efficiently and report progress",
backstory="Experienced engineer focused on reliable task completion",
verbose=True
)
reviewer = Agent(
role="Quality Reviewer",
goal="Review work quality and ensure standards are met",
backstory="Quality assurance expert with attention to detail",
verbose=True
)
# Define tasks
planning_task = Task(
description="Plan a software deployment project with phases",
expected_output="Project plan with milestones, timeline, and dependencies",
agent=planner
)
execution_task = Task(
description="Execute the deployment plan steps in order",
expected_output="Execution report with completed steps and any issues",
agent=executor
)
review_task = Task(
description="Review the execution report and approve quality",
expected_output="Quality review with approval or improvement recommendations",
agent=reviewer
)
# Create crew and run
crew = Crew(agents=[planner, executor, reviewer], tasks=[planning_task, execution_task, review_task])
result = crew.kickoff()
print(result)
Agent Coordination Patterns
Sequential Coordination
Agents execute one after another, with output from each feeding into the next:
class SequentialCoordinator:
"""Coordinate agents sequentially."""
def __init__(self, client):
self.client = client
self.agents = []
def add_agent(self, name: str, system_prompt: str):
"""Register an agent."""
self.agents.append({"name": name, "system_prompt": system_prompt})
def run(self, initial_input: str) -> str:
"""Run all agents sequentially."""
current_input = initial_input
for agent in self.agents:
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
system=agent["system_prompt"],
messages=[{"role": "user", "content": current_input}]
)
current_input = response.content[0].text
print(f"{agent['name']} output: {current_input}\n")
return current_input
coordinator = SequentialCoordinator(anthropic.Anthropic())
coordinator.add_agent("Researcher", "You are a researcher. Research the given topic.")
coordinator.add_agent("Writer", "You are a writer. Write a summary based on the research.")
result = coordinator.run("Quantum computing applications")
Hierarchical Coordination
Agents organized in hierarchy with manager coordinating workers:
class HierarchicalCoordinator:
"""Hierarchy-based agent coordination."""
def __init__(self, client):
self.client = client
def run_hierarchical_task(self, task: str) -> str:
"""Manager coordinates worker agents."""
# Manager delegates to workers
manager_prompt = f"""You are a project manager. Decompose this task into subtasks:
Task: {task}
For each subtask, identify what type of agent should handle it.
Format: subtask_name | agent_type | description"""
decomposition = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=500,
messages=[{"role": "user", "content": manager_prompt}]
)
subtasks = decomposition.content[0].text
# Workers execute subtasks
worker_prompt = f"""You are a specialized worker. Execute this subtask:
{subtasks}
Provide detailed output for the manager to integrate."""
worker_output = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
messages=[{"role": "user", "content": worker_prompt}]
)
# Manager integrates results
integration_prompt = f"""As manager, integrate these worker results:
Subtasks: {subtasks}
Results: {worker_output.content[0].text}
Provide final integrated output."""
final_result = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
messages=[{"role": "user", "content": integration_prompt}]
)
return final_result.content[0].text
Error Handling in Multi-Agent Systems
Handle failures gracefully with retry logic and fallback agents:
class RobustMultiAgentSystem:
"""Multi-agent system with error handling."""
def __init__(self, client):
self.client = client
self.max_retries = 3
def run_with_fallback(self, primary_agent, fallback_agent, task: str) -> str:
"""Run primary agent with fallback."""
for attempt in range(self.max_retries):
try:
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
system=primary_agent["system"],
messages=[{"role": "user", "content": task}]
)
return response.content[0].text
except Exception as e:
if attempt == self.max_retries - 1:
# Use fallback agent
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
system=fallback_agent["system"],
messages=[{"role": "user", "content": task}]
)
return response.content[0].text
continue
Best Practices for Multi-Agent Systems
1. Clear Agent Responsibilities
Each agent should have a single, well-defined purpose. Avoid overlap.
2. Standardized Communication
Define message formats that agents understand and can parse.
3. Monitoring and Logging
Track agent interactions for debugging and optimization:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def logged_agent_call(agent_name, task, response):
"""Log all agent calls."""
logger.info(f"Agent: {agent_name}, Task: {task}")
logger.info(f"Response: {response}")
4. Timeout and Resource Limits
Prevent runaway agents with execution limits:
import signal
def timeout_handler(signum, frame):
raise TimeoutError("Agent execution timeout")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(30) # 30 second timeout
Key Takeaway
Multi-agent systems leverage specialization and parallel processing to solve complex problems more effectively than single agents. Proper orchestration, communication patterns, and error handling are essential for building robust multi-agent architectures that scale gracefully.
Exercises
-
Implement Sequential Agents: Create a three-agent system (researcher, analyzer, recommender) that runs sequentially with information flowing through each stage.
-
Build a Crew: Use CrewAI to define a crew with distinct agent roles and tasks. Experiment with different numbers of agents.
-
Add Hierarchy: Implement a hierarchical coordinator where a manager agent delegates tasks to multiple worker agents.
-
Communication Protocol: Define a structured message format and implement message routing between agents.
-
Error Resilience: Add retry logic, timeouts, and fallback agents to handle failures gracefully.
-
Performance Analysis: Compare sequential vs. hierarchical vs. graph-based coordination. Measure execution time and solution quality.