Intermediate

Multi-Agent Systems and Orchestration

Lesson 3 of 4 | Estimated Time: 50 min

Complex problems often benefit from specialized agents working together. Multi-agent systems decompose tasks across multiple specialized agents, each with distinct roles and capabilities. This lesson explores how to design, orchestrate, and coordinate multiple agents using frameworks like LangGraph and CrewAI.

Why Multi-Agent Architectures

A single agent has to cover every subtask with one prompt and one context, which becomes limiting as tasks grow. Multiple agents provide:

  • Specialization: Each agent focuses on specific domain expertise
  • Parallelization: Independent subtasks can run simultaneously on different agents
  • Resilience: A failing agent can be retried or replaced by a fallback while the others continue operating
  • Scalability: Add agents to handle increased complexity
  • Interpretability: Understand system behavior by observing agent interactions

For example, a customer service system might have agents for:

  • Intent classification
  • Knowledge base retrieval
  • Sentiment analysis
  • Response generation
  • Policy compliance checking
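
As a rough illustration of specialization and parallelization together, the sketch below runs three independent customer-service agents concurrently and then feeds their results to a response step. Everything in it is an assumption made for illustration: the function names, the keyword rules, and the placeholder policy text are hypothetical stand-ins for real LLM-backed agents.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stub agents; in a real system each would call an LLM or a service.
def classify_intent(message: str) -> str:
    return "refund" if "refund" in message.lower() else "general"

def analyze_sentiment(message: str) -> str:
    negative_words = ("angry", "broken", "terrible")
    text = message.lower()
    return "negative" if any(word in text for word in negative_words) else "neutral"

def retrieve_knowledge(message: str) -> str:
    # Placeholder knowledge base entry, not real policy text.
    if "refund" in message.lower():
        return "placeholder refund policy"
    return "no matching article"

def generate_response(results: dict) -> str:
    tone = "We are sorry about this. " if results["sentiment"] == "negative" else ""
    return f"{tone}Intent: {results['intent']}. Reference: {results['knowledge']}."

def handle(message: str) -> dict:
    """Run the independent agents in parallel, then generate a response."""
    agents = {
        "intent": classify_intent,
        "sentiment": analyze_sentiment,
        "knowledge": retrieve_knowledge,
    }
    with ThreadPoolExecutor(max_workers=len(agents)) as pool:
        futures = {name: pool.submit(fn, message) for name, fn in agents.items()}
        results = {name: future.result() for name, future in futures.items()}
    # Response generation depends on the other agents, so it runs afterwards.
    results["response"] = generate_response(results)
    return results

outcome = handle("My order arrived broken, I want a refund")
```

Threads suit this case because LLM calls are I/O-bound; only the steps that do not depend on each other are submitted to the pool.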

Multi-Agent Communication Patterns

Agent Roles and Communication

Each agent should have a clear role and know how to communicate with others:

from dataclasses import dataclass
from typing import Optional, List
import anthropic

@dataclass
class Agent:
    """Represents an agent in the system."""
    name: str
    role: str
    description: str
    system_prompt: str

    def __repr__(self):
        return f"{self.name} ({self.role}): {self.description}"

# Define specialized agents
research_agent = Agent(
    name="Research Agent",
    role="researcher",
    description="Gathers and synthesizes information from multiple sources",
    system_prompt="""You are a research specialist. Your task is to:
1. Ask clarifying questions to understand information needs
2. Gather relevant information from your knowledge base
3. Synthesize findings into clear summaries
4. Identify gaps in information and suggest next steps"""
)

analysis_agent = Agent(
    name="Analysis Agent",
    role="analyst",
    description="Analyzes research findings and identifies patterns",
    system_prompt="""You are a data analyst. Your task is to:
1. Receive research findings from the researcher
2. Identify key patterns and trends
3. Calculate relevant statistics
4. Highlight insights and anomalies
5. Prepare data for visualization"""
)

recommendation_agent = Agent(
    name="Recommendation Agent",
    role="recommender",
    description="Provides actionable recommendations based on analysis",
    system_prompt="""You are a strategic advisor. Your task is to:
1. Review analytical findings
2. Consider context and constraints
3. Generate actionable recommendations
4. Prioritize recommendations by impact
5. Address potential risks and mitigation strategies"""
)

Message Passing Between Agents

@dataclass
class Message:
    """Message structure for agent communication."""
    sender: str
    recipient: str
    content: str
    agent_role: Optional[str] = None

class AgentMessenger:
    """Manages communication between agents."""

    def __init__(self):
        self.message_history: List[Message] = []
        self.agents = {}  # registry: agent name -> Agent
        self.client = anthropic.Anthropic()

    def register(self, agent: Agent) -> None:
        """Make an agent addressable by its name."""
        self.agents[agent.name] = agent

    def send_message(self, from_agent: str, to_agent: str, content: str) -> str:
        """Send message and get response."""
        # Look up the recipient so its system prompt can be used below
        recipient_agent = self.agents[to_agent]

        # Format context from the last 5 earlier messages (built before the new
        # message is recorded, so the new content is not sent twice)
        conversation_history = []
        for m in self.message_history[-5:]:
            if m.recipient == to_agent:
                conversation_history.append({
                    "role": "user",
                    "content": f"From {m.sender}: {m.content}"
                })

        msg = Message(sender=from_agent, recipient=to_agent, content=content)
        self.message_history.append(msg)

        # Get response from recipient agent
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            system=recipient_agent.system_prompt,
            messages=conversation_history + [
                {"role": "user", "content": f"From {from_agent}: {content}"}
            ]
        )

        return response.content[0].text

messenger = AgentMessenger()
for agent in (research_agent, analysis_agent, recommendation_agent):
    messenger.register(agent)

LangGraph for Orchestration

LangGraph provides a framework for defining agent workflows as graphs:

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class ResearchState(TypedDict):
    """State for research workflow."""
    user_query: str
    research_findings: str
    analysis_results: str
    recommendations: str
    conversation_history: Annotated[list, operator.add]

class ResearchGraph:
    """Define multi-agent workflow as a graph."""

    def __init__(self):
        self.client = anthropic.Anthropic()
        self.graph = self._build_graph()

    def _build_graph(self):
        """Build LangGraph workflow."""
        workflow = StateGraph(ResearchState)

        # Add nodes for each agent
        workflow.add_node("research", self._research_node)
        workflow.add_node("analyze", self._analyze_node)
        workflow.add_node("recommend", self._recommend_node)

        # Define edges
        workflow.add_edge("research", "analyze")
        workflow.add_edge("analyze", "recommend")
        workflow.add_edge("recommend", END)

        # Set entry point
        workflow.set_entry_point("research")

        return workflow.compile()

    def _research_node(self, state: ResearchState) -> ResearchState:
        """Research agent node."""
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1500,
            system=research_agent.system_prompt,
            messages=[
                {"role": "user", "content": f"Research this topic: {state['user_query']}"}
            ]
        )

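        findings = response.content[0].text

        # Return only the updates. LangGraph merges them into the state, and the
        # operator.add reducer appends the new history entry.
        return {
            "research_findings": findings,
            "conversation_history": [("research", findings)],
        }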

    def _analyze_node(self, state: ResearchState) -> ResearchState:
        """Analysis agent node."""
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1500,
            system=analysis_agent.system_prompt,
            messages=[
                {"role": "user", "content": f"Analyze these findings: {state['research_findings']}"}
            ]
        )

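        analysis = response.content[0].text

        # Return only the updates. LangGraph merges them into the state, and the
        # operator.add reducer appends the new history entry.
        return {
            "analysis_results": analysis,
            "conversation_history": [("analysis", analysis)],
        }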

    def _recommend_node(self, state: ResearchState) -> ResearchState:
        """Recommendation agent node."""
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1500,
            system=recommendation_agent.system_prompt,
            messages=[
                {"role": "user", "content": f"Based on this analysis: {state['analysis_results']}, provide recommendations"}
            ]
        )

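        recommendations = response.content[0].text

        # Return only the updates. LangGraph merges them into the state, and the
        # operator.add reducer appends the new history entry.
        return {
            "recommendations": recommendations,
            "conversation_history": [("recommendation", recommendations)],
        }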

    def run(self, user_query: str) -> dict:
        """Run the research workflow."""
        initial_state: ResearchState = {
            "user_query": user_query,
            "research_findings": "",
            "analysis_results": "",
            "recommendations": "",
            "conversation_history": []
        }

        final_state = self.graph.invoke(initial_state)
        return final_state

# Usage
research_workflow = ResearchGraph()
result = research_workflow.run("What are emerging trends in AI safety?")
print(result["recommendations"])
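
The conversation_history field is annotated with operator.add, which LangGraph uses as a reducer: the value a node returns for that key is combined with the existing value instead of replacing it. The plain-Python sketch below imitates that merge step so the effect can be seen without calling an LLM. The merge_update helper and DemoState are hypothetical stand-ins written for this illustration; they are not LangGraph's actual code.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints

class DemoState(TypedDict):
    findings: str
    history: Annotated[list, operator.add]  # reducer: concatenate lists

def merge_update(state: dict, update: dict) -> dict:
    """Hypothetical stand-in for the merge step (not LangGraph's implementation)."""
    hints = get_type_hints(DemoState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated types carry their extra arguments in __metadata__
        reducers = getattr(hints[key], "__metadata__", ())
        if reducers:
            merged[key] = reducers[0](state[key], value)  # combine old and new
        else:
            merged[key] = value  # plain keys are overwritten
    return merged

state = {"findings": "", "history": []}
# Each "node" returns only what it adds.
state = merge_update(state, {"findings": "A", "history": [("research", "A")]})
state = merge_update(state, {"history": [("analysis", "B")]})

# Pitfall: returning the whole accumulated list makes the reducer add it to itself.
doubled = merge_update(state, {"history": state["history"]})
```

This is why a node for a reducer-annotated key should return just the new entries: returning the full list duplicates everything already recorded.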

CrewAI Framework

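CrewAI offers a higher-level API for multi-agent orchestration: you declare agents (role, goal, backstory) and tasks, and by default a crew runs the tasks sequentially in the order listed: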

from crewai import Agent, Task, Crew

# Define agents with specialized roles
planner = Agent(
    role="Project Planner",
    goal="Create detailed project plans with milestones and dependencies",
    backstory="Expert project manager with 10+ years experience",
    verbose=True
)

executor = Agent(
    role="Task Executor",
    goal="Execute planned tasks efficiently and report progress",
    backstory="Experienced engineer focused on reliable task completion",
    verbose=True
)

reviewer = Agent(
    role="Quality Reviewer",
    goal="Review work quality and ensure standards are met",
    backstory="Quality assurance expert with attention to detail",
    verbose=True
)

# Define tasks
planning_task = Task(
    description="Plan a software deployment project with phases",
    expected_output="Project plan with milestones, timeline, and dependencies",
    agent=planner
)

execution_task = Task(
    description="Execute the deployment plan steps in order",
    expected_output="Execution report with completed steps and any issues",
    agent=executor
)

review_task = Task(
    description="Review the execution report and approve quality",
    expected_output="Quality review with approval or improvement recommendations",
    agent=reviewer
)

# Create crew and run
crew = Crew(agents=[planner, executor, reviewer], tasks=[planning_task, execution_task, review_task])
result = crew.kickoff()
print(result)

Agent Coordination Patterns

Sequential Coordination

Agents execute one after another, with output from each feeding into the next:

class SequentialCoordinator:
    """Coordinate agents sequentially."""

    def __init__(self, client):
        self.client = client
        self.agents = []

    def add_agent(self, name: str, system_prompt: str):
        """Register an agent."""
        self.agents.append({"name": name, "system_prompt": system_prompt})

    def run(self, initial_input: str) -> str:
        """Run all agents sequentially."""
        current_input = initial_input

        for agent in self.agents:
            response = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1000,
                system=agent["system_prompt"],
                messages=[{"role": "user", "content": current_input}]
            )

            current_input = response.content[0].text
            print(f"{agent['name']} output: {current_input}\n")

        return current_input

coordinator = SequentialCoordinator(anthropic.Anthropic())
coordinator.add_agent("Researcher", "You are a researcher. Research the given topic.")
coordinator.add_agent("Writer", "You are a writer. Write a summary based on the research.")
result = coordinator.run("Quantum computing applications")

Hierarchical Coordination

Agents are organized in a hierarchy, with a manager coordinating workers:

class HierarchicalCoordinator:
    """Hierarchy-based agent coordination."""

    def __init__(self, client):
        self.client = client

    def run_hierarchical_task(self, task: str) -> str:
        """Manager coordinates worker agents."""

        # Manager delegates to workers
        manager_prompt = f"""You are a project manager. Decompose this task into subtasks:
        Task: {task}

        For each subtask, identify what type of agent should handle it.
        Format: subtask_name | agent_type | description"""

        decomposition = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=500,
            messages=[{"role": "user", "content": manager_prompt}]
        )

        subtasks = decomposition.content[0].text

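        # Workers execute subtasks: one worker call per formatted subtask line
        worker_outputs = []
        for line in subtasks.splitlines():
            if "|" not in line:
                continue  # skip any prose around the formatted lines
            worker_prompt = f"""You are a specialized worker. Execute this subtask:
        {line.strip()}

        Provide detailed output for the manager to integrate."""

            worker_output = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1000,
                messages=[{"role": "user", "content": worker_prompt}]
            )
            worker_outputs.append(worker_output.content[0].text)

        # Manager integrates results
        results = "\n\n".join(worker_outputs)
        integration_prompt = f"""As manager, integrate these worker results:
        Subtasks: {subtasks}
        Results: {results}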

        Provide final integrated output."""

        final_result = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1000,
            messages=[{"role": "user", "content": integration_prompt}]
        )

        return final_result.content[0].text
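
The manager prompt asks for lines in the form subtask_name | agent_type | description. A small parser can turn that text into structured records before anything is dispatched to workers. The sketch below is one possible approach: the Subtask dataclass and parse_subtasks function are hypothetical names, and the sample text is invented for illustration, not real model output.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Subtask:
    name: str
    agent_type: str
    description: str

def parse_subtasks(text: str) -> List[Subtask]:
    """Parse 'name | agent_type | description' lines, ignoring anything else."""
    subtasks = []
    for line in text.splitlines():
        # Drop list markers a model might add, then split into at most 3 fields
        parts = [part.strip() for part in line.lstrip("-* ").split("|", 2)]
        if len(parts) != 3 or not all(parts):
            continue  # not a well-formed subtask line
        subtasks.append(Subtask(*parts))
    return subtasks

sample = """Here is the decomposition:
- collect_data | researcher | Gather market figures
- summarize | writer | Draft a one-page summary
Let me know if you need more detail."""

parsed = parse_subtasks(sample)
```

Model output does not always follow the requested format, so lines that fail to parse are skipped rather than raising an error; a stricter system might ask the manager to retry instead.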

Error Handling in Multi-Agent Systems

Handle failures gracefully with retry logic and fallback agents:

import time

class RobustMultiAgentSystem:
    """Multi-agent system with error handling."""

    def __init__(self, client):
        self.client = client
        self.max_retries = 3

    def run_with_fallback(self, primary_agent, fallback_agent, task: str) -> str:
        """Run primary agent with retries, then fall back."""

        for attempt in range(self.max_retries):
            try:
                response = self.client.messages.create(
                    model="claude-3-5-sonnet-20241022",
                    max_tokens=1000,
                    system=primary_agent["system"],
                    messages=[{"role": "user", "content": task}]
                )

                return response.content[0].text

            except anthropic.APIError as e:
                # Report the failure, then back off before retrying (1s, 2s, ...)
                print(f"Primary agent attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)

        # All retries failed: use fallback agent (its errors propagate to the caller)
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1000,
            system=fallback_agent["system"],
            messages=[{"role": "user", "content": task}]
        )
        return response.content[0].text

Best Practices for Multi-Agent Systems

1. Clear Agent Responsibilities

Each agent should have a single, well-defined purpose. Avoid overlap.

2. Standardized Communication

Define message formats that agents understand and can parse.
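
One way to standardize communication is a JSON envelope with a fixed set of required fields that every agent validates on receipt. The sketch below is an assumption about what such a format could look like: the Envelope class, its field names, and the example kind values are hypothetical, not part of any framework.

```python
import json
from dataclasses import dataclass, asdict

REQUIRED_FIELDS = ("sender", "recipient", "kind", "content")

@dataclass
class Envelope:
    sender: str
    recipient: str
    kind: str      # hypothetical values: "request", "result", "error"
    content: str

    def to_json(self) -> str:
        """Serialize the envelope for transport between agents."""
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "Envelope":
        """Parse and validate an incoming message."""
        data = json.loads(raw)
        if not isinstance(data, dict):
            raise ValueError("message must be a JSON object")
        missing = [field for field in REQUIRED_FIELDS if field not in data]
        if missing:
            raise ValueError(f"missing fields: {missing}")
        return cls(**{field: data[field] for field in REQUIRED_FIELDS})

wire = Envelope("researcher", "analyst", "result", "Three sources found").to_json()
received = Envelope.from_json(wire)
```

Rejecting malformed messages at the boundary keeps parsing errors from surfacing later inside an agent's own logic.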

3. Monitoring and Logging

Track agent interactions for debugging and optimization:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def logged_agent_call(agent_name, task, response):
    """Log all agent calls."""
    logger.info(f"Agent: {agent_name}, Task: {task}")
    logger.info(f"Response: {response}")
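
Logging can also be attached to agent functions with a decorator, so that timing and failures are recorded without repeating the logging calls at every call site. This is a minimal sketch: log_agent_call and echo_agent are hypothetical names, and echo_agent stands in for a function that would normally call an LLM.

```python
import functools
import logging
import time

logger = logging.getLogger("agents")

def log_agent_call(agent_name: str):
    """Decorator factory that logs duration and failures of an agent function."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(task, *args, **kwargs):
            start = time.perf_counter()
            try:
                result = fn(task, *args, **kwargs)
            except Exception:
                # logger.exception records the traceback along with the message
                logger.exception("agent=%s task=%r failed", agent_name, task)
                raise
            elapsed = time.perf_counter() - start
            logger.info("agent=%s task=%r elapsed=%.3fs", agent_name, task, elapsed)
            return result
        return wrapper
    return decorator

@log_agent_call("echo")
def echo_agent(task: str) -> str:
    # Stand-in for a real agent call
    return task.upper()

reply = echo_agent("summarize the findings")
```

Passing arguments to the logger with %s-style placeholders defers string formatting until a record is actually emitted.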

4. Timeout and Resource Limits

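Prevent runaway agents with execution limits. The signal-based approach below works only on Unix and only in the main thread:

import signal

def timeout_handler(signum, frame):
    raise TimeoutError("Agent execution timeout")

signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(30)  # 30 second timeout
try:
    result = coordinator.run("Quantum computing applications")  # any agent call
finally:
    signal.alarm(0)  # cancel the alarm once the call finishes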
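
Where SIGALRM is unavailable (for example on Windows or in a worker thread), one alternative is to run the agent call in a thread pool and wait on the future with a timeout. The sketch below assumes a hypothetical helper named run_with_timeout. Note that the timeout only stops the caller from waiting; the underlying thread keeps running until the call returns.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

def run_with_timeout(fn, *args, timeout_s: float = 30.0, default=None):
    """Return fn(*args), or `default` if it does not finish within timeout_s."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, *args)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        return default
    finally:
        # Do not block on the still-running thread
        executor.shutdown(wait=False)

def slow_agent(task: str) -> str:
    # Stand-in for an agent call that takes too long
    time.sleep(0.5)
    return f"done: {task}"

fast = run_with_timeout(lambda: "ok", timeout_s=1.0)
late = run_with_timeout(slow_agent, "report", timeout_s=0.05, default="timed out")
```

For LLM calls specifically, a request-level timeout on the HTTP client is usually a cleaner limit, since it actually aborts the request.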

Key Takeaway

Multi-agent systems leverage specialization and parallel processing to solve complex problems more effectively than single agents. Proper orchestration, communication patterns, and error handling are essential for building robust multi-agent architectures that scale gracefully.

Exercises

  1. Implement Sequential Agents: Create a three-agent system (researcher, analyzer, recommender) that runs sequentially with information flowing through each stage.

  2. Build a Crew: Use CrewAI to define a crew with distinct agent roles and tasks. Experiment with different numbers of agents.

  3. Add Hierarchy: Implement a hierarchical coordinator where a manager agent delegates tasks to multiple worker agents.

  4. Communication Protocol: Define a structured message format and implement message routing between agents.

  5. Error Resilience: Add retry logic, timeouts, and fallback agents to handle failures gracefully.

  6. Performance Analysis: Compare sequential vs. hierarchical vs. graph-based coordination. Measure execution time and solution quality.