# LangGraph Workflows

The LangGraph integration provides sophisticated workflow orchestration for complex agent interactions.

## Overview

LangGraph enables:

- **State Management** - Track workflow progress across nodes
- **Conditional Routing** - Dynamic paths based on conditions
- **Multi-Agent Coordination** - Orchestrate multiple agents
- **Tool Orchestration** - Intelligent tool selection and execution
- **Streaming Support** - Real-time response generation
## Workflow Types

### ToolUseWorkflow

Intelligent tool selection and execution workflow:

```python
from akordi_agents.core.langgraph import ToolUseWorkflow, WorkflowConfig

config = WorkflowConfig(
    enable_validation=True,
    enable_tools=True,
    max_iterations=10,
    temperature=0.1,
)

workflow = ToolUseWorkflow(
    name="tool_workflow",
    tools=[weather_tool, calculator_tool],
    config=config,
    validator=my_validator,
    llm_service=llm_service,
)

result = workflow.execute({
    "query": "What's the weather in London?",
    "user_id": "user-123",
})
```
Workflow graph:

```mermaid
graph LR
    A[Start] --> B[Validation]
    B -->|Valid| C[Tool Decision]
    B -->|Invalid| G[End]
    C -->|Use Tool| D[Tool Execution]
    C -->|No Tool| E[Response Generation]
    D --> E
    E --> F[End]
```
### MultiAgentWorkflow

Coordinate multiple specialized agents:

```python
from akordi_agents.core.langgraph import MultiAgentWorkflow

workflow = MultiAgentWorkflow(
    name="multi_agent",
    agents=[weather_agent, finance_agent, hr_agent],
    config=config,
)

result = workflow.execute({
    "query": "Analyze construction risks in London weather",
})
```
### BaseAgentWorkflow

Extend for custom workflow implementations:

```python
from akordi_agents.core.langgraph import BaseAgentWorkflow

class MyCustomWorkflow(BaseAgentWorkflow):
    def _build_graph(self):
        # Define custom nodes
        self.graph.add_node("custom_node", self.custom_process)
        # Define edges
        self.graph.add_edge("custom_node", "response")

    def custom_process(self, state):
        # Custom processing logic
        return {"processed": True}
```
## Workflow Configuration

### WorkflowConfig

```python
from akordi_agents.core.langgraph import WorkflowConfig

config = WorkflowConfig(
    enable_validation=True,  # Run validation node
    enable_tools=True,       # Enable tool nodes
    enable_tracing=True,     # LangSmith tracing
    max_iterations=10,       # Max graph iterations
    temperature=0.1,         # LLM temperature
)
```
## Nodes

### Built-in Nodes

| Node | Description |
|---|---|
| `ValidationNode` | Validates input using custom validators |
| `SearchNode` | Searches knowledge base for context |
| `ToolDecisionNode` | Decides which tools to use |
| `ToolExecutionNode` | Executes selected tools |
| `ResponseGenerationNode` | Generates LLM response |
| `StreamingResponseNode` | Streams response in real-time |
| `CoordinatorNode` | Coordinates multi-agent tasks |
| `AggregationNode` | Aggregates results from multiple agents |
### Creating Custom Nodes

```python
from akordi_agents.core.langgraph import BaseNode
from akordi_agents.core.langgraph.state import AgentState, NodeResult

class MyCustomNode(BaseNode):
    def __init__(self, name: str = "custom"):
        super().__init__(name)

    def process(self, state: AgentState) -> NodeResult:
        # Access state
        query = state.get("query", "")

        # Process
        result = self.do_something(query)

        # Return result
        return NodeResult(
            success=True,
            data={"result": result},
            next_node="response",  # Route to next node
        )

    def do_something(self, query: str) -> str:
        return f"Processed: {query}"
```
## State Management

### AgentState

The workflow state contains:

```python
from akordi_agents.core.langgraph.state import AgentState

state = AgentState(
    query="User question",
    user_id="user-123",
    chat_id="chat-456",
    chat_history=[],
    validation_result=None,
    search_results=[],
    tool_decisions=[],
    tool_results=[],
    final_response="",
    metadata={},
    status="pending",
)
```
### Accessing State in Nodes

```python
def my_node(state: dict) -> dict:
    # Read state
    query = state.get("query")
    history = state.get("chat_history", [])

    # Update state (return only changes)
    return {
        "processed_query": query.lower(),
        "status": "processing",
    }
```
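Nodes return partial updates: LangGraph merges the returned keys into the existing state rather than replacing it, so untouched keys survive. A plain-Python illustration of that merge behavior:

```python
def my_node(state: dict) -> dict:
    # Return only the keys that changed
    return {
        "processed_query": state.get("query", "").lower(),
        "status": "processing",
    }

state = {"query": "Tell Me About AI", "chat_history": []}

# LangGraph performs an equivalent merge after each node runs
state.update(my_node(state))

print(state["processed_query"])  # tell me about ai
print(state["chat_history"])     # [] -- untouched keys survive the merge
```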
## Conditional Routing

### RoutingCondition

Define conditions for routing:

```python
from akordi_agents.core.langgraph import RoutingCondition, RoutingRule

# Simple condition
has_tools = RoutingCondition(
    name="has_tools",
    check=lambda state: bool(state.get("tool_decisions")),
)

# Routing rule
tool_route = RoutingRule(
    condition=has_tools,
    target="tool_execution",
    priority=1,
)
```
### ConditionalRouter

Create complex routing logic:

```python
from akordi_agents.core.langgraph import ConditionalRouter, create_adaptive_router

router = ConditionalRouter(
    rules=[
        RoutingRule(
            condition=RoutingCondition("needs_validation",
                                       lambda s: not s.get("validated")),
            target="validation",
        ),
        RoutingRule(
            condition=RoutingCondition("needs_tools",
                                       lambda s: s.get("requires_tools")),
            target="tool_decision",
        ),
    ],
    default_target="response",
)
```
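Conceptually, the router evaluates each rule's condition in order and routes to the first match, falling back to the default target when nothing fires. A minimal plain-Python sketch of that selection logic (an illustration, not the library's implementation):

```python
def route(state: dict, rules: list, default: str) -> str:
    """Return the target of the first rule whose condition matches."""
    for condition, target in rules:
        if condition(state):
            return target
    return default

rules = [
    (lambda s: not s.get("validated"), "validation"),
    (lambda s: s.get("requires_tools"), "tool_decision"),
]

print(route({}, rules, "response"))                                           # validation
print(route({"validated": True, "requires_tools": True}, rules, "response"))  # tool_decision
print(route({"validated": True}, rules, "response"))                          # response
```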
## Multi-Agent Orchestration

### Orchestration Patterns

#### Coordinator Pattern

Central coordinator delegates to specialized agents:

```python
from akordi_agents.core.langgraph import (
    AgentRegistry,
    create_coordinator_orchestrator,
    OrchestratedMultiAgentWorkflow,
)

# Register agents
registry = AgentRegistry()
registry.register_agent("weather", weather_agent)
registry.register_agent("finance", finance_agent)
registry.register_agent("coordinator", coordinator_agent)

# Create orchestrator
orchestrator = create_coordinator_orchestrator(
    coordinator_id="coordinator",
    registry=registry,
)

# Create workflow
workflow = OrchestratedMultiAgentWorkflow(orchestrator)

result = await workflow.execute(
    "Analyze weather impact on construction costs",
    context={"priority": "high"},
)
```
#### Peer-to-Peer Pattern

Agents communicate directly:

```python
from akordi_agents.core.langgraph import create_peer_to_peer_orchestrator

orchestrator = create_peer_to_peer_orchestrator(registry)
```
#### Hierarchical Pattern

Multi-level agent organization:

```python
from akordi_agents.core.langgraph import create_hierarchical_orchestrator

orchestrator = create_hierarchical_orchestrator(registry)
```
### Agent-to-Agent Protocol (A2A)

Standardized communication between agents:

```python
from akordi_agents.core.langgraph import (
    A2AMessage,
    A2AMetadata,
    A2AProtocolLayer,
    MessageType,
    MessagePriority,
)

# Create A2A message
message = A2AMessage(
    message_type=MessageType.REQUEST,
    sender_id="weather_agent",
    receiver_id="risk_agent",
    content="Weather data for London",
    metadata=A2AMetadata(
        priority=MessagePriority.HIGH,
        requires_response=True,
    ),
)

# Send via protocol layer
protocol = A2AProtocolLayer()
response = await protocol.send(message)
```
## Workflow Execution
### Synchronous Execution
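This section's original example appears to be missing. The call shape is the one used throughout this page: pass the input state to `execute()` and block until the final state comes back. A runnable sketch using a stand-in workflow (the real `ToolUseWorkflow` follows the same contract; the `final_response`/`status` result keys here are assumptions):

```python
class StandInWorkflow:
    """Stand-in exposing the same blocking execute() call shape."""
    def execute(self, payload: dict) -> dict:
        return {
            "final_response": f"Answered: {payload['query']}",
            "status": "completed",
        }

workflow = StandInWorkflow()

result = workflow.execute({
    "query": "What's the weather in London?",
    "user_id": "user-123",
})
print(result["status"])  # completed
```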
### Asynchronous Execution
### Streaming Execution
```python
async for chunk in workflow.stream({
    "query": "Tell me about AI",
}):
    print(chunk, end="", flush=True)
```
## Tracing & Observability

### LangSmith Integration

```python
import os

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "akordi-agents"

# Tracing is automatically enabled
workflow = ToolUseWorkflow(
    name="traced_workflow",
    config=WorkflowConfig(enable_tracing=True),
    ...
)
```
## Best Practices

### 1. Keep Nodes Focused

Each node should do one thing well:

```python
# Good: Single responsibility
class ValidationNode(BaseNode):
    def process(self, state):
        return self.validate_input(state)

# Avoid: Multiple responsibilities
class DoEverythingNode(BaseNode):
    def process(self, state):
        self.validate(state)
        self.search(state)
        self.generate(state)
```
### 2. Use Appropriate Max Iterations

```python
# Simple queries
config = WorkflowConfig(max_iterations=5)

# Complex multi-tool workflows
config = WorkflowConfig(max_iterations=15)
```
### 3. Handle Errors in Nodes

```python
def my_node(state: dict) -> dict:
    try:
        result = process(state)
        return {"result": result, "status": "success"}
    except Exception as e:
        return {
            "error": str(e),
            "status": "error",
            "next_node": "error_handler",
        }
```
### 4. Enable Tracing in Development
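Turn tracing on while developing to inspect node-by-node execution in LangSmith, and leave it off in production to avoid overhead. A minimal sketch gating the environment variables shown above on a hypothetical `ENV` switch (adapt to your deployment's configuration):

```python
import os

# Hypothetical ENV switch; "development" is assumed as the default
if os.getenv("ENV", "development") == "development":
    os.environ["LANGCHAIN_TRACING_V2"] = "true"
    os.environ["LANGCHAIN_PROJECT"] = "akordi-agents-dev"
```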
## Next Steps
- Tools - Create tools for workflows
- Multi-Agent Examples - Working examples
- API Reference - Complete API docs