Skip to content

Agent Orchestration Example

Demonstrate sophisticated multi-agent coordination using the SDK's built-in orchestration module.

Overview

The agent_orchestration_langgraph.py example showcases how to use the SDK's orchestration capabilities to coordinate multiple specialized agents.

Features Demonstrated

  • Agent Registry - Centralized agent management
  • Orchestration Patterns - Coordinator, Peer-to-Peer, Hierarchical
  • Task Decomposition - Breaking complex tasks into subtasks
  • Result Aggregation - Combining results from multiple agents
  • Specialized Agents - Weather, Risk, Research experts

Architecture

flowchart TD
    Q[User Query] --> O[Orchestrator]
    O --> C{Pattern}

    C -->|Coordinator| CO[Coordinator Agent]
    CO --> W[Weather Agent]
    CO --> R[Risk Agent]
    CO --> RE[Research Agent]

    C -->|Peer-to-Peer| PP[All Agents]
    PP --> W2[Weather Agent]
    PP --> R2[Risk Agent]
    PP --> RE2[Research Agent]

    C -->|Hierarchical| H[Hierarchy]
    H --> L1[Executive Level]
    L1 --> L2[Manager Level]
    L2 --> L3[Specialist Level]

    W --> AGG[Result Aggregator]
    R --> AGG
    RE --> AGG
    W2 --> AGG
    R2 --> AGG
    RE2 --> AGG
    L3 --> AGG

    AGG --> RES[Final Response]

Setting Up the Agent Registry

The AgentRegistry manages all agents and their capabilities:

from akordi_agents.core import create_langgraph_agent
from akordi_agents.core.langgraph.orchestration import (
    AgentCapability,
    AgentRegistry,
    AgentRole,
)

def setup_agent_registry(model_id: str) -> AgentRegistry:
    """Set up agent registry with specialized agents."""
    registry = AgentRegistry()

    # Create Weather Agent
    weather_agent = create_langgraph_agent(
        name="weather_agent",
        llm_service=AgentLLMService(model_id, agent_role="weather"),
        tools=[WeatherTool()],
        config={
            "enable_validation": True,
            "enable_tools": True,
            "max_iterations": 5,
        },
    )

    # Define capabilities
    weather_capability = AgentCapability(
        name="weather_data",
        domain="weather",
        skills=["weather_lookup", "forecast", "climate_analysis"],
        confidence_score=0.9,
        max_concurrent_tasks=3,
        estimated_latency=2.0,
    )

    # Register agent
    registry.register_agent(
        agent_id="weather_agent",
        agent=weather_agent,
        role=AgentRole.SPECIALIST,
        capabilities=[weather_capability],
    )

    # Similar setup for Risk Agent
    risk_agent = create_langgraph_agent(
        name="risk_agent",
        llm_service=AgentLLMService(model_id, agent_role="risk"),
        tools=[RiskAnalysisTool()],
        config={"enable_tools": True},
    )

    registry.register_agent(
        agent_id="risk_agent",
        agent=risk_agent,
        role=AgentRole.SPECIALIST,
        capabilities=[AgentCapability(
            name="risk_assessment",
            domain="risk",
            skills=["risk_analysis", "safety_assessment"],
            confidence_score=0.9,
        )],
    )

    # Coordinator Agent (no tools)
    coordinator_agent = create_langgraph_agent(
        name="coordinator_agent",
        llm_service=AgentLLMService(model_id, agent_role="coordinator"),
        tools=[],
        config={"enable_tools": False},
    )

    registry.register_agent(
        agent_id="coordinator_agent",
        agent=coordinator_agent,
        role=AgentRole.COORDINATOR,
        capabilities=[AgentCapability(
            name="coordination",
            domain="coordination",
            skills=["task_planning", "delegation", "synthesis"],
        )],
    )

    return registry

Orchestration Patterns

1. Coordinator Pattern

Central coordinator delegates tasks to specialized agents:

from akordi_agents.core.langgraph.orchestration import (
    CoordinatorOrchestrator,
    ResultAggregator,
)

async def coordinator_pattern(registry: AgentRegistry, query: str):
    """Coordinator-based orchestration."""

    orchestrator = CoordinatorOrchestrator(
        coordinator_agent_id="coordinator_agent",
        agent_registry=registry,
        result_aggregator=ResultAggregator(
            aggregation_strategy="consolidate"
        ),
    )

    result = await orchestrator.orchestrate(query)

    print(f"Success: {result.get('success')}")
    print(f"Pattern: {result.get('orchestration_pattern')}")
    print(f"Tasks Executed: {result.get('tasks_executed')}")

    return result

Use Case: Complex queries requiring multiple expert opinions with centralized decision-making.

2. Peer-to-Peer Pattern

Agents communicate directly without central coordination:

from akordi_agents.core.langgraph.orchestration import PeerToPeerOrchestrator

async def peer_to_peer_pattern(registry: AgentRegistry, query: str):
    """Peer-to-peer orchestration."""

    orchestrator = PeerToPeerOrchestrator(
        agent_registry=registry,
        result_aggregator=ResultAggregator(
            aggregation_strategy="consolidate"
        ),
    )

    result = await orchestrator.orchestrate(query)

    print(f"Agents Used: {result.get('agents_used')}")
    print(f"Total Candidates: {result.get('total_candidates')}")

    return result

Use Case: Independent analysis tasks where agents can work in parallel.

3. Hierarchical Pattern

Agents organized in levels with delegation:

from akordi_agents.core.langgraph.orchestration import HierarchicalOrchestrator

async def hierarchical_pattern(registry: AgentRegistry, query: str):
    """Hierarchical orchestration."""

    # Define hierarchy levels
    hierarchy_levels = {
        "executive": ["coordinator_agent"],
        "manager": ["weather_agent", "risk_agent"],
        "specialist": ["research_agent"],
    }

    orchestrator = HierarchicalOrchestrator(
        hierarchy_levels=hierarchy_levels,
        agent_registry=registry,
        result_aggregator=ResultAggregator(
            aggregation_strategy="consolidate"
        ),
    )

    result = await orchestrator.orchestrate(query)

    print(f"Hierarchy Levels Used: {result.get('hierarchy_levels_used')}")

    return result

Use Case: Enterprise workflows with approval chains and escalation paths.

Task Decomposition

Break complex tasks into manageable subtasks:

from akordi_agents.core.langgraph.orchestration import Task, TaskDecomposer

def decompose_complex_task(registry: AgentRegistry):
    """Demonstrate task decomposition."""

    decomposer = TaskDecomposer(registry)

    # Create a complex task
    complex_task = Task(
        id="complex_1",
        description=(
            "Analyze the weather conditions in multiple cities, "
            "assess the risks for outdoor construction work, "
            "and generate a comprehensive report."
        ),
        requirements={
            "domain": "general",
            "skills": ["analysis", "reporting"]
        },
        priority=1,
    )

    # Decompose into subtasks
    subtasks = decomposer.decompose_task(complex_task)

    for subtask in subtasks:
        print(f"Subtask: {subtask.id}")
        print(f"  Description: {subtask.description}")
        print(f"  Domain: {subtask.requirements.get('domain')}")
        print(f"  Dependencies: {subtask.dependencies}")

    return subtasks

Result Aggregation

Combine results from multiple agents:

from akordi_agents.core.langgraph.orchestration import ResultAggregator

def aggregate_results():
    """Demonstrate result aggregation strategies."""

    # Sample results from agents
    results = [
        {
            "answer": "Weather in London: 15°C, Cloudy",
            "confidence": 0.9,
            "source": "weather_agent",
        },
        {
            "answer": "Risk assessment: MEDIUM risk",
            "confidence": 0.85,
            "source": "risk_agent",
        },
        {
            "answer": "Best practices: Use safety gear",
            "confidence": 0.8,
            "source": "research_agent",
        },
    ]

    # Test different strategies
    strategies = ["consolidate", "vote", "merge"]

    for strategy in strategies:
        aggregator = ResultAggregator(aggregation_strategy=strategy)
        aggregated = aggregator.aggregate_results(results)
        print(f"\n{strategy.upper()} Strategy:")
        print(f"  Result: {aggregated}")

Aggregation Strategies

Strategy Description
consolidate Combine all results into a structured response
vote Use majority voting for consensus
merge Merge results based on confidence scores

Specialized Tools

Weather Tool

from akordi_agents.tools import Tool

class WeatherTool(Tool):
    """Fetch real-time weather data."""

    def get_name(self) -> str:
        return "weather_tool"

    def get_description(self) -> str:
        return "Fetches current weather for any city."

    def get_input_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "City name (e.g., 'London, UK')",
                }
            },
            "required": ["city"],
        }

    def execute(self, **kwargs):
        import requests
        city = kwargs.get("city", "London")
        api_key = os.getenv("WEATHER_API_KEY")

        response = requests.get(
            "http://api.weatherapi.com/v1/current.json",
            params={"key": api_key, "q": city},
            timeout=10
        )
        data = response.json()

        return {
            "success": True,
            "data": {
                "city": data["location"]["name"],
                "temperature_c": data["current"]["temp_c"],
                "condition": data["current"]["condition"]["text"],
            }
        }

Risk Analysis Tool

class RiskAnalysisTool(Tool):
    """Analyze project risks."""

    def get_name(self) -> str:
        return "risk_analysis_tool"

    def get_description(self) -> str:
        return "Analyzes risks for construction activities."

    def get_input_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "activity": {"type": "string"},
                "conditions": {"type": "string"},
            },
            "required": ["activity"],
        }

    def execute(self, **kwargs):
        activity = kwargs.get("activity", "general work")
        conditions = kwargs.get("conditions", "normal")

        # Risk assessment logic
        risk_factors = {
            "extreme heat": ["heat stress", "equipment failure"],
            "rain": ["slippery surfaces", "delays"],
            "wind": ["falling objects", "crane operations"],
        }

        identified_risks = []
        for key, risks in risk_factors.items():
            if key in conditions.lower():
                identified_risks.extend(risks)

        risk_level = "HIGH" if len(identified_risks) > 2 else "MEDIUM"

        return {
            "success": True,
            "data": {
                "activity": activity,
                "risk_level": risk_level,
                "identified_risks": identified_risks,
                "recommendations": [
                    "Conduct safety briefing",
                    "Ensure proper PPE",
                    "Monitor conditions",
                ],
            }
        }

Running the Example

# Run all demos
poetry run python examples/agent_orchestration_langgraph.py

# Specific orchestration pattern
poetry run python examples/agent_orchestration_langgraph.py --pattern coordinator
poetry run python examples/agent_orchestration_langgraph.py --pattern peer_to_peer
poetry run python examples/agent_orchestration_langgraph.py --pattern hierarchical

# Specific demo mode
poetry run python examples/agent_orchestration_langgraph.py --demo patterns
poetry run python examples/agent_orchestration_langgraph.py --demo decomposition
poetry run python examples/agent_orchestration_langgraph.py --demo aggregation

# Custom query
poetry run python examples/agent_orchestration_langgraph.py \
  --query "Analyze weather risks for construction in Sydney"

Command Line Arguments

Argument Options Description
--pattern coordinator, peer_to_peer, hierarchical, all Orchestration pattern
--demo patterns, decomposition, aggregation, all Demo mode
--query string Custom query to process

Environment Setup

# Required
export AWS_REGION=us-east-1

# Optional (for real weather data)
export WEATHER_API_KEY=your-api-key

# Optional (for tracing)
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=your-key

Best Practices

1. Agent Specialization

Give each agent a focused domain:

# Good - specialized agents
weather_agent = create_langgraph_agent(name="weather_expert", ...)
risk_agent = create_langgraph_agent(name="risk_expert", ...)

# Avoid - generalist agents that overlap
agent1 = create_langgraph_agent(name="assistant1", ...)
agent2 = create_langgraph_agent(name="assistant2", ...)

2. Capability Definitions

Define clear, specific capabilities:

# Good - specific capabilities
AgentCapability(
    name="weather_data",
    domain="weather",
    skills=["weather_lookup", "forecast", "climate_analysis"],
    confidence_score=0.9,
)

# Avoid - vague capabilities
AgentCapability(
    name="general",
    domain="general",
    skills=["everything"],
)

3. Error Handling

Handle agent failures gracefully:

result = await orchestrator.orchestrate(query)

if not result.get("success"):
    # Check which agents failed
    failed_agents = result.get("failed_agents", [])
    # Implement fallback logic

Next Steps