LangGraph Workflows

The LangGraph integration orchestrates complex agent interactions as graph-based workflows: nodes do the work, edges decide what runs next, and a shared state carries data between them.

Overview

LangGraph enables:

  • State Management - Track workflow progress across nodes
  • Conditional Routing - Dynamic paths based on conditions
  • Multi-Agent Coordination - Orchestrate multiple agents
  • Tool Orchestration - Intelligent tool selection and execution
  • Streaming Support - Real-time response generation

Workflow Types

ToolUseWorkflow

Decides which tools a query needs, executes them, and generates a response:

from akordi_agents.core.langgraph import ToolUseWorkflow, WorkflowConfig

config = WorkflowConfig(
    enable_validation=True,
    enable_tools=True,
    max_iterations=10,
    temperature=0.1,
)

# weather_tool, calculator_tool, my_validator, and llm_service are assumed to be defined elsewhere
workflow = ToolUseWorkflow(
    name="tool_workflow",
    tools=[weather_tool, calculator_tool],
    config=config,
    validator=my_validator,
    llm_service=llm_service,
)

result = workflow.execute({
    "query": "What's the weather in London?",
    "user_id": "user-123",
})

Workflow Graph:

graph LR
    A[Start] --> B[Validation]
    B -->|Valid| C[Tool Decision]
    B -->|Invalid| G[End]
    C -->|Use Tool| D[Tool Execution]
    C -->|No Tool| E[Response Generation]
    D --> E
    E --> F[End]
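
The graph above can be read as a small state machine. The following is a minimal plain-Python sketch of that control flow, not the ToolUseWorkflow implementation. The node functions, the `route` helper, and the state keys `valid`, `tool`, and `path` are hypothetical names chosen for illustration.

```python
from typing import Any, Callable, Dict, List, Optional

State = Dict[str, Any]


def validation(state: State) -> State:
    # Hypothetical rule: a query is valid when it is a non-empty string.
    query = state.get("query")
    return {"valid": isinstance(query, str) and bool(query.strip())}


def tool_decision(state: State) -> State:
    # Hypothetical rule: pick the weather tool when the query mentions weather.
    query = str(state.get("query", "")).lower()
    return {"tool": "weather" if "weather" in query else None}


def tool_execution(state: State) -> State:
    return {"tool_result": f"stub result from {state['tool']}"}


def response_generation(state: State) -> State:
    return {"final_response": state.get("tool_result", "answered without tools")}


NODES: Dict[str, Callable[[State], State]] = {
    "validation": validation,
    "tool_decision": tool_decision,
    "tool_execution": tool_execution,
    "response_generation": response_generation,
}


def route(current: str, state: State) -> Optional[str]:
    """Return the next node name, or None for End, following the diagram's edges."""
    if current == "validation":
        return "tool_decision" if state["valid"] else None
    if current == "tool_decision":
        return "tool_execution" if state["tool"] else "response_generation"
    if current == "tool_execution":
        return "response_generation"
    return None  # response_generation -> End


def run(initial: State) -> State:
    state: State = dict(initial)
    path: List[str] = []
    node: Optional[str] = "validation"
    while node is not None:
        path.append(node)
        state.update(NODES[node](state))  # merge the node's partial update
        node = route(node, state)
    state["path"] = path  # record which nodes were visited
    return state


if __name__ == "__main__":
    print(run({"query": "What's the weather in London?"})["path"])
```

An invalid query stops after the validation node, which corresponds to the `Invalid` edge in the diagram.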

MultiAgentWorkflow

Coordinate multiple specialized agents:

from akordi_agents.core.langgraph import MultiAgentWorkflow

workflow = MultiAgentWorkflow(
    name="multi_agent",
    agents=[weather_agent, finance_agent, hr_agent],
    config=config,
)

result = workflow.execute({
    "query": "Analyze construction risks in London weather",
})

BaseAgentWorkflow

Extend for custom workflow implementations:

from akordi_agents.core.langgraph import BaseAgentWorkflow

class MyCustomWorkflow(BaseAgentWorkflow):
    def _build_graph(self):
        # Define custom nodes
        self.graph.add_node("custom_node", self.custom_process)

        # Define edges
        self.graph.add_edge("custom_node", "response")

    def custom_process(self, state):
        # Custom processing logic
        return {"processed": True}

Workflow Configuration

WorkflowConfig

from akordi_agents.core.langgraph import WorkflowConfig

config = WorkflowConfig(
    enable_validation=True,     # Run validation node
    enable_tools=True,          # Enable tool nodes
    enable_tracing=True,        # LangSmith tracing
    max_iterations=10,          # Max graph iterations
    temperature=0.1,            # LLM temperature
)

Nodes

Built-in Nodes

| Node                   | Description                              |
|------------------------|------------------------------------------|
| ValidationNode         | Validates input using custom validators  |
| SearchNode             | Searches knowledge base for context      |
| ToolDecisionNode       | Decides which tools to use               |
| ToolExecutionNode      | Executes selected tools                  |
| ResponseGenerationNode | Generates LLM response                   |
| StreamingResponseNode  | Streams response in real-time            |
| CoordinatorNode        | Coordinates multi-agent tasks            |
| AggregationNode        | Aggregates results from multiple agents  |

Creating Custom Nodes

from akordi_agents.core.langgraph import BaseNode
from akordi_agents.core.langgraph.state import AgentState, NodeResult

class MyCustomNode(BaseNode):
    def __init__(self, name: str = "custom"):
        super().__init__(name)

    def process(self, state: AgentState) -> NodeResult:
        # Access state
        query = state.get("query", "")

        # Process
        result = self.do_something(query)

        # Return result
        return NodeResult(
            success=True,
            data={"result": result},
            next_node="response"  # Route to next node
        )

    def do_something(self, query: str) -> str:
        return f"Processed: {query}"

State Management

AgentState

The workflow state contains:

from akordi_agents.core.langgraph.state import AgentState

state = AgentState(
    query="User question",
    user_id="user-123",
    chat_id="chat-456",
    chat_history=[],
    validation_result=None,
    search_results=[],
    tool_decisions=[],
    tool_results=[],
    final_response="",
    metadata={},
    status="pending",
)

Accessing State in Nodes

def my_node(state: dict) -> dict:
    # Read state
    query = state.get("query", "")  # default to "" so .lower() below cannot fail on None
    history = state.get("chat_history", [])

    # Update state (return only changes)
    return {
        "processed_query": query.lower(),
        "status": "processing"
    }
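
Returning only the changed keys implies that the runtime merges each node's return value into the existing state. The sketch below illustrates that idea with a plain shallow dictionary merge. This is an assumption for illustration: the merge strategy the library actually applies is not described here, and `apply_update` is a hypothetical helper.

```python
from typing import Any, Dict

State = Dict[str, Any]


def my_node(state: State) -> State:
    # Return only the keys this node changes.
    query = state.get("query", "")
    return {"processed_query": query.lower(), "status": "processing"}


def apply_update(state: State, update: State) -> State:
    """Return a new state with `update` shallow-merged over `state`."""
    merged = dict(state)   # copy, so the caller's state is left untouched
    merged.update(update)  # keys in the update overwrite existing keys
    return merged


state = {"query": "What's the Weather?", "user_id": "user-123", "status": "pending"}
new_state = apply_update(state, my_node(state))
```

Keys the node did not return (`user_id` here) carry over unchanged, while `status` is overwritten.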

Conditional Routing

RoutingCondition

Define conditions for routing:

from akordi_agents.core.langgraph import RoutingCondition, RoutingRule

# Simple condition
has_tools = RoutingCondition(
    name="has_tools",
    check=lambda state: bool(state.get("tool_decisions"))
)

# Routing rule
tool_route = RoutingRule(
    condition=has_tools,
    target="tool_execution",
    priority=1
)

ConditionalRouter

Create complex routing logic:

from akordi_agents.core.langgraph import (
    ConditionalRouter,
    RoutingCondition,
    RoutingRule,
    create_adaptive_router,
)

router = ConditionalRouter(
    rules=[
        RoutingRule(
            condition=RoutingCondition("needs_validation",
                lambda s: not s.get("validated")),
            target="validation",
        ),
        RoutingRule(
            condition=RoutingCondition("needs_tools",
                lambda s: s.get("requires_tools")),
            target="tool_decision",
        ),
    ],
    default_target="response"
)
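
To make the rule-plus-default idea concrete, here is a minimal standalone sketch of how a router of this kind could pick a target. It is not the ConditionalRouter implementation. The `Rule` dataclass and `resolve` function are hypothetical, and the ordering (lower `priority` values first, ties in list order) is an assumption, since the priority semantics are not stated above.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

State = Dict[str, Any]


@dataclass
class Rule:
    name: str
    check: Callable[[State], Any]  # truthy result means the rule matches
    target: str
    priority: int = 0


def resolve(rules: List[Rule], state: State, default_target: str) -> str:
    """Return the target of the first matching rule, else the default."""
    # Assumption: lower priority values are evaluated first.
    # sorted() is stable, so rules with equal priority keep their list order.
    for rule in sorted(rules, key=lambda r: r.priority):
        if rule.check(state):
            return rule.target
    return default_target


rules = [
    Rule("needs_validation", lambda s: not s.get("validated"), "validation"),
    Rule("needs_tools", lambda s: s.get("requires_tools"), "tool_decision"),
]

if __name__ == "__main__":
    print(resolve(rules, {"validated": True, "requires_tools": True}, "response"))
```

With both rules at the same priority, an unvalidated state always routes to `validation` first, and a state that matches no rule falls through to the default target.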

Multi-Agent Orchestration

Orchestration Patterns

Coordinator Pattern

Central coordinator delegates to specialized agents:

from akordi_agents.core.langgraph import (
    AgentRegistry,
    create_coordinator_orchestrator,
    OrchestratedMultiAgentWorkflow
)

# Register agents
registry = AgentRegistry()
registry.register_agent("weather", weather_agent)
registry.register_agent("finance", finance_agent)
registry.register_agent("coordinator", coordinator_agent)

# Create orchestrator
orchestrator = create_coordinator_orchestrator(
    coordinator_id="coordinator",
    registry=registry
)

# Create workflow
workflow = OrchestratedMultiAgentWorkflow(orchestrator)

# `await` needs an async context: call this inside an `async def` run via asyncio.run()
result = await workflow.execute(
    "Analyze weather impact on construction costs",
    context={"priority": "high"}
)
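
The coordinator pattern can be sketched without the library as a planner that picks agents and then fans the task out to them. Everything below is hypothetical and only illustrates the shape of the pattern: the stub agents, the keyword-based `plan` function, and the `REGISTRY` dictionary are assumptions, not the behaviour of `create_coordinator_orchestrator`.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Agent = Callable[[str], Awaitable[str]]


async def weather_agent(task: str) -> str:
    return f"weather: {task}"  # stub; a real agent would call an LLM or a tool


async def finance_agent(task: str) -> str:
    return f"finance: {task}"  # stub


REGISTRY: Dict[str, Agent] = {"weather": weather_agent, "finance": finance_agent}


def plan(query: str) -> List[str]:
    # Hypothetical planning step: select agents whose keyword appears in the query.
    keywords = {"weather": "weather", "finance": "cost"}
    lowered = query.lower()
    return [name for name, word in keywords.items() if word in lowered]


async def coordinate(query: str) -> Dict[str, str]:
    """Delegate the query to every selected agent concurrently and collect replies."""
    names = plan(query)
    replies = await asyncio.gather(*(REGISTRY[name](query) for name in names))
    return dict(zip(names, replies))


if __name__ == "__main__":
    print(asyncio.run(coordinate("Analyze weather impact on construction costs")))
```

`asyncio.gather` returns results in the order the awaitables were passed, so zipping them with `names` pairs each reply with the agent that produced it.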

Peer-to-Peer Pattern

Agents communicate directly:

from akordi_agents.core.langgraph import create_peer_to_peer_orchestrator

orchestrator = create_peer_to_peer_orchestrator(registry)

Hierarchical Pattern

Multi-level agent organization:

from akordi_agents.core.langgraph import create_hierarchical_orchestrator

orchestrator = create_hierarchical_orchestrator(registry)

Agent-to-Agent Protocol (A2A)

Standardized communication between agents:

from akordi_agents.core.langgraph import (
    A2AMessage,
    A2AMetadata,
    A2AProtocolLayer,
    MessageType,
    MessagePriority
)

# Create A2A message
message = A2AMessage(
    message_type=MessageType.REQUEST,
    sender_id="weather_agent",
    receiver_id="risk_agent",
    content="Weather data for London",
    metadata=A2AMetadata(
        priority=MessagePriority.HIGH,
        requires_response=True
    )
)

# Send via protocol layer
protocol = A2AProtocolLayer()
response = await protocol.send(message)
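
As an illustration of request/response messaging between agents, the sketch below routes a message through an in-memory bus and builds a reply addressed back to the sender. It is a standalone stand-in, not A2AProtocolLayer: the `Message`, `Kind`, and `InMemoryBus` names, their fields, and the `in_reply_to` correlation field are all assumptions made for this example.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Awaitable, Callable, Dict, Optional


class Kind(Enum):
    REQUEST = "request"
    RESPONSE = "response"


@dataclass
class Message:
    kind: Kind
    sender_id: str
    receiver_id: str
    content: str
    requires_response: bool = False
    message_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    in_reply_to: Optional[str] = None  # id of the request this message answers


Handler = Callable[[Message], Awaitable[str]]


class InMemoryBus:
    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register(self, agent_id: str, handler: Handler) -> None:
        self._handlers[agent_id] = handler

    async def send(self, message: Message) -> Optional[Message]:
        handler = self._handlers[message.receiver_id]  # KeyError if receiver is unknown
        reply = await handler(message)
        if not message.requires_response:
            return None
        # Swap sender and receiver so the reply goes back to the requester.
        return Message(Kind.RESPONSE, message.receiver_id, message.sender_id,
                       reply, in_reply_to=message.message_id)


async def risk_agent(message: Message) -> str:
    return f"risk assessed for: {message.content}"  # stub handler


bus = InMemoryBus()
bus.register("risk_agent", risk_agent)
request = Message(Kind.REQUEST, "weather_agent", "risk_agent",
                  "Weather data for London", requires_response=True)

if __name__ == "__main__":
    print(asyncio.run(bus.send(request)))
```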

Workflow Execution

Synchronous Execution

result = workflow.execute({
    "query": "What's the weather?",
    "user_id": "user-123"
})

Asynchronous Execution

# Run inside an `async def` (for example via asyncio.run)
result = await workflow.execute_async({
    "query": "What's the weather?",
    "user_id": "user-123"
})

Streaming Execution

# `async for` is only valid inside an `async def`
async for chunk in workflow.stream({
    "query": "Tell me about AI",
}):
    print(chunk, end="", flush=True)
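
The consumption loop above can be tried without a workflow by substituting an async generator. In this sketch `fake_stream` is a hypothetical stand-in for `workflow.stream`, and it assumes chunks arrive as plain strings. `collect` shows how to print chunks as they arrive while also keeping the full text.

```python
import asyncio
from typing import AsyncIterator, Dict, List


async def fake_stream(inputs: Dict[str, str]) -> AsyncIterator[str]:
    # Hypothetical stand-in for workflow.stream(): yields the reply word by word.
    for word in f"Echo: {inputs['query']}".split(" "):
        await asyncio.sleep(0)  # hand control back to the event loop between chunks
        yield word + " "


async def collect(inputs: Dict[str, str]) -> str:
    """Print chunks as they arrive and return the assembled response."""
    chunks: List[str] = []
    async for chunk in fake_stream(inputs):
        print(chunk, end="", flush=True)
        chunks.append(chunk)
    print()  # finish the line once the stream ends
    return "".join(chunks).rstrip()


if __name__ == "__main__":
    asyncio.run(collect({"query": "Tell me about AI"}))
```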

Tracing & Observability

LangSmith Integration

import os

from akordi_agents.core.langgraph import ToolUseWorkflow, WorkflowConfig

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "akordi-agents"

# Tracing is automatically enabled
workflow = ToolUseWorkflow(
    name="traced_workflow",
    config=WorkflowConfig(enable_tracing=True),
    ...
)

Best Practices

1. Keep Nodes Focused

Each node should do one thing well:

# Good: Single responsibility
class ValidationNode(BaseNode):
    def process(self, state):
        return self.validate_input(state)

# Avoid: Multiple responsibilities
class DoEverythingNode(BaseNode):
    def process(self, state):
        self.validate(state)
        self.search(state)
        self.generate(state)

2. Use Appropriate Max Iterations

# Simple queries
config = WorkflowConfig(max_iterations=5)

# Complex multi-tool workflows
config = WorkflowConfig(max_iterations=15)
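
An iteration cap bounds how many times a workflow may loop before it is forced to stop. The sketch below shows the general idea in plain Python. `run_with_cap` and the `step` callback are hypothetical, and how the library itself reacts when `max_iterations` is reached is not specified here.

```python
from typing import Any, Callable, Dict, Tuple

State = Dict[str, Any]
Step = Callable[[State], Tuple[State, bool]]  # returns (new_state, done)


def run_with_cap(step: Step, state: State, max_iterations: int) -> Tuple[State, int, bool]:
    """Run `step` until it reports done or the cap is hit.

    Returns (final_state, iterations_used, finished).
    """
    for iteration in range(1, max_iterations + 1):
        state, done = step(state)
        if done:
            return state, iteration, True
    return state, max_iterations, False  # cap reached before the step finished


def count_to_three(state: State) -> Tuple[State, bool]:
    # Toy step: needs three iterations to finish.
    n = state["n"] + 1
    return {"n": n}, n >= 3


if __name__ == "__main__":
    print(run_with_cap(count_to_three, {"n": 0}, 5))
    print(run_with_cap(count_to_three, {"n": 0}, 2))
```

A cap that is too low cuts off multi-tool workflows before they finish, while a cap that is too high lets a misbehaving loop run longer than necessary.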

3. Handle Errors in Nodes

def my_node(state: dict) -> dict:
    try:
        result = process(state)  # process() stands in for the node's real work
        return {"result": result, "status": "success"}
    except Exception as e:
        return {
            "error": str(e),
            "status": "error",
            "next_node": "error_handler"
        }
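
When many nodes share the same error-handling shape, the try/except can be factored into a decorator. The following is one possible sketch under stated assumptions: `safe_node` is a hypothetical helper, not part of akordi_agents, and it reuses the `error`, `status`, and `next_node` keys from the example above.

```python
import functools
from typing import Any, Callable, Dict

State = Dict[str, Any]
Node = Callable[[State], State]


def safe_node(error_target: str = "error_handler") -> Callable[[Node], Node]:
    """Wrap a node so exceptions become an error update instead of propagating."""
    def decorator(fn: Node) -> Node:
        @functools.wraps(fn)
        def wrapper(state: State) -> State:
            try:
                update = dict(fn(state))
                update.setdefault("status", "success")  # keep a status the node set itself
                return update
            except Exception as exc:
                return {"error": str(exc), "status": "error", "next_node": error_target}
        return wrapper
    return decorator


@safe_node()
def divide(state: State) -> State:
    return {"result": state["a"] / state["b"]}


if __name__ == "__main__":
    print(divide({"a": 6, "b": 3}))
    print(divide({"a": 1, "b": 0}))
```

The node body then contains only its own logic, and every wrapped node reports failures in the same format.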

4. Enable Tracing in Development

import os

# Trace only in development; config is still defined in every other environment
config = WorkflowConfig(enable_tracing=os.environ.get("ENVIRONMENT") == "development")

Next Steps