Agent Orchestration Example¶
Coordinate multiple specialized agents using the SDK's built-in orchestration module.
Overview¶
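The agent_orchestration_langgraph.py example registers specialized agents in a shared registry, runs a query through three orchestration patterns (coordinator, peer-to-peer, hierarchical), and demonstrates task decomposition and result aggregation.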
Features Demonstrated¶
- Agent Registry - Centralized agent management
- Orchestration Patterns - Coordinator, Peer-to-Peer, Hierarchical
- Task Decomposition - Breaking complex tasks into subtasks
- Result Aggregation - Combining results from multiple agents
- Specialized Agents - Weather, Risk, Research experts
Architecture¶
flowchart TD
Q[User Query] --> O[Orchestrator]
O --> C{Pattern}
C -->|Coordinator| CO[Coordinator Agent]
CO --> W[Weather Agent]
CO --> R[Risk Agent]
CO --> RE[Research Agent]
C -->|Peer-to-Peer| PP[All Agents]
PP --> W2[Weather Agent]
PP --> R2[Risk Agent]
PP --> RE2[Research Agent]
C -->|Hierarchical| H[Hierarchy]
H --> L1[Executive Level]
L1 --> L2[Manager Level]
L2 --> L3[Specialist Level]
W --> AGG[Result Aggregator]
R --> AGG
RE --> AGG
W2 --> AGG
R2 --> AGG
RE2 --> AGG
L3 --> AGG
AGG --> RES[Final Response]
Setting Up the Agent Registry¶
The AgentRegistry manages all agents and their capabilities:
from akordi_agents.core import create_langgraph_agent
from akordi_agents.core.langgraph.orchestration import (
    AgentCapability,
    AgentRegistry,
    AgentRole,
)

# AgentLLMService is assumed to be defined or imported elsewhere in the example
# script; WeatherTool and RiskAnalysisTool are shown under "Specialized Tools".


def setup_agent_registry(model_id: str) -> AgentRegistry:
    """Set up agent registry with specialized agents."""
    registry = AgentRegistry()

    # Create Weather Agent
    weather_agent = create_langgraph_agent(
        name="weather_agent",
        llm_service=AgentLLMService(model_id, agent_role="weather"),
        tools=[WeatherTool()],
        config={
            "enable_validation": True,
            "enable_tools": True,
            "max_iterations": 5,
        },
    )

    # Define capabilities
    weather_capability = AgentCapability(
        name="weather_data",
        domain="weather",
        skills=["weather_lookup", "forecast", "climate_analysis"],
        confidence_score=0.9,
        max_concurrent_tasks=3,
        estimated_latency=2.0,
    )

    # Register agent
    registry.register_agent(
        agent_id="weather_agent",
        agent=weather_agent,
        role=AgentRole.SPECIALIST,
        capabilities=[weather_capability],
    )

    # Similar setup for Risk Agent
    risk_agent = create_langgraph_agent(
        name="risk_agent",
        llm_service=AgentLLMService(model_id, agent_role="risk"),
        tools=[RiskAnalysisTool()],
        config={"enable_tools": True},
    )
    registry.register_agent(
        agent_id="risk_agent",
        agent=risk_agent,
        role=AgentRole.SPECIALIST,
        capabilities=[AgentCapability(
            name="risk_assessment",
            domain="risk",
            skills=["risk_analysis", "safety_assessment"],
            confidence_score=0.9,
        )],
    )

    # research_agent (referenced by later snippets) is not shown in this excerpt;
    # presumably it is registered the same way as the agents above.

    # Coordinator Agent (no tools)
    coordinator_agent = create_langgraph_agent(
        name="coordinator_agent",
        llm_service=AgentLLMService(model_id, agent_role="coordinator"),
        tools=[],
        config={"enable_tools": False},
    )
    registry.register_agent(
        agent_id="coordinator_agent",
        agent=coordinator_agent,
        role=AgentRole.COORDINATOR,
        capabilities=[AgentCapability(
            name="coordination",
            domain="coordination",
            skills=["task_planning", "delegation", "synthesis"],
        )],
    )
    return registry
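To make the registry's role more concrete, here is a minimal, self-contained sketch of capability-based lookup. It does not use the SDK: `ToyCapability`, `ToyRegistry` and `find_agents` are hypothetical names invented for illustration, and the real AgentRegistry may match agents to tasks quite differently.

```python
from dataclasses import dataclass, field


@dataclass
class ToyCapability:
    """Hypothetical stand-in for AgentCapability (not the SDK class)."""
    name: str
    domain: str
    skills: list[str]
    confidence_score: float = 0.5


@dataclass
class ToyRegistry:
    """Hypothetical registry: maps agent IDs to the capabilities they declare."""
    capabilities: dict[str, list[ToyCapability]] = field(default_factory=dict)

    def register_agent(self, agent_id: str, capabilities: list[ToyCapability]) -> None:
        self.capabilities[agent_id] = capabilities

    def find_agents(self, domain: str, skill: str) -> list[str]:
        """Return agent IDs covering the domain and skill, best confidence first."""
        matches = []
        for agent_id, caps in self.capabilities.items():
            for cap in caps:
                if cap.domain == domain and skill in cap.skills:
                    matches.append((cap.confidence_score, agent_id))
        matches.sort(key=lambda m: m[0], reverse=True)
        return [agent_id for _, agent_id in matches]


toy_registry = ToyRegistry()
toy_registry.register_agent(
    "weather_agent",
    [ToyCapability("weather_data", "weather", ["weather_lookup", "forecast"], 0.9)],
)
toy_registry.register_agent(
    "risk_agent",
    [ToyCapability("risk_assessment", "risk", ["risk_analysis"], 0.9)],
)
print(toy_registry.find_agents("weather", "forecast"))
```

The point of the sketch is that specific domains and skills make lookup unambiguous: a query for a forecast resolves to exactly one agent.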
Orchestration Patterns¶
1. Coordinator Pattern¶
A central coordinator delegates tasks to specialized agents:
from akordi_agents.core.langgraph.orchestration import (
    CoordinatorOrchestrator,
    ResultAggregator,
)


async def coordinator_pattern(registry: AgentRegistry, query: str):
    """Coordinator-based orchestration."""
    orchestrator = CoordinatorOrchestrator(
        coordinator_agent_id="coordinator_agent",
        agent_registry=registry,
        result_aggregator=ResultAggregator(
            aggregation_strategy="consolidate"
        ),
    )
    result = await orchestrator.orchestrate(query)
    print(f"Success: {result.get('success')}")
    print(f"Pattern: {result.get('orchestration_pattern')}")
    print(f"Tasks Executed: {result.get('tasks_executed')}")
    return result
Use Case: Complex queries requiring multiple expert opinions with centralized decision-making.
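The control flow behind the coordinator pattern can be sketched without the SDK. In the toy version below, `weather_specialist`, `risk_specialist` and `toy_coordinator` are hypothetical functions; the result keys mirror the ones printed above, but how CoordinatorOrchestrator actually plans and delegates is not shown in this document, so treat this as an illustration of the idea only.

```python
import asyncio


async def weather_specialist(query: str) -> dict:
    # Hypothetical specialist; a real agent would call an LLM and its tools.
    return {"source": "weather_agent", "answer": f"weather view on: {query}"}


async def risk_specialist(query: str) -> dict:
    # Hypothetical specialist for the risk domain.
    return {"source": "risk_agent", "answer": f"risk view on: {query}"}


async def toy_coordinator(query: str) -> dict:
    """Central coordinator: plan, delegate, then synthesize."""
    # 1. Plan: the coordinator alone decides which specialists to involve.
    plan = [weather_specialist, risk_specialist]
    # 2. Delegate: specialists run concurrently and report back to the coordinator.
    results = await asyncio.gather(*(agent(query) for agent in plan))
    # 3. Synthesize: the final response is assembled in one place.
    return {
        "success": True,
        "orchestration_pattern": "coordinator",
        "tasks_executed": len(results),
        "answers": {r["source"]: r["answer"] for r in results},
    }


coordinator_result = asyncio.run(toy_coordinator("construction in Sydney"))
print(coordinator_result["tasks_executed"])
```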
2. Peer-to-Peer Pattern¶
Agents communicate directly without central coordination:
from akordi_agents.core.langgraph.orchestration import PeerToPeerOrchestrator


async def peer_to_peer_pattern(registry: AgentRegistry, query: str):
    """Peer-to-peer orchestration."""
    orchestrator = PeerToPeerOrchestrator(
        agent_registry=registry,
        result_aggregator=ResultAggregator(
            aggregation_strategy="consolidate"
        ),
    )
    result = await orchestrator.orchestrate(query)
    print(f"Agents Used: {result.get('agents_used')}")
    print(f"Total Candidates: {result.get('total_candidates')}")
    return result
Use Case: Independent analysis tasks where agents can work in parallel.
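One way to picture peer-to-peer selection is that each agent decides for itself whether a query is relevant, and the volunteers then run in parallel. The sketch below assumes a simple keyword-overlap bid; `PEERS`, `bid` and `toy_peer_to_peer` are hypothetical, and interpreting `total_candidates` as the number of peers considered is an assumption rather than documented SDK behavior.

```python
import asyncio

# Hypothetical peers: each one knows the keywords it can handle.
PEERS = {
    "weather_agent": {"weather", "forecast", "rain"},
    "risk_agent": {"risk", "safety", "hazard"},
    "research_agent": {"research", "practices", "standards"},
}


def bid(keywords: set[str], query: str) -> int:
    """Each peer scores the query itself; no coordinator is involved."""
    return len(keywords & set(query.lower().split()))


async def run_peer(agent_id: str, query: str) -> dict:
    await asyncio.sleep(0)  # stands in for real agent work
    return {"source": agent_id, "answer": f"{agent_id} handled: {query}"}


async def toy_peer_to_peer(query: str) -> dict:
    # Peers with a non-zero bid volunteer and then run in parallel.
    volunteers = [a for a, kw in PEERS.items() if bid(kw, query) > 0]
    results = await asyncio.gather(*(run_peer(a, query) for a in volunteers))
    return {
        "agents_used": [r["source"] for r in results],
        "total_candidates": len(PEERS),  # assumed meaning: peers considered
    }


p2p_result = asyncio.run(toy_peer_to_peer("weather risk for crane work"))
print(p2p_result)
```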
3. Hierarchical Pattern¶
Agents are organized in levels, and each level delegates work to the level below it:
from akordi_agents.core.langgraph.orchestration import HierarchicalOrchestrator


async def hierarchical_pattern(registry: AgentRegistry, query: str):
    """Hierarchical orchestration."""
    # Define hierarchy levels
    hierarchy_levels = {
        "executive": ["coordinator_agent"],
        "manager": ["weather_agent", "risk_agent"],
        "specialist": ["research_agent"],
    }
    orchestrator = HierarchicalOrchestrator(
        hierarchy_levels=hierarchy_levels,
        agent_registry=registry,
        result_aggregator=ResultAggregator(
            aggregation_strategy="consolidate"
        ),
    )
    result = await orchestrator.orchestrate(query)
    print(f"Hierarchy Levels Used: {result.get('hierarchy_levels_used')}")
    return result
Use Case: Enterprise workflows with approval chains and escalation paths.
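As a rough illustration of top-down delegation, the sketch below walks a hierarchy level by level and records which agent received which instruction. `HIERARCHY` and `toy_hierarchical` are hypothetical, no agents are actually invoked, and HierarchicalOrchestrator may traverse or escalate differently.

```python
# Hypothetical hierarchy mirroring the levels used in the example.
HIERARCHY = {
    "executive": ["coordinator_agent"],
    "manager": ["weather_agent", "risk_agent"],
    "specialist": ["research_agent"],
}


def toy_hierarchical(query: str, hierarchy: dict[str, list[str]]) -> dict:
    """Pass work down level by level, recording who handled it at each level."""
    trail = []
    instruction = query
    for level, agent_ids in hierarchy.items():  # dicts preserve insertion order
        for agent_id in agent_ids:
            trail.append((level, agent_id, instruction))
        # Each level re-frames the instruction before delegating further down.
        instruction = f"[delegated by {level}] {query}"
    return {
        "hierarchy_levels_used": list(hierarchy),
        "trail": trail,
    }


hier_result = toy_hierarchical("Assess site readiness", HIERARCHY)
for level, agent_id, instruction in hier_result["trail"]:
    print(f"{level:<10} {agent_id:<18} {instruction}")
```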
Task Decomposition¶
Break complex tasks into manageable subtasks:
from akordi_agents.core.langgraph.orchestration import Task, TaskDecomposer


def decompose_complex_task(registry: AgentRegistry):
    """Demonstrate task decomposition."""
    decomposer = TaskDecomposer(registry)

    # Create a complex task
    complex_task = Task(
        id="complex_1",
        description=(
            "Analyze the weather conditions in multiple cities, "
            "assess the risks for outdoor construction work, "
            "and generate a comprehensive report."
        ),
        requirements={
            "domain": "general",
            "skills": ["analysis", "reporting"],
        },
        priority=1,
    )

    # Decompose into subtasks
    subtasks = decomposer.decompose_task(complex_task)
    for subtask in subtasks:
        print(f"Subtask: {subtask.id}")
        print(f"  Description: {subtask.description}")
        print(f"  Domain: {subtask.requirements.get('domain')}")
        print(f"  Dependencies: {subtask.dependencies}")
    return subtasks
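Because each subtask carries a `dependencies` list, the subtasks form a dependency graph that determines a safe execution order. The sketch below shows that idea with the standard library's `graphlib`; `ToyTask`, `toy_decompose` and `execution_order` are hypothetical, the three-way split is hand-written, and nothing here reflects how TaskDecomposer itself chooses subtasks.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter


@dataclass
class ToyTask:
    """Hypothetical stand-in for the SDK's Task (fields chosen for illustration)."""
    id: str
    description: str
    dependencies: list[str] = field(default_factory=list)


def toy_decompose(parent_id: str) -> list[ToyTask]:
    """Hand-written split of the construction query into three dependent subtasks."""
    return [
        ToyTask(f"{parent_id}_weather", "Collect weather conditions per city"),
        ToyTask(f"{parent_id}_risk", "Assess outdoor construction risks",
                dependencies=[f"{parent_id}_weather"]),
        ToyTask(f"{parent_id}_report", "Write the combined report",
                dependencies=[f"{parent_id}_weather", f"{parent_id}_risk"]),
    ]


def execution_order(tasks: list[ToyTask]) -> list[str]:
    """Order subtasks so that every dependency runs before its dependents."""
    graph = {t.id: set(t.dependencies) for t in tasks}
    return list(TopologicalSorter(graph).static_order())


toy_subtasks = toy_decompose("complex_1")
print(execution_order(toy_subtasks))
```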
Result Aggregation¶
Combine results from multiple agents:
from akordi_agents.core.langgraph.orchestration import ResultAggregator


def aggregate_results():
    """Demonstrate result aggregation strategies."""
    # Sample results from agents
    results = [
        {
            "answer": "Weather in London: 15°C, Cloudy",
            "confidence": 0.9,
            "source": "weather_agent",
        },
        {
            "answer": "Risk assessment: MEDIUM risk",
            "confidence": 0.85,
            "source": "risk_agent",
        },
        {
            "answer": "Best practices: Use safety gear",
            "confidence": 0.8,
            "source": "research_agent",
        },
    ]

    # Test different strategies
    strategies = ["consolidate", "vote", "merge"]
    for strategy in strategies:
        aggregator = ResultAggregator(aggregation_strategy=strategy)
        aggregated = aggregator.aggregate_results(results)
        print(f"\n{strategy.upper()} Strategy:")
        print(f"  Result: {aggregated}")
Aggregation Strategies¶
| Strategy | Description |
|---|---|
| consolidate | Combine all results into a structured response |
| vote | Use majority voting for consensus |
| merge | Merge results based on confidence scores |
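To show how the three strategies differ in practice, here is a small, self-contained interpretation of each one. `toy_aggregate` is a hypothetical function written for this illustration; the actual output format and tie-breaking rules of ResultAggregator are not documented here and may differ.

```python
from collections import Counter


def toy_aggregate(results: list[dict], strategy: str) -> dict:
    """Illustrative versions of the three strategies (not the SDK's logic)."""
    if not results:
        return {"strategy": strategy, "answer": None}
    if strategy == "consolidate":
        # Keep every answer, keyed by the agent that produced it.
        return {"strategy": strategy,
                "answers": {r["source"]: r["answer"] for r in results}}
    if strategy == "vote":
        # The most common answer wins; ties go to the answer seen first.
        winner, votes = Counter(r["answer"] for r in results).most_common(1)[0]
        return {"strategy": strategy, "answer": winner, "votes": votes}
    if strategy == "merge":
        # Order answers by confidence and join them, most confident first.
        ranked = sorted(results, key=lambda r: r["confidence"], reverse=True)
        return {"strategy": strategy,
                "answer": " | ".join(r["answer"] for r in ranked)}
    raise ValueError(f"unknown strategy: {strategy}")


sample = [
    {"answer": "MEDIUM", "confidence": 0.85, "source": "risk_agent"},
    {"answer": "HIGH", "confidence": 0.60, "source": "weather_agent"},
    {"answer": "MEDIUM", "confidence": 0.80, "source": "research_agent"},
]
for name in ("consolidate", "vote", "merge"):
    print(toy_aggregate(sample, name))
```

Voting only makes sense when agents answer the same question in comparable terms, as in this sample; for heterogeneous answers like those in the example above, consolidating is the more natural fit.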
Specialized Tools¶
Weather Tool¶
import os

import requests

from akordi_agents.tools import Tool


class WeatherTool(Tool):
    """Fetch real-time weather data."""

    def get_name(self) -> str:
        return "weather_tool"

    def get_description(self) -> str:
        return "Fetches current weather for any city."

    def get_input_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "City name (e.g., 'London, UK')",
                }
            },
            "required": ["city"],
        }

    def execute(self, **kwargs):
        city = kwargs.get("city", "London")
        api_key = os.getenv("WEATHER_API_KEY")
        if not api_key:
            # Without a key the request cannot succeed. The failure shape used
            # here ("success": False plus "error") is an assumed convention.
            return {"success": False, "error": "WEATHER_API_KEY is not set"}
        try:
            response = requests.get(
                "http://api.weatherapi.com/v1/current.json",
                params={"key": api_key, "q": city},
                timeout=10,
            )
            # Surface HTTP errors instead of failing later on missing keys.
            response.raise_for_status()
            data = response.json()
        except requests.RequestException as exc:
            return {"success": False, "error": str(exc)}
        return {
            "success": True,
            "data": {
                "city": data["location"]["name"],
                "temperature_c": data["current"]["temp_c"],
                "condition": data["current"]["condition"]["text"],
            },
        }
Risk Analysis Tool¶
class RiskAnalysisTool(Tool):
    """Analyze project risks."""

    def get_name(self) -> str:
        return "risk_analysis_tool"

    def get_description(self) -> str:
        return "Analyzes risks for construction activities."

    def get_input_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "activity": {"type": "string"},
                "conditions": {"type": "string"},
            },
            "required": ["activity"],
        }

    def execute(self, **kwargs):
        activity = kwargs.get("activity", "general work")
        conditions = kwargs.get("conditions", "normal")

        # Risk assessment logic: match known condition keywords to risks
        risk_factors = {
            "extreme heat": ["heat stress", "equipment failure"],
            "rain": ["slippery surfaces", "delays"],
            "wind": ["falling objects", "crane operations"],
        }
        identified_risks = []
        for key, risks in risk_factors.items():
            if key in conditions.lower():
                identified_risks.extend(risks)

        # More than two identified risks counts as HIGH; anything else is MEDIUM
        risk_level = "HIGH" if len(identified_risks) > 2 else "MEDIUM"
        return {
            "success": True,
            "data": {
                "activity": activity,
                "risk_level": risk_level,
                "identified_risks": identified_risks,
                "recommendations": [
                    "Conduct safety briefing",
                    "Ensure proper PPE",
                    "Monitor conditions",
                ],
            },
        }
Running the Example¶
# Run all demos
poetry run python examples/agent_orchestration_langgraph.py
# Specific orchestration pattern
poetry run python examples/agent_orchestration_langgraph.py --pattern coordinator
poetry run python examples/agent_orchestration_langgraph.py --pattern peer_to_peer
poetry run python examples/agent_orchestration_langgraph.py --pattern hierarchical
# Specific demo mode
poetry run python examples/agent_orchestration_langgraph.py --demo patterns
poetry run python examples/agent_orchestration_langgraph.py --demo decomposition
poetry run python examples/agent_orchestration_langgraph.py --demo aggregation
# Custom query
poetry run python examples/agent_orchestration_langgraph.py \
    --query "Analyze weather risks for construction in Sydney"
Command Line Arguments¶
| Argument | Options | Description |
|---|---|---|
| --pattern | coordinator, peer_to_peer, hierarchical, all | Orchestration pattern |
| --demo | patterns, decomposition, aggregation, all | Demo mode |
| --query | string | Custom query to process |
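For readers adapting the script, the argument table could be expressed with `argparse` roughly as follows. This is a sketch, not the example's actual parser: `build_parser` is a hypothetical helper, and the `all` defaults are an assumption inferred from the fact that running the script with no arguments runs all demos.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the argument table."""
    parser = argparse.ArgumentParser(description="Agent orchestration demo")
    parser.add_argument(
        "--pattern",
        choices=["coordinator", "peer_to_peer", "hierarchical", "all"],
        default="all",  # assumed default; the table does not state one
        help="Orchestration pattern",
    )
    parser.add_argument(
        "--demo",
        choices=["patterns", "decomposition", "aggregation", "all"],
        default="all",  # assumed default
        help="Demo mode",
    )
    parser.add_argument("--query", type=str, default=None,
                        help="Custom query to process")
    return parser


args = build_parser().parse_args(["--pattern", "coordinator", "--query", "Sydney risks"])
print(args.pattern, args.demo, args.query)
```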
Environment Setup¶
# Required
export AWS_REGION=us-east-1
# Optional (for real weather data)
export WEATHER_API_KEY=your-api-key
# Optional (for tracing)
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=your-key
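A quick preflight check can catch a missing variable before any agent is created. The helper below is a hypothetical sketch (`check_environment` is not part of the SDK or the example) that classifies the variables listed above as required or optional.

```python
import os

REQUIRED = ["AWS_REGION"]
OPTIONAL = ["WEATHER_API_KEY", "LANGCHAIN_TRACING_V2", "LANGCHAIN_API_KEY"]


def check_environment(env=None) -> dict:
    """Report which required and optional variables are unset or empty."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    unset_optional = [name for name in OPTIONAL if not env.get(name)]
    return {"ok": not missing, "missing": missing, "unset_optional": unset_optional}


# Passing a plain dict makes the check easy to try without touching the shell.
status = check_environment({"AWS_REGION": "us-east-1"})
print(status)
```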
Best Practices¶
1. Agent Specialization¶
Give each agent a focused domain:
# Good - specialized agents
weather_agent = create_langgraph_agent(name="weather_expert", ...)
risk_agent = create_langgraph_agent(name="risk_expert", ...)
# Avoid - generalist agents that overlap
agent1 = create_langgraph_agent(name="assistant1", ...)
agent2 = create_langgraph_agent(name="assistant2", ...)
2. Capability Definitions¶
Define clear, specific capabilities:
# Good - specific capabilities
AgentCapability(
    name="weather_data",
    domain="weather",
    skills=["weather_lookup", "forecast", "climate_analysis"],
    confidence_score=0.9,
)

# Avoid - vague capabilities
AgentCapability(
    name="general",
    domain="general",
    skills=["everything"],
)
3. Error Handling¶
Handle agent failures gracefully:
result = await orchestrator.orchestrate(query)
if not result.get("success"):
    # Check which agents failed
    failed_agents = result.get("failed_agents", [])
    # Implement fallback logic
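One possible shape for the fallback logic is a bounded retry followed by a degraded response. The sketch below is self-contained and uses a fake orchestrator: `FlakyOrchestrator` and `orchestrate_with_fallback` are hypothetical, and only the `success` and `failed_agents` keys come from the snippet above.

```python
import asyncio


class FlakyOrchestrator:
    """Hypothetical stand-in: reports failure once, then succeeds."""

    def __init__(self) -> None:
        self.calls = 0

    async def orchestrate(self, query: str) -> dict:
        self.calls += 1
        if self.calls == 1:
            return {"success": False, "failed_agents": ["weather_agent"]}
        return {"success": True, "answer": f"handled: {query}"}


async def orchestrate_with_fallback(orchestrator, query: str, max_attempts: int = 3) -> dict:
    """Retry a failed orchestration a few times, then return a degraded response."""
    result: dict = {"success": False}
    for attempt in range(1, max_attempts + 1):
        try:
            result = await orchestrator.orchestrate(query)
        except Exception as exc:  # an agent raised instead of reporting failure
            result = {"success": False, "error": str(exc)}
        if result.get("success"):
            return result
        print(f"attempt {attempt} failed: {result.get('failed_agents', [])}")
        await asyncio.sleep(0)  # replace with a real backoff delay
    # Final fallback: a well-formed response that callers can still handle.
    return {"success": False, "answer": "No agent could answer.", "last_result": result}


flaky = FlakyOrchestrator()
final = asyncio.run(orchestrate_with_fallback(flaky, "Sydney risks"))
print(final["success"], flaky.calls)
```

Retrying the whole orchestration is the simplest option; a finer-grained fallback could instead rerun only the agents listed in `failed_agents`, if the orchestrator in use supports that.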
Next Steps¶
- Multi-Agent - More multi-agent patterns
- Tool Integration - Build custom tools
- LangGraph Concepts - Deep dive into workflows