Skip to content

Multi-Agent Example

Orchestrate multiple specialized agents for complex tasks.

Overview

Multi-agent systems allow you to:

  • Delegate tasks to specialized agents
  • Combine expertise from different domains
  • Handle complex queries that require multiple capabilities

Agent-to-Agent Flow

The simplest multi-agent pattern:

from akordi_agents.core import create_langgraph_agent
from akordi_agents.services import AWSBedrockService

# Create specialized agents
weather_agent = create_langgraph_agent(
    name="weather_expert",
    llm_service=AWSBedrockService(),
    tools=[WeatherTool()],
    config={"enable_tools": True}
)

finance_agent = create_langgraph_agent(
    name="finance_expert",
    llm_service=AWSBedrockService(),
    tools=[FinanceTool()],
    config={"enable_tools": True}
)

risk_agent = create_langgraph_agent(
    name="risk_expert",
    llm_service=AWSBedrockService(),
    tools=[RiskTool()],
    config={"enable_tools": True}
)

# Coordinator that routes to appropriate agents
def process_query(query: str) -> dict:
    query_lower = query.lower()

    results = {}

    if "weather" in query_lower:
        results["weather"] = weather_agent.process_request({
            "query": query,
            "system_message": "You are a weather expert.",
        })

    if "financial" in query_lower or "cost" in query_lower:
        results["finance"] = finance_agent.process_request({
            "query": query,
            "system_message": "You are a financial analyst.",
        })

    if "risk" in query_lower or "safety" in query_lower:
        results["risk"] = risk_agent.process_request({
            "query": query,
            "system_message": "You are a risk assessment expert.",
        })

    return results

LangGraph Multi-Agent Workflow

Use MultiAgentWorkflow for automatic coordination:

from akordi_agents.core.langgraph import (
    MultiAgentWorkflow,
    WorkflowConfig,
)

# Create workflow with multiple agents
workflow = MultiAgentWorkflow(
    name="multi_agent_system",
    agents=[weather_agent, finance_agent, risk_agent],
    config=WorkflowConfig(
        enable_validation=True,
        max_iterations=15,
    ),
)

# Execute workflow
result = workflow.execute({
    "query": "Analyze weather impact on construction costs",
    "user_id": "user-123",
})

Orchestrated Multi-Agent

Advanced orchestration with the A2A protocol:

from akordi_agents.core.langgraph import (
    AgentRegistry,
    AgentCapability,
    create_coordinator_orchestrator,
    OrchestratedMultiAgentWorkflow,
)

# Create agent registry
registry = AgentRegistry()

# Register agents with capabilities
registry.register_agent(
    agent_id="weather_expert",
    agent=weather_agent,
    capabilities=[
        AgentCapability(
            name="weather_analysis",
            description="Analyze weather conditions",
            domains=["meteorology", "climate"]
        )
    ]
)

registry.register_agent(
    agent_id="finance_expert",
    agent=finance_agent,
    capabilities=[
        AgentCapability(
            name="financial_analysis",
            description="Analyze financial data and costs",
            domains=["finance", "economics"]
        )
    ]
)

registry.register_agent(
    agent_id="risk_expert",
    agent=risk_agent,
    capabilities=[
        AgentCapability(
            name="risk_assessment",
            description="Assess project and operational risks",
            domains=["risk", "safety", "compliance"]
        )
    ]
)

# Create coordinator agent
coordinator_agent = create_langgraph_agent(
    name="coordinator",
    llm_service=AWSBedrockService(),
)

registry.register_agent(
    agent_id="coordinator",
    agent=coordinator_agent,
    capabilities=[
        AgentCapability(
            name="coordination",
            description="Coordinate and delegate tasks",
            domains=["management", "coordination"]
        )
    ]
)

# Create orchestrator
orchestrator = create_coordinator_orchestrator(
    coordinator_id="coordinator",
    registry=registry,
)

# Create workflow
workflow = OrchestratedMultiAgentWorkflow(orchestrator)

# Execute complex query
result = await workflow.execute(
    "Assess weather risks for construction in London, "
    "estimate cost impacts, and provide safety recommendations",
    context={
        "priority": "high",
        "project_type": "commercial",
    }
)

print("Final Response:", result["response"])
print("Agents Used:", result["agents_used"])
print("Task Breakdown:", result["tasks"])

A2A Protocol

Direct agent-to-agent communication:

from akordi_agents.core.langgraph import (
    A2AMessage,
    A2AMetadata,
    A2AProtocolLayer,
    MessageType,
    MessagePriority,
)

# Create protocol layer
protocol = A2AProtocolLayer()

# Send message from weather to risk agent
message = A2AMessage(
    message_type=MessageType.REQUEST,
    sender_id="weather_expert",
    receiver_id="risk_expert",
    content={
        "weather_data": {
            "temperature": "35°C",
            "conditions": "Extreme heat",
            "wind": "Strong winds",
        },
        "request": "Assess safety risks for construction"
    },
    metadata=A2AMetadata(
        priority=MessagePriority.HIGH,
        requires_response=True,
    )
)

# Send and await response
response = await protocol.send(message)
print("Risk Assessment:", response.content)

Hierarchical Orchestration

Multi-level agent organization:

from akordi_agents.core.langgraph import create_hierarchical_orchestrator

# Level 1: Executive coordinator
# Level 2: Domain managers
# Level 3: Specialist agents

orchestrator = create_hierarchical_orchestrator(registry)

# Query flows through hierarchy:
# 1. Executive analyzes query
# 2. Delegates to relevant domain manager
# 3. Domain manager delegates to specialists
# 4. Results aggregate back up

result = await orchestrator.orchestrate(
    "Complete analysis of project viability in London"
)

Peer-to-Peer Orchestration

Agents communicate directly:

from akordi_agents.core.langgraph import create_peer_to_peer_orchestrator

orchestrator = create_peer_to_peer_orchestrator(registry)

# Agents collaborate directly without central coordinator
result = await orchestrator.orchestrate(
    "Weather-dependent cost analysis",
    initial_agent="weather_expert"
)

Complete Example

#!/usr/bin/env python
"""Multi-agent orchestration example."""

import os
import asyncio
from akordi_agents.core import create_langgraph_agent
from akordi_agents.services import AWSBedrockService
from akordi_agents.core.langgraph import (
    AgentRegistry,
    AgentCapability,
    create_coordinator_orchestrator,
    OrchestratedMultiAgentWorkflow,
)


# Define tools
class WeatherTool:
    def get_name(self): return "weather"
    def get_description(self): return "Get weather data"
    def get_input_schema(self): return {"type": "object", "properties": {}}
    def execute(self, **kwargs):
        return {"temperature": "25°C", "conditions": "Clear"}


class RiskTool:
    def get_name(self): return "risk"
    def get_description(self): return "Assess risks"
    def get_input_schema(self): return {"type": "object", "properties": {}}
    def execute(self, **kwargs):
        return {"risk_level": "Medium", "factors": ["weather", "location"]}


async def main():
    os.environ["AWS_REGION"] = "us-east-1"

    llm_service = AWSBedrockService()

    # Create specialized agents
    weather_agent = create_langgraph_agent(
        name="weather_expert",
        llm_service=llm_service,
        tools=[WeatherTool()],
    )

    risk_agent = create_langgraph_agent(
        name="risk_expert",
        llm_service=llm_service,
        tools=[RiskTool()],
    )

    coordinator = create_langgraph_agent(
        name="coordinator",
        llm_service=llm_service,
    )

    # Set up registry
    registry = AgentRegistry()

    registry.register_agent(
        "weather_expert", weather_agent,
        [AgentCapability("weather", "Weather analysis", ["weather"])]
    )

    registry.register_agent(
        "risk_expert", risk_agent,
        [AgentCapability("risk", "Risk assessment", ["safety", "risk"])]
    )

    registry.register_agent(
        "coordinator", coordinator,
        [AgentCapability("coordination", "Task coordination", ["management"])]
    )

    # Create orchestrator
    orchestrator = create_coordinator_orchestrator("coordinator", registry)

    # Create workflow
    workflow = OrchestratedMultiAgentWorkflow(orchestrator)

    # Test queries
    queries = [
        "What are the weather-related risks for outdoor work today?",
        "Analyze safety conditions for roofing work in current weather",
    ]

    for query in queries:
        print(f"\nQuery: {query}")
        print("=" * 60)

        result = await workflow.execute(query)

        print(f"Response: {result.get('response', 'N/A')}")
        print(f"Agents used: {result.get('agents_used', [])}")
        print("-" * 60)


if __name__ == "__main__":
    asyncio.run(main())

Running the Example

# Agent-to-agent flow
poetry run python examples/agent_to_agent_flow.py \
  --query "What are the financial implications of construction work in London?"

# LangGraph orchestration
poetry run python examples/agent_orchestration_langgraph.py \
  --query "Analyze project risks and costs"

Next Steps