Multi-Agent Example¶
Orchestrate multiple specialized agents for complex tasks.
Overview¶
Multi-agent systems allow you to:
- Delegate tasks to specialized agents
- Combine expertise from different domains
- Handle complex queries that require multiple capabilities
Agent-to-Agent Flow¶
The simplest multi-agent pattern:
from akordi_agents.core import create_langgraph_agent
from akordi_agents.services import AWSBedrockService
# Create specialized agents
weather_agent = create_langgraph_agent(
name="weather_expert",
llm_service=AWSBedrockService(),
tools=[WeatherTool()],
config={"enable_tools": True}
)
finance_agent = create_langgraph_agent(
name="finance_expert",
llm_service=AWSBedrockService(),
tools=[FinanceTool()],
config={"enable_tools": True}
)
risk_agent = create_langgraph_agent(
name="risk_expert",
llm_service=AWSBedrockService(),
tools=[RiskTool()],
config={"enable_tools": True}
)
# Coordinator that routes to appropriate agents
def process_query(query: str) -> dict:
query_lower = query.lower()
results = {}
if "weather" in query_lower:
results["weather"] = weather_agent.process_request({
"query": query,
"system_message": "You are a weather expert.",
})
if "financial" in query_lower or "cost" in query_lower:
results["finance"] = finance_agent.process_request({
"query": query,
"system_message": "You are a financial analyst.",
})
if "risk" in query_lower or "safety" in query_lower:
results["risk"] = risk_agent.process_request({
"query": query,
"system_message": "You are a risk assessment expert.",
})
return results
LangGraph Multi-Agent Workflow¶
Use MultiAgentWorkflow for automatic coordination:
from akordi_agents.core.langgraph import (
MultiAgentWorkflow,
WorkflowConfig,
)
# Create workflow with multiple agents
workflow = MultiAgentWorkflow(
name="multi_agent_system",
agents=[weather_agent, finance_agent, risk_agent],
config=WorkflowConfig(
enable_validation=True,
max_iterations=15,
),
)
# Execute workflow
result = workflow.execute({
"query": "Analyze weather impact on construction costs",
"user_id": "user-123",
})
Orchestrated Multi-Agent¶
Advanced orchestration with the A2A protocol:
from akordi_agents.core.langgraph import (
AgentRegistry,
AgentCapability,
create_coordinator_orchestrator,
OrchestratedMultiAgentWorkflow,
)
# Create agent registry
registry = AgentRegistry()
# Register agents with capabilities
registry.register_agent(
agent_id="weather_expert",
agent=weather_agent,
capabilities=[
AgentCapability(
name="weather_analysis",
description="Analyze weather conditions",
domains=["meteorology", "climate"]
)
]
)
registry.register_agent(
agent_id="finance_expert",
agent=finance_agent,
capabilities=[
AgentCapability(
name="financial_analysis",
description="Analyze financial data and costs",
domains=["finance", "economics"]
)
]
)
registry.register_agent(
agent_id="risk_expert",
agent=risk_agent,
capabilities=[
AgentCapability(
name="risk_assessment",
description="Assess project and operational risks",
domains=["risk", "safety", "compliance"]
)
]
)
# Create coordinator agent
coordinator_agent = create_langgraph_agent(
name="coordinator",
llm_service=AWSBedrockService(),
)
registry.register_agent(
agent_id="coordinator",
agent=coordinator_agent,
capabilities=[
AgentCapability(
name="coordination",
description="Coordinate and delegate tasks",
domains=["management", "coordination"]
)
]
)
# Create orchestrator
orchestrator = create_coordinator_orchestrator(
coordinator_id="coordinator",
registry=registry,
)
# Create workflow
workflow = OrchestratedMultiAgentWorkflow(orchestrator)
# Execute complex query
result = await workflow.execute(
"Assess weather risks for construction in London, "
"estimate cost impacts, and provide safety recommendations",
context={
"priority": "high",
"project_type": "commercial",
}
)
print("Final Response:", result["response"])
print("Agents Used:", result["agents_used"])
print("Task Breakdown:", result["tasks"])
A2A Protocol¶
Direct agent-to-agent communication:
from akordi_agents.core.langgraph import (
A2AMessage,
A2AMetadata,
A2AProtocolLayer,
MessageType,
MessagePriority,
)
# Create protocol layer
protocol = A2AProtocolLayer()
# Send message from weather to risk agent
message = A2AMessage(
message_type=MessageType.REQUEST,
sender_id="weather_expert",
receiver_id="risk_expert",
content={
"weather_data": {
"temperature": "35°C",
"conditions": "Extreme heat",
"wind": "Strong winds",
},
"request": "Assess safety risks for construction"
},
metadata=A2AMetadata(
priority=MessagePriority.HIGH,
requires_response=True,
)
)
# Send and await response
response = await protocol.send(message)
print("Risk Assessment:", response.content)
Hierarchical Orchestration¶
Multi-level agent organization:
from akordi_agents.core.langgraph import create_hierarchical_orchestrator
# Level 1: Executive coordinator
# Level 2: Domain managers
# Level 3: Specialist agents
orchestrator = create_hierarchical_orchestrator(registry)
# Query flows through hierarchy:
# 1. Executive analyzes query
# 2. Delegates to relevant domain manager
# 3. Domain manager delegates to specialists
# 4. Results aggregate back up
result = await orchestrator.orchestrate(
"Complete analysis of project viability in London"
)
Peer-to-Peer Orchestration¶
Agents communicate directly:
from akordi_agents.core.langgraph import create_peer_to_peer_orchestrator
orchestrator = create_peer_to_peer_orchestrator(registry)
# Agents collaborate directly without central coordinator
result = await orchestrator.orchestrate(
"Weather-dependent cost analysis",
initial_agent="weather_expert"
)
Complete Example¶
#!/usr/bin/env python
"""Multi-agent orchestration example."""
import os
import asyncio
from akordi_agents.core import create_langgraph_agent
from akordi_agents.services import AWSBedrockService
from akordi_agents.core.langgraph import (
AgentRegistry,
AgentCapability,
create_coordinator_orchestrator,
OrchestratedMultiAgentWorkflow,
)
# Define tools
class WeatherTool:
def get_name(self): return "weather"
def get_description(self): return "Get weather data"
def get_input_schema(self): return {"type": "object", "properties": {}}
def execute(self, **kwargs):
return {"temperature": "25°C", "conditions": "Clear"}
class RiskTool:
def get_name(self): return "risk"
def get_description(self): return "Assess risks"
def get_input_schema(self): return {"type": "object", "properties": {}}
def execute(self, **kwargs):
return {"risk_level": "Medium", "factors": ["weather", "location"]}
async def main():
os.environ["AWS_REGION"] = "us-east-1"
llm_service = AWSBedrockService()
# Create specialized agents
weather_agent = create_langgraph_agent(
name="weather_expert",
llm_service=llm_service,
tools=[WeatherTool()],
)
risk_agent = create_langgraph_agent(
name="risk_expert",
llm_service=llm_service,
tools=[RiskTool()],
)
coordinator = create_langgraph_agent(
name="coordinator",
llm_service=llm_service,
)
# Set up registry
registry = AgentRegistry()
registry.register_agent(
"weather_expert", weather_agent,
[AgentCapability("weather", "Weather analysis", ["weather"])]
)
registry.register_agent(
"risk_expert", risk_agent,
[AgentCapability("risk", "Risk assessment", ["safety", "risk"])]
)
registry.register_agent(
"coordinator", coordinator,
[AgentCapability("coordination", "Task coordination", ["management"])]
)
# Create orchestrator
orchestrator = create_coordinator_orchestrator("coordinator", registry)
# Create workflow
workflow = OrchestratedMultiAgentWorkflow(orchestrator)
# Test queries
queries = [
"What are the weather-related risks for outdoor work today?",
"Analyze safety conditions for roofing work in current weather",
]
for query in queries:
print(f"\nQuery: {query}")
print("=" * 60)
result = await workflow.execute(query)
print(f"Response: {result.get('response', 'N/A')}")
print(f"Agents used: {result.get('agents_used', [])}")
print("-" * 60)
if __name__ == "__main__":
asyncio.run(main())
Running the Example¶
# Agent-to-agent flow
poetry run python examples/agent_to_agent_flow.py \
--query "What are the financial implications of construction work in London?"
# LangGraph orchestration
poetry run python examples/agent_orchestration_langgraph.py \
--query "Analyze project risks and costs"
Next Steps¶
- Guardrails - Add safety controls
- LangGraph Concepts - Workflow details
- API Reference - Complete API