Services API¶
Complete API reference for SDK services.
LLM Services¶
AWSBedrockService¶
Constructor¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| config | AWSBedrockConfig | None | Service configuration |
| region_name | str | None | AWS region |
Methods¶
generate_response¶
def generate_response(
    self,
    prompt: str,
    context: Optional[str] = None,
    system_message: Optional[str] = None,
    chat_history: Optional[List[Dict]] = None,
    max_tokens: int = 4096,
    temperature: float = 0.1,
    **kwargs,
) -> Dict[str, Any]
Generate a response using AWS Bedrock.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| prompt | str | - | User prompt |
| context | str | None | Additional context |
| system_message | str | None | System prompt |
| chat_history | List[Dict] | None | Previous messages |
| max_tokens | int | 4096 | Max response tokens |
| temperature | float | 0.1 | Sampling temperature |
Returns:
{
    "response": str,  # Generated text
    "model_info": {
        "model_id": str,
        "provider": "bedrock",
    },
    "token_usage": {
        "input_tokens": int,
        "output_tokens": int,
        "total_tokens": int,
    },
}
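The return shape above can be read with plain dictionary access. The sketch below uses a hand-written sample dict in place of a real Bedrock call; `summarize_response` is a hypothetical helper (not part of the SDK) and the `model_id` value is a placeholder.

```python
from typing import Any, Dict


def summarize_response(result: Dict[str, Any]) -> str:
    """Format the documented response dict as a one-line usage summary."""
    usage = result["token_usage"]
    model_id = result["model_info"]["model_id"]
    return (
        f"{model_id}: {usage['input_tokens']} in + "
        f"{usage['output_tokens']} out = {usage['total_tokens']} tokens"
    )


# Sample data shaped like the documented return value (not a real API response).
sample = {
    "response": "Hello!",
    "model_info": {"model_id": "example-model", "provider": "bedrock"},
    "token_usage": {"input_tokens": 12, "output_tokens": 3, "total_tokens": 15},
}

print(summarize_response(sample))
```

With a real service, the same helper could be applied to the dict returned by `generate_response`.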
get_service_name¶
Returns "aws_bedrock".
LLMServiceFactory¶
Methods¶
create¶
@staticmethod
def create(
    service_type: str,
    config: Optional[Dict[str, Any]] = None,
) -> LLMServiceInterface
Create an LLM service instance.
| Parameter | Type | Description |
|---|---|---|
| service_type | str | Service type ("bedrock") |
| config | Dict | Service configuration |
get_llm_service¶
Get or create an LLM service instance.
invoke_claude¶
def invoke_claude(
    prompt: str,
    system_message: Optional[str] = None,
    max_tokens: int = 4096,
    temperature: float = 0.1,
    **kwargs,
) -> Dict[str, Any]
Convenience function for Claude invocation.
search_knowledge_base¶
def search_knowledge_base(
    query: str,
    knowledge_base_id: str,
    max_results: int = 10,
    **kwargs,
) -> List[SearchResult]
Search a Bedrock knowledge base.
DynamoDB Client¶
DynamoDBClient¶
A reusable DynamoDB client with a clean interface for create, read, update, and delete (CRUD) operations, plus support for batch operations, pagination, and conditional writes.
Constructor¶
DynamoDBClient(
    table_name: str,
    region_name: Optional[str] = None,
    endpoint_url: Optional[str] = None,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| table_name | str | - | Name of the DynamoDB table |
| region_name | str | None | AWS region (defaults to AWS_REGION env var or ap-southeast-2) |
| endpoint_url | str | None | Custom endpoint URL (for local development) |
Methods¶
get¶
def get(
    self,
    partition_key: str,
    partition_value: Any,
    sort_key: Optional[str] = None,
    sort_value: Optional[Any] = None,
    consistent_read: bool = False,
    projection: Optional[List[str]] = None,
) -> Optional[Dict[str, Any]]
Fetch a single item by primary key.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| partition_key | str | - | Name of the partition key attribute |
| partition_value | Any | - | Value of the partition key |
| sort_key | str | None | Name of the sort key attribute (if table has one) |
| sort_value | Any | None | Value of the sort key |
| consistent_read | bool | False | If True, uses strongly consistent read |
| projection | List[str] | None | List of attributes to return (None = all) |
Returns: Item dict or None if not found
Usage Examples:
from akordi_agents.config.dynamodb_client import DynamoDBClient
# Initialize the client
client = DynamoDBClient(table_name="my-table")
# Get item with partition key only
item = client.get(
    partition_key="pk",
    partition_value="user#123"
)
# Get item with partition key and sort key
item = client.get(
    partition_key="pk",
    partition_value="user#123",
    sort_key="sk",
    sort_value="profile"
)
# Get item with strongly consistent read
item = client.get(
    partition_key="pk",
    partition_value="user#123",
    consistent_read=True
)
# Get only specific attributes (projection)
item = client.get(
    partition_key="pk",
    partition_value="user#123",
    projection=["name", "email", "created_at"]
)
# Handle the response
if item:
    print(f"Found item: {item}")
else:
    print("Item not found")
create¶
def create(
    self,
    item: Dict[str, Any],
    condition_expression: Optional[str] = None,
    prevent_overwrite_key: Optional[str] = None,
) -> Tuple[bool, Optional[Dict[str, Any]]]
Create a new item in the table.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| item | Dict[str, Any] | - | Dictionary containing the item data (must include primary key) |
| condition_expression | str | None | Optional condition for conditional writes |
| prevent_overwrite_key | str | None | If set, prevents overwriting existing item with this key |
Returns: Tuple of (success: bool, created_item: dict or None)
Usage Example:
# Create a new item
success, item = client.create(
    item={
        "pk": "user#123",
        "sk": "profile",
        "name": "John Doe",
        "email": "john@example.com"
    },
    prevent_overwrite_key="pk"  # Prevent overwriting existing items
)
query¶
def query(
    self,
    partition_key: str,
    partition_value: Any,
    sort_key_condition: Optional[Tuple[str, str, Any]] = None,
    filter_expression: Optional[Any] = None,
    index_name: Optional[str] = None,
    limit: Optional[int] = None,
    scan_forward: bool = True,
    projection: Optional[List[str]] = None,
    exclusive_start_key: Optional[Dict] = None,
) -> Dict[str, Any]
Query items by partition key with optional sort key conditions.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| partition_key | str | - | Name of the partition key |
| partition_value | Any | - | Value to match |
| sort_key_condition | Tuple | None | Tuple of (sort_key_name, operator, value). Operators: "eq", "lt", "lte", "gt", "gte", "between", "begins_with" |
| filter_expression | Any | None | Additional filter (applied after query) |
| index_name | str | None | GSI or LSI name |
| limit | int | None | Maximum items to return |
| scan_forward | bool | True | True for ascending, False for descending |
| projection | List[str] | None | Attributes to return |
| exclusive_start_key | Dict | None | Pagination cursor |
Returns: Dict with items, count, scanned_count, last_key (for pagination)
Usage Example:
# Query all items for a user
result = client.query(
    partition_key="pk",
    partition_value="user#123"
)
# Query with sort key condition
result = client.query(
    partition_key="pk",
    partition_value="user#123",
    sort_key_condition=("sk", "begins_with", "order#")
)
# Query using a GSI
result = client.query(
    partition_key="email",
    partition_value="john@example.com",
    index_name="email-index"
)
for item in result["items"]:
    print(item)
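Since `query` returns a `last_key` and accepts an `exclusive_start_key`, results can presumably be paged by feeding one into the other. The sketch below shows that loop. It assumes `last_key` is `None` (or absent) once the final page is reached, which is not confirmed above. `iter_all_items` and `FakeClient` are hypothetical names; the fake client exists only so the snippet runs without AWS.

```python
from typing import Any, Dict, Iterator, List, Optional


def iter_all_items(client: Any, **query_kwargs: Any) -> Iterator[Dict[str, Any]]:
    """Yield every item across pages by following the pagination cursor."""
    start_key: Optional[Dict[str, Any]] = None
    while True:
        result = client.query(exclusive_start_key=start_key, **query_kwargs)
        yield from result["items"]
        # Assumption: last_key is None (or missing) after the final page.
        start_key = result.get("last_key")
        if not start_key:
            break


class FakeClient:
    """Hypothetical in-memory stand-in that returns two items per page."""

    def __init__(self, items: List[Dict[str, Any]]) -> None:
        self._items = items

    def query(self, partition_key, partition_value, exclusive_start_key=None, **kwargs):
        offset = exclusive_start_key["offset"] if exclusive_start_key else 0
        page = self._items[offset : offset + 2]
        next_offset = offset + len(page)
        last_key = {"offset": next_offset} if next_offset < len(self._items) else None
        return {
            "items": page,
            "count": len(page),
            "scanned_count": len(page),
            "last_key": last_key,
        }


fake = FakeClient([{"pk": "user#123", "sk": f"order#{i}"} for i in range(5)])
all_items = list(iter_all_items(fake, partition_key="pk", partition_value="user#123"))
print(len(all_items))
```

A generator keeps memory use flat for large partitions, because each page is consumed before the next one is requested.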
update¶
def update(
    self,
    key: Dict[str, Any],
    updates: Dict[str, Any],
    condition_expression: Optional[str] = None,
    return_values: str = "UPDATED_NEW",
) -> Tuple[bool, Optional[Dict[str, Any]]]
Update an existing item.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| key | Dict[str, Any] | - | Primary key dict, e.g., {"pk": "user#123", "sk": "profile"} |
| updates | Dict[str, Any] | - | Dict of attributes to update |
| condition_expression | str | None | Optional condition for conditional update |
| return_values | str | "UPDATED_NEW" | What to return ("NONE", "UPDATED_OLD", "UPDATED_NEW", "ALL_OLD", "ALL_NEW") |
Returns: Tuple of (success, returned_attributes)
Usage Example:
success, attrs = client.update(
    key={"pk": "user#123", "sk": "profile"},
    updates={"name": "Jane Doe", "updated_at": "2024-01-15"}
)
delete¶
def delete(
    self,
    key: Dict[str, Any],
    condition_expression: Optional[str] = None,
    return_old: bool = False,
) -> Tuple[bool, Optional[Dict[str, Any]]]
Delete an item by primary key.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| key | Dict[str, Any] | - | Primary key dict |
| condition_expression | str | None | Optional condition for conditional delete |
| return_old | bool | False | If True, returns the deleted item |
Returns: Tuple of (success, old_item or None)
Usage Example:
batch_create¶
Batch write multiple items (max 25 per batch).
Returns: Tuple of (successful_count, failed_count)
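Given the 25-item limit per batch, a caller holding a larger list may want to split it first. Whether `batch_create` chunks its input internally is not stated here, so the following is only a sketch of the splitting step. `chunked` and `MAX_BATCH_SIZE` are hypothetical names, not SDK members.

```python
from typing import Any, Dict, Iterator, List

MAX_BATCH_SIZE = 25  # per-batch limit stated above


def chunked(
    items: List[Dict[str, Any]], size: int = MAX_BATCH_SIZE
) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start : start + size]


items = [{"pk": f"user#{i}", "sk": "profile"} for i in range(60)]
batch_sizes = [len(batch) for batch in chunked(items)]
print(batch_sizes)
```

Each yielded slice could then be passed to a batch write call, with the per-batch success and failure counts summed by the caller.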
batch_delete¶
Batch delete multiple items.
Returns: Tuple of (successful_count, failed_count)
increment¶
def increment(
    self,
    key: Dict[str, Any],
    attribute: str,
    amount: int = 1,
) -> Tuple[bool, Optional[int]]
Atomically increment a numeric attribute.
Returns: Tuple of (success, new_value)
exists¶
def exists(
    self,
    partition_key: str,
    partition_value: Any,
    sort_key: Optional[str] = None,
    sort_value: Optional[Any] = None,
) -> bool
Check if an item exists.
count¶
def count(
    self,
    partition_key: Optional[str] = None,
    partition_value: Optional[Any] = None,
    filter_expression: Optional[Any] = None,
) -> int
Count items matching criteria. Uses query if partition key provided, otherwise scans.
Full DynamoDB Client Documentation: a comprehensive guide covering all methods, pagination, error handling, best practices, and local development setup.
Chat History Service¶
ChatHistoryService¶
Constructor¶
ChatHistoryService(
    sessions_table: Optional[str] = None,
    messages_table: Optional[str] = None,
    region_name: Optional[str] = None,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| sessions_table | str | None | DynamoDB sessions table |
| messages_table | str | None | DynamoDB messages table |
| region_name | str | None | AWS region |
Uses environment variables if not provided:
- CHAT_SESSIONS_TABLE_NAME
- CHAT_MESSAGES_TABLE_NAME
Methods¶
create_chat_session¶
def create_chat_session(
    self,
    user_id: str,
    chat_id: Optional[str] = None,
    title: Optional[str] = None,
    context_files: Optional[List[str]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> ChatSession
Create a new chat session.
Returns: ChatSession object
get_chat_session¶
Get a chat session by ID.
list_sessions¶
List user's chat sessions.
add_message¶
def add_message(
    self,
    chat_id: str,
    actor: str,
    message: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> ChatMessage
Add a message to a chat session.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| chat_id | str | Chat session ID |
| actor | str | "user", "assistant", or "system" |
| message | str | Message content |
| metadata | Dict | Additional metadata |
get_chat_history¶
Get messages for a chat session.
delete_session¶
Delete a chat session and its messages.
Format Conversion Functions¶
Convert legacy format to ChatMessage objects.
Convert ChatMessage objects to legacy format.
Model Service¶
ModelService¶
Constructor¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| region_name | str | None | AWS region (defaults to AWS_REGION env var) |
Environment Variables:
| Variable | Description |
|---|---|
| AKORDI_LLM_MODELS_TABLE | DynamoDB table name for model information |
| AWS_REGION | AWS region |
Methods¶
create_model¶
Create a new model record in DynamoDB.
get_model¶
Retrieve a model by ID.
update_model¶
Update a model record.
delete_model¶
Delete a model record.
list_models¶
List all models, optionally filtered by status.
| Parameter | Type | Default | Description |
|---|---|---|---|
| filter_status | bool | None | Filter by active status (True=active, False=inactive, None=all) |
supported_resources¶
Get list of supported model resources (ARNs) for active models only.
get_supported_model_ids¶
Get list of supported model IDs for active models only.
get_model_pricing¶
Look up input/output token pricing for a model by its ARN.
| Parameter | Type | Description |
|---|---|---|
| model_arn | str | The model ARN or identifier |
Returns:
{
    "input_token_price": Decimal,  # Price per input token
    "output_token_price": Decimal,  # Price per output token
}
Returns None if no pricing found.
Usage Example:
from akordi_agents.services import ModelService
model_service = ModelService()
# Get pricing for a model
pricing = model_service.get_model_pricing(
    "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-sonnet"
)
if pricing:
    print(f"Input price: ${pricing['input_token_price']}")
    print(f"Output price: ${pricing['output_token_price']}")
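Because the prices are per token and typed as `Decimal`, a request's cost can be estimated by multiplying each price by its token count. The sketch below assumes a pricing dict with the two keys documented above. `estimate_cost` is a hypothetical helper, and the prices are illustrative numbers, not real Bedrock rates.

```python
from decimal import Decimal
from typing import Dict


def estimate_cost(
    pricing: Dict[str, Decimal], input_tokens: int, output_tokens: int
) -> Decimal:
    """Multiply token counts by the per-token prices and sum both parts."""
    return (
        pricing["input_token_price"] * input_tokens
        + pricing["output_token_price"] * output_tokens
    )


# Illustrative prices only; real values would come from get_model_pricing().
pricing = {
    "input_token_price": Decimal("0.000003"),
    "output_token_price": Decimal("0.000015"),
}
cost = estimate_cost(pricing, input_tokens=1000, output_tokens=200)
print(cost)
```

Staying in `Decimal` avoids the rounding drift that accumulates when many small float prices are summed.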
Token Usage Service¶
TokenUsageService¶
Constructor¶
Uses AKORDI_TOKEN_USAGE_TABLE environment variable if not provided.
Methods¶
initialize_agent_usage¶
def initialize_agent_usage(
    self,
    agent_id: str,
    model_id: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> None
Initialize tracking for an agent.
record_query_usage¶
def record_query_usage(
    self,
    agent_id: str,
    query_text: str,
    input_tokens: int,
    output_tokens: int,
    user_id: Optional[str] = None,
    chat_id: Optional[str] = None,
    tools_used: Optional[List[str]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> None
Record token usage for a query.
get_agent_usage¶
Get usage statistics for an agent.
Returns:
@dataclass
class AgentUsageStats:
    agent_id: str
    total_token_usage: int
    number_of_llm_calls: int
    model_id: str
    status: str
    last_updated: str
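The stats fields lend themselves to simple derived metrics. As a sketch, the snippet below redefines the dataclass locally so it runs standalone and computes the mean tokens per LLM call. `average_tokens_per_call` is a hypothetical helper and the field values are made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class AgentUsageStats:
    # Mirrors the fields listed above; redefined so the sketch runs on its own.
    agent_id: str
    total_token_usage: int
    number_of_llm_calls: int
    model_id: str
    status: str
    last_updated: str


def average_tokens_per_call(stats: AgentUsageStats) -> float:
    """Return mean tokens per LLM call, or 0.0 when no calls were recorded."""
    if stats.number_of_llm_calls == 0:
        return 0.0
    return stats.total_token_usage / stats.number_of_llm_calls


stats = AgentUsageStats(
    agent_id="my-agent-001",
    total_token_usage=12000,
    number_of_llm_calls=8,
    model_id="example-model",
    status="active",
    last_updated="2024-01-15T00:00:00Z",
)
print(average_tokens_per_call(stats))
```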
get_query_history¶
Get query history for an agent.
update_agent_status¶
Update agent status.
Prompt Service¶
PromptManager¶
Methods¶
get_prompt¶
Get a prompt for a persona.
register_prompt¶
Register a prompt template.
PromptFactory¶
Methods¶
register¶
Register a prompt builder.
get_prompt¶
Get a prompt using a registered builder.
PromptBuilder¶
Methods¶
build¶
Build a prompt from context. Override in subclasses.
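The subclassing pattern can be sketched as follows. The real `build` signature is not shown here, so this example defines its own stand-in `PromptBuilder` base class and assumes `build` takes a context dict and returns a string. `SupportChatPromptBuilder` and the context keys are hypothetical.

```python
from typing import Any, Dict


class PromptBuilder:
    """Stand-in base class; the SDK's real build() signature may differ."""

    def build(self, context: Dict[str, Any]) -> str:
        raise NotImplementedError


class SupportChatPromptBuilder(PromptBuilder):
    """Hypothetical builder that renders a persona and a question."""

    def build(self, context: Dict[str, Any]) -> str:
        persona = context.get("persona", "assistant")
        question = context["question"]
        return f"You are a {persona}.\n\nQuestion: {question}"


prompt = SupportChatPromptBuilder().build(
    {"persona": "support engineer", "question": "How do I reset my password?"}
)
print(prompt)
```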
Built-in Prompt Builders¶
class GenericChatPromptBuilder(PromptBuilder):
    """Generic chat prompt builder."""
class ConstructionChatPromptBuilder(PromptBuilder):
    """Construction domain prompt builder."""
Helper Functions¶
Get the global prompt manager instance.
Global prompt manager instance.
Search Handlers¶
AWSBedrockSearchHandler¶
class AWSBedrockSearchHandler(SearchHandlerInterface):
    """Search handler for AWS Bedrock Knowledge Base."""
Constructor¶
Methods¶
search¶
def search(
    self,
    query: str,
    max_results: int = 10,
    min_score: float = 0.0,
    **kwargs,
) -> List[SearchResult]
Search the knowledge base.
get_handler_name¶
Returns "aws_bedrock_search".
AzureSearchHandler¶
Constructor¶
create_search_handler¶
Create a search handler by type.
get_search_handler¶
Get or create a search handler.
Agent Utilities¶
Utility functions for fetching agent configurations and building agents from DynamoDB.
get_system_prompt¶
from akordi_agents.utils.agent import get_system_prompt
def get_system_prompt(
    agent_code: Optional[str] = None,
    user_persona: str = "Construction Project Manager",
) -> Optional[str]
Fetch a system prompt template from DynamoDB with caching.
| Parameter | Type | Default | Description |
|---|---|---|---|
| agent_code | str | None | The agent/prompt code to look up |
| user_persona | str | "Construction Project Manager" | User persona (used as part of cache key) |
Returns: The prompt template string, or None if not found/configured.
Example:
from akordi_agents.utils.agent import get_system_prompt
# Get system prompt for an agent
prompt = get_system_prompt(agent_code="AP-001")
if prompt:
    print(f"System prompt: {prompt}")
get_agent_by_code¶
from akordi_agents.utils.agent import get_agent_by_code
def get_agent_by_code(agent_code: str) -> Dict[str, Any]
Fetch an active agent record from AKORDI_AGENT_TABLE by its id.
| Parameter | Type | Description |
|---|---|---|
| agent_code | str | The agent id (partition key) in DynamoDB |
Returns: The agent record dict.
Raises:
- ValueError: If agent is not found or not active
- KeyError: If AKORDI_AGENT_TABLE environment variable is not set
Example:
from akordi_agents.utils.agent import get_agent_by_code
# Get agent configuration
agent_config = get_agent_by_code("my-agent-001")
print(f"Agent name: {agent_config['name']}")
print(f"Model: {agent_config['model']}")
get_agent¶
from akordi_agents.utils.agent import get_agent
def get_agent(
    agent_code: str,
    llm_service: Optional[LLMServiceInterface] = None,
    data_model: Optional[DataModelInterface] = None,
) -> CustomAgent
Build and return a CustomAgent from a DynamoDB agent record.
Fetches the agent configuration (including model_id) from AKORDI_AGENT_TABLE, creates an LLM service if not provided, and assembles the agent via AgentBuilder.
| Parameter | Type | Default | Description |
|---|---|---|---|
| agent_code | str | - | The agent id in AKORDI_AGENT_TABLE |
| llm_service | LLMServiceInterface | None | Pre-built LLM service instance. If None, a default AWSBedrockService-backed service is created |
| data_model | DataModelInterface | None | Optional data model for request/response validation |
Returns: A configured CustomAgent instance.
Raises: ValueError if agent record is missing a model field.
Example:
from akordi_agents.utils.agent import get_agent
# Build agent from DynamoDB configuration
agent = get_agent("my-agent-001")
# Process a request
response = agent.process_request({
    "query": "What is the project status?",
    "system_message": "You are a helpful assistant.",
})
print(response["llm_response"]["response"])
With custom LLM service:
from akordi_agents.utils.agent import get_agent
# Assumes LLMServiceInterface is already imported from the SDK
class MyCustomLLMService(LLMServiceInterface):
    def generate_response(self, prompt, context=None, **kwargs):
        # Custom implementation
        return {"response": "...", "model_info": {}, "token_usage": {}}
    def get_service_name(self):
        return "my_custom_service"
# Use custom LLM service
agent = get_agent("my-agent-001", llm_service=MyCustomLLMService())
Environment Variables:
| Variable | Description |
|---|---|
| AKORDI_AGENT_TABLE | DynamoDB table name for agent configurations |
| AWS_REGION | AWS region for DynamoDB and Bedrock |