DynamoDB Client¶
A comprehensive, reusable DynamoDB client that provides a clean interface for common database operations.
Overview¶
The DynamoDBClient class is a wrapper around AWS boto3's DynamoDB resource that simplifies common operations:
- CRUD Operations: Create, Read, Update, Delete with consistent return types
- Batch Operations: Efficient bulk writes and deletes
- Pagination: Built-in support for paginated queries and scans
- Conditional Writes: Prevent overwrites and implement optimistic locking
- Type Serialization: Automatic conversion between Python and DynamoDB types
Installation¶
The DynamoDB client is included in the Akordi Agents SDK.
Prerequisites¶
- AWS credentials configured (via environment variables, AWS CLI, or IAM role)
- Appropriate IAM permissions for DynamoDB operations
Required IAM permissions:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:DeleteItem",
                "dynamodb:Query",
                "dynamodb:Scan",
                "dynamodb:BatchWriteItem",
                "dynamodb:DescribeTable"
            ],
            "Resource": "arn:aws:dynamodb:*:*:table/your-table-name*"
        }
    ]
}
Quick Start¶
from akordi_agents.config.dynamodb_client import DynamoDBClient
# Initialize the client
client = DynamoDBClient(table_name="my-table")
# Create an item
success, item = client.create(
item={
"pk": "user#123",
"sk": "profile",
"name": "John Doe",
"email": "john@example.com"
},
prevent_overwrite_key="pk"
)
# Get an item
item = client.get(
partition_key="pk",
partition_value="user#123",
sort_key="sk",
sort_value="profile"
)
# Update an item
success, attrs = client.update(
key={"pk": "user#123", "sk": "profile"},
updates={"name": "Jane Doe"}
)
# Delete an item
success, old_item = client.delete(
key={"pk": "user#123", "sk": "profile"},
return_old=True
)
API Reference¶
Constructor¶
DynamoDBClient(
table_name: str,
region_name: Optional[str] = None,
endpoint_url: Optional[str] = None,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `table_name` | `str` | Required | Name of the DynamoDB table |
| `region_name` | `str` | `None` | AWS region. Defaults to AWS_REGION env var or ap-southeast-2 |
| `endpoint_url` | `str` | `None` | Custom endpoint URL for local development (e.g., DynamoDB Local) |
Example - Local Development:
# Connect to DynamoDB Local
client = DynamoDBClient(
table_name="my-table",
endpoint_url="http://localhost:8000"
)
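The region fallback described in the parameter table can be pictured with a small sketch. `resolve_region` and `DEFAULT_REGION` are hypothetical names used only for illustration; the SDK's internal lookup may be organized differently.

```python
import os
from typing import Optional

# Fallback region named in the parameter table above.
DEFAULT_REGION = "ap-southeast-2"


def resolve_region(region_name: Optional[str] = None) -> str:
    """Explicit argument first, then AWS_REGION, then the documented default."""
    return region_name or os.environ.get("AWS_REGION") or DEFAULT_REGION
```

Passing `region_name` explicitly is the most predictable choice in scripts that may run in environments where `AWS_REGION` is set differently.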
Create Operations¶
create¶
Create a new item in the table.
def create(
self,
item: Dict[str, Any],
condition_expression: Optional[str] = None,
prevent_overwrite_key: Optional[str] = None,
) -> Tuple[bool, Optional[Dict[str, Any]]]
| Parameter | Type | Default | Description |
|---|---|---|---|
| `item` | `Dict[str, Any]` | Required | Item data including primary key |
| `condition_expression` | `str` | `None` | DynamoDB condition expression |
| `prevent_overwrite_key` | `str` | `None` | Key attribute to check for existence |
Returns: Tuple[bool, Optional[Dict]] - (success, created_item or None)
Examples:
# Basic create
success, item = client.create(
item={
"pk": "user#123",
"sk": "profile",
"name": "John Doe",
"email": "john@example.com",
"created_at": "2024-01-15T10:30:00Z"
}
)
# Prevent overwriting existing items
success, item = client.create(
item={"pk": "user#123", "sk": "profile", "name": "John"},
prevent_overwrite_key="pk"
)
if not success:
    print("Item already exists!")
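# With a custom condition expression. The signature above shows no parameter
# for binding the :v placeholder, so how its value is supplied is not covered here.
success, item = client.create(
    item={"pk": "user#123", "sk": "profile", "version": 1},
    condition_expression="attribute_not_exists(pk) OR version < :v",
)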
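One plausible way a wrapper could turn `prevent_overwrite_key` into a conditional write is sketched below. `build_put_kwargs` is a hypothetical helper, not the SDK's code; it only assembles the keyword arguments that boto3's `Table.put_item` accepts.

```python
from typing import Any, Dict, Optional


def build_put_kwargs(
    item: Dict[str, Any],
    condition_expression: Optional[str] = None,
    prevent_overwrite_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble keyword arguments for a conditional put_item call."""
    kwargs: Dict[str, Any] = {"Item": item}
    if prevent_overwrite_key:
        # A name placeholder avoids clashes with DynamoDB reserved words.
        kwargs["ConditionExpression"] = "attribute_not_exists(#k)"
        kwargs["ExpressionAttributeNames"] = {"#k": prevent_overwrite_key}
    elif condition_expression:
        kwargs["ConditionExpression"] = condition_expression
    return kwargs
```

DynamoDB evaluates the condition against the existing item with the same full primary key, so on a table with a sort key `attribute_not_exists` on the partition key guards that exact item rather than the whole partition.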
batch_create¶
Batch write multiple items efficiently (max 25 per batch, handled automatically).
| Parameter | Type | Default | Description |
|---|---|---|---|
| `items` | `List[Dict]` | Required | List of items to create |
| `batch_size` | `int` | `25` | Items per batch (max 25) |
Returns: Tuple[int, int] - (successful_count, failed_count)
Example:
items = [
{"pk": f"user#{i}", "sk": "profile", "name": f"User {i}"}
for i in range(100)
]
successful, failed = client.batch_create(items)
print(f"Created {successful} items, {failed} failed")
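The automatic batching mentioned above amounts to splitting the input into groups of at most 25, the per-call limit of DynamoDB's BatchWriteItem. A minimal sketch follows; `chunked` and `MAX_BATCH` are illustrative names, not part of the SDK.

```python
from typing import Any, Dict, Iterator, List

# BatchWriteItem accepts at most 25 put/delete requests per call.
MAX_BATCH = 25


def chunked(items: List[Dict[str, Any]], batch_size: int = 25) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of items, each no larger than the batch limit."""
    size = max(1, min(batch_size, MAX_BATCH))  # clamp to the range 1..25
    for start in range(0, len(items), size):
        yield items[start:start + size]
```

With 100 items this yields four batches of 25, which is what the example above would send under this assumption.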
Read Operations¶
get¶
Fetch a single item by primary key.
def get(
self,
partition_key: str,
partition_value: Any,
sort_key: Optional[str] = None,
sort_value: Optional[Any] = None,
consistent_read: bool = False,
projection: Optional[List[str]] = None,
) -> Optional[Dict[str, Any]]
| Parameter | Type | Default | Description |
|---|---|---|---|
| `partition_key` | `str` | Required | Name of partition key attribute |
| `partition_value` | `Any` | Required | Value of partition key |
| `sort_key` | `str` | `None` | Name of sort key attribute |
| `sort_value` | `Any` | `None` | Value of sort key |
| `consistent_read` | `bool` | `False` | Use strongly consistent read |
| `projection` | `List[str]` | `None` | Attributes to return (None = all) |
Returns: Optional[Dict] - Item dict or None if not found
Examples:
# Get with partition key only (table without sort key)
item = client.get(
partition_key="id",
partition_value="item-123"
)
# Get with partition key and sort key
item = client.get(
partition_key="pk",
partition_value="user#123",
sort_key="sk",
sort_value="profile"
)
# Strongly consistent read (for critical operations)
item = client.get(
partition_key="pk",
partition_value="user#123",
sort_key="sk",
sort_value="profile",
consistent_read=True
)
# Fetch only specific attributes (reduces read costs)
item = client.get(
partition_key="pk",
partition_value="user#123",
projection=["name", "email"]
)
# Handle missing items
if item:
    print(f"Found: {item['name']}")
else:
    print("Item not found")
query¶
Query items by partition key with optional sort key conditions.
def query(
self,
partition_key: str,
partition_value: Any,
sort_key_condition: Optional[Tuple[str, str, Any]] = None,
filter_expression: Optional[Any] = None,
index_name: Optional[str] = None,
limit: Optional[int] = None,
scan_forward: bool = True,
projection: Optional[List[str]] = None,
exclusive_start_key: Optional[Dict] = None,
) -> Dict[str, Any]
| Parameter | Type | Default | Description |
|---|---|---|---|
| `partition_key` | `str` | Required | Partition key name |
| `partition_value` | `Any` | Required | Partition key value |
| `sort_key_condition` | `Tuple` | `None` | `(sk_name, operator, value)` |
| `filter_expression` | `Any` | `None` | Post-query filter |
| `index_name` | `str` | `None` | GSI or LSI name |
| `limit` | `int` | `None` | Max items to return |
| `scan_forward` | `bool` | `True` | True for ascending, False for descending |
| `projection` | `List[str]` | `None` | Attributes to return |
| `exclusive_start_key` | `Dict` | `None` | Pagination cursor |
Sort Key Operators:
| Operator | Description | Example Value |
|---|---|---|
| `eq` | Equal | `"profile"` |
| `lt` | Less than | `"2024-01-01"` |
| `lte` | Less than or equal | `"2024-01-01"` |
| `gt` | Greater than | `"2024-01-01"` |
| `gte` | Greater than or equal | `"2024-01-01"` |
| `begins_with` | Starts with | `"order#"` |
| `between` | Range | `("2024-01-01", "2024-12-31")` |
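To make the operator table concrete, the sketch below translates a `(sk_name, operator, value)` tuple into DynamoDB key condition syntax. `sort_key_expression` is a hypothetical helper written for illustration; the client may well build its conditions with boto3's `Key` objects instead.

```python
from typing import Any, Dict, Tuple

_COMPARATORS = {"eq": "=", "lt": "<", "lte": "<=", "gt": ">", "gte": ">="}


def sort_key_expression(
    condition: Tuple[str, str, Any],
) -> Tuple[str, Dict[str, str], Dict[str, Any]]:
    """Return (expression, attribute names, attribute values) for a sort key condition."""
    name, operator, value = condition
    names = {"#sk": name}
    if operator in _COMPARATORS:
        return f"#sk {_COMPARATORS[operator]} :sk", names, {":sk": value}
    if operator == "begins_with":
        return "begins_with(#sk, :sk)", names, {":sk": value}
    if operator == "between":
        low, high = value  # the value is a (low, high) pair
        return "#sk BETWEEN :sk_low AND :sk_high", names, {":sk_low": low, ":sk_high": high}
    raise ValueError(f"Unsupported sort key operator: {operator}")
```

Note that `between` is the only operator whose value is a pair; every other operator takes a single value.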
Returns:
{
"items": List[Dict], # Matching items
"count": int, # Number of items returned
"scanned_count": int, # Items scanned (before filter)
"last_key": Optional[Dict] # Pagination cursor (None if no more)
}
Examples:
# Get all items for a partition key
result = client.query(
partition_key="pk",
partition_value="user#123"
)
print(f"Found {result['count']} items")
# Query with sort key condition
result = client.query(
partition_key="pk",
partition_value="user#123",
sort_key_condition=("sk", "begins_with", "order#")
)
# Query items in a date range (assumes created_at is the sort key of the table or index being queried)
result = client.query(
partition_key="pk",
partition_value="user#123",
sort_key_condition=("created_at", "between", ("2024-01-01", "2024-06-30"))
)
# Get most recent items first (descending)
result = client.query(
partition_key="pk",
partition_value="user#123",
scan_forward=False,
limit=10
)
# Query a Global Secondary Index (GSI)
result = client.query(
partition_key="email",
partition_value="john@example.com",
index_name="email-index"
)
# Filter results after query
from boto3.dynamodb.conditions import Attr
result = client.query(
partition_key="pk",
partition_value="user#123",
filter_expression=Attr("status").eq("active")
)
# Manual pagination
last_key = None
all_items = []
while True:
    result = client.query(
        partition_key="pk",
        partition_value="user#123",
        limit=100,
        exclusive_start_key=last_key
    )
    all_items.extend(result["items"])
    last_key = result["last_key"]
    if not last_key:
        break
query_paginated¶
Generator that yields pages of query results.
def query_paginated(
self,
partition_key: str,
partition_value: Any,
page_size: int = 100,
**kwargs,
) -> Generator[Dict[str, Any], None, None]
Yields: one result dict per page. The example below reads its items, count, and page_number keys.
Example:
# Process all items in pages
for page in client.query_paginated(
    partition_key="pk",
    partition_value="user#123",
    page_size=50
):
    print(f"Processing page {page['page_number']} with {page['count']} items")
    for item in page["items"]:
        process_item(item)
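A paginated generator of this kind can be thought of as the manual `last_key` loop wrapped in a `yield`. The sketch below is an assumption about the general shape, not the SDK's implementation: `paginate` and `fetch_page` are hypothetical names, and numbering pages from 1 is a guess.

```python
from typing import Any, Callable, Dict, Iterator, Optional

Page = Dict[str, Any]


def paginate(
    fetch_page: Callable[[Optional[Dict[str, Any]]], Page],
    max_pages: Optional[int] = None,
) -> Iterator[Page]:
    """Yield pages until the cursor runs out or max_pages is reached."""
    last_key: Optional[Dict[str, Any]] = None
    page_number = 0
    while max_pages is None or page_number < max_pages:
        page = fetch_page(last_key)  # fetch one page starting at the cursor
        page_number += 1
        yield {**page, "page_number": page_number}
        last_key = page.get("last_key")
        if not last_key:  # no cursor means the result set is exhausted
            break
```

Here `fetch_page` would be something like `lambda key: client.query(partition_key="pk", partition_value="user#123", limit=50, exclusive_start_key=key)`. Because pages are produced lazily, only one page is held in memory at a time.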
scan¶
Scan the entire table. Use sparingly - this is an expensive operation.
def scan(
self,
filter_expression: Optional[Any] = None,
projection: Optional[List[str]] = None,
limit: Optional[int] = None,
exclusive_start_key: Optional[Dict] = None,
index_name: Optional[str] = None,
) -> Dict[str, Any]
Returns: Same structure as query()
Example:
from boto3.dynamodb.conditions import Attr
# Scan with filter
result = client.scan(
filter_expression=Attr("status").eq("inactive")
)
# Scan a specific index
result = client.scan(
index_name="status-index",
filter_expression=Attr("last_login").lt("2023-01-01")
)
scan_paginated¶
Generator that yields pages of scan results.
def scan_paginated(
self,
page_size: int = 100,
max_pages: Optional[int] = None,
**kwargs,
) -> Generator[Dict[str, Any], None, None]
| Parameter | Type | Default | Description |
|---|---|---|---|
| `page_size` | `int` | `100` | Items per page |
| `max_pages` | `int` | `None` | Maximum pages to return |
Example:
# Scan first 5 pages only
for page in client.scan_paginated(page_size=100, max_pages=5):
    print(f"Page {page['page_number']}: {page['count']} items")
get_all¶
Fetch all items from table (handles pagination internally).
def get_all(
self,
filter_expression: Optional[Any] = None,
max_items: Optional[int] = None,
) -> List[Dict[str, Any]]
Warning: Use with caution on large tables!
Example:
# Get all items (careful with large tables!)
all_items = client.get_all()
# Get up to 1000 items
items = client.get_all(max_items=1000)
# Get all items matching a filter
from boto3.dynamodb.conditions import Attr
active_users = client.get_all(
filter_expression=Attr("status").eq("active"),
max_items=500
)
Update Operations¶
update¶
Update an existing item.
def update(
self,
key: Dict[str, Any],
updates: Dict[str, Any],
condition_expression: Optional[str] = None,
return_values: str = "UPDATED_NEW",
) -> Tuple[bool, Optional[Dict[str, Any]]]
| Parameter | Type | Default | Description |
|---|---|---|---|
| `key` | `Dict` | Required | Primary key dict |
| `updates` | `Dict` | Required | Attributes to update |
| `condition_expression` | `str` | `None` | Conditional update expression |
| `return_values` | `str` | `"UPDATED_NEW"` | What to return |
Return Value Options:
| Value | Description |
|---|---|
| `"NONE"` | Return nothing |
| `"UPDATED_OLD"` | Old values of updated attributes |
| `"UPDATED_NEW"` | New values of updated attributes |
| `"ALL_OLD"` | All old attributes |
| `"ALL_NEW"` | All new attributes |
Examples:
# Simple update
success, attrs = client.update(
key={"pk": "user#123", "sk": "profile"},
updates={
"name": "Jane Doe",
"updated_at": "2024-01-15T10:30:00Z"
}
)
# Conditional update (optimistic locking)
success, attrs = client.update(
key={"pk": "user#123", "sk": "profile"},
updates={"balance": 150, "version": 2},
condition_expression="version = :expected_version",
)
if not success:
    print("Update failed - item was modified by another process")
# Get all attributes after update
success, all_attrs = client.update(
key={"pk": "user#123", "sk": "profile"},
updates={"last_login": "2024-01-15"},
return_values="ALL_NEW"
)
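For readers wondering how an `updates` dict could map onto DynamoDB's update syntax, here is a minimal sketch that builds a `SET` expression with placeholders. `build_update_expression` is a hypothetical helper and the `#n0` / `:v0` token naming is an arbitrary choice; the SDK may generate its expressions differently.

```python
from typing import Any, Dict, List, Tuple


def build_update_expression(
    updates: Dict[str, Any],
) -> Tuple[str, Dict[str, str], Dict[str, Any]]:
    """Return (UpdateExpression, ExpressionAttributeNames, ExpressionAttributeValues)."""
    if not updates:
        raise ValueError("updates must contain at least one attribute")
    names: Dict[str, str] = {}
    values: Dict[str, Any] = {}
    clauses: List[str] = []
    for index, (attribute, value) in enumerate(updates.items()):
        name_token, value_token = f"#n{index}", f":v{index}"
        names[name_token] = attribute  # placeholder shields reserved words
        values[value_token] = value
        clauses.append(f"{name_token} = {value_token}")
    return "SET " + ", ".join(clauses), names, values
```

Using name placeholders for every attribute matters because common attribute names such as `name` and `status` are DynamoDB reserved words and cannot appear bare in an expression.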
increment¶
Atomically increment a numeric attribute.
def increment(
self,
key: Dict[str, Any],
attribute: str,
amount: int = 1,
) -> Tuple[bool, Optional[int]]
Returns: Tuple[bool, Optional[int]] - (success, new_value)
Examples:
# Increment view count
success, new_count = client.increment(
key={"pk": "article#456", "sk": "metadata"},
attribute="view_count"
)
print(f"New view count: {new_count}")
# Decrement (negative amount)
success, new_count = client.increment(
key={"pk": "product#789", "sk": "inventory"},
attribute="stock",
amount=-1
)
# Increment by specific amount
success, new_count = client.increment(
key={"pk": "user#123", "sk": "stats"},
attribute="points",
amount=100
)
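An atomic counter like this is typically expressed with DynamoDB's `ADD` action, which applies the change server-side without a read-modify-write cycle. The sketch below assembles arguments for boto3's `Table.update_item`; `build_increment_kwargs` is a hypothetical name, and whether the SDK uses `ADD` or a `SET` expression internally is an assumption.

```python
from typing import Any, Dict


def build_increment_kwargs(key: Dict[str, Any], attribute: str, amount: int = 1) -> Dict[str, Any]:
    """Keyword arguments for an atomic numeric increment via update_item."""
    return {
        "Key": key,
        # ADD treats a missing numeric attribute as 0 before adding.
        "UpdateExpression": "ADD #attr :amount",
        "ExpressionAttributeNames": {"#attr": attribute},
        "ExpressionAttributeValues": {":amount": amount},
        "ReturnValues": "UPDATED_NEW",  # response carries the new counter value
    }
```

A negative `amount` decrements through the same expression, which matches the decrement example above.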
append_to_list¶
Append values to a list attribute.
def append_to_list(
self,
key: Dict[str, Any],
attribute: str,
values: List[Any],
) -> Tuple[bool, Optional[List]]
Returns: Tuple[bool, Optional[List]] - (success, updated_list)
Example:
# Add tags to an item
success, tags = client.append_to_list(
key={"pk": "article#456", "sk": "metadata"},
attribute="tags",
values=["python", "aws"]
)
# Add to activity log
success, log = client.append_to_list(
key={"pk": "user#123", "sk": "activity"},
attribute="events",
values=[{"action": "login", "timestamp": "2024-01-15T10:30:00Z"}]
)
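Appending to a list attribute is commonly done with DynamoDB's `list_append` function, combined with `if_not_exists` so the first append also works when the attribute is missing. The sketch below shows that pattern as arguments for boto3's `Table.update_item`; `build_append_kwargs` is a hypothetical helper and the SDK's actual expression is not documented here.

```python
from typing import Any, Dict, List


def build_append_kwargs(key: Dict[str, Any], attribute: str, values: List[Any]) -> Dict[str, Any]:
    """Keyword arguments for appending values to a list attribute."""
    return {
        "Key": key,
        # Fall back to an empty list when the attribute does not exist yet.
        "UpdateExpression": "SET #attr = list_append(if_not_exists(#attr, :empty), :new)",
        "ExpressionAttributeNames": {"#attr": attribute},
        "ExpressionAttributeValues": {":empty": [], ":new": list(values)},
        "ReturnValues": "UPDATED_NEW",  # response carries the updated list
    }
```

The order of arguments to `list_append` decides where the new values land: swapping them would prepend instead of append.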
Delete Operations¶
delete¶
Delete an item by primary key.
def delete(
self,
key: Dict[str, Any],
condition_expression: Optional[str] = None,
return_old: bool = False,
) -> Tuple[bool, Optional[Dict[str, Any]]]
| Parameter | Type | Default | Description |
|---|---|---|---|
| `key` | `Dict` | Required | Primary key dict |
| `condition_expression` | `str` | `None` | Conditional delete |
| `return_old` | `bool` | `False` | Return deleted item |
Examples:
# Simple delete
success, _ = client.delete(
key={"pk": "user#123", "sk": "profile"}
)
# Delete and return old item
success, old_item = client.delete(
key={"pk": "user#123", "sk": "profile"},
return_old=True
)
if success and old_item:
    print(f"Deleted user: {old_item['name']}")
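# Conditional delete. Note: status is a DynamoDB reserved word, so a raw expression
# normally needs a name placeholder; whether the client handles this is not documented here.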
success, _ = client.delete(
key={"pk": "user#123", "sk": "profile"},
condition_expression="attribute_exists(pk) AND status = :status",
)
batch_delete¶
Batch delete multiple items efficiently.
Returns: Tuple[int, int] - (successful_count, failed_count)
Example:
# Delete multiple items
keys_to_delete = [
{"pk": "user#1", "sk": "profile"},
{"pk": "user#2", "sk": "profile"},
{"pk": "user#3", "sk": "profile"},
]
successful, failed = client.batch_delete(keys_to_delete)
print(f"Deleted {successful}, failed {failed}")
Utility Methods¶
exists¶
Check if an item exists.
def exists(
self,
partition_key: str,
partition_value: Any,
sort_key: Optional[str] = None,
sort_value: Optional[Any] = None,
) -> bool
Example:
if client.exists(
    partition_key="pk",
    partition_value="user#123",
    sort_key="sk",
    sort_value="profile"
):
    print("User exists!")
else:
    print("User not found")
count¶
Count items matching criteria.
def count(
self,
partition_key: Optional[str] = None,
partition_value: Optional[Any] = None,
filter_expression: Optional[Any] = None,
) -> int
Examples:
# Count items for a partition key (efficient - uses query)
user_items = client.count(
partition_key="pk",
partition_value="user#123"
)
# Count all items in table (expensive - uses scan)
total_items = client.count()
# Count with filter
from boto3.dynamodb.conditions import Attr
active_count = client.count(
partition_key="pk",
partition_value="user#123",
filter_expression=Attr("status").eq("active")
)
get_table_info¶
Get table metadata (cached after first call).
Returns:
{
"name": str, # Table name
"status": str, # e.g., "ACTIVE"
"item_count": int, # Approximate item count
"key_schema": Dict[str, str], # {attribute_name: key_type}
"creation_date": str # ISO timestamp
}
Example:
info = client.get_table_info()
print(f"Table: {info['name']}")
print(f"Status: {info['status']}")
print(f"Items: {info['item_count']}")
print(f"Key Schema: {info['key_schema']}")
Type Serialization¶
The client automatically handles type conversion between Python and DynamoDB:
| Python Type | DynamoDB Type | Notes |
|---|---|---|
| `str` | S (String) | Direct mapping |
| `int` | N (Number) | Direct mapping |
| `float` | N (Number) | Converted to Decimal |
| `bool` | BOOL | Direct mapping |
| `None` | NULL | Direct mapping |
| `list` | L (List) | Recursively serialized |
| `dict` | M (Map) | Recursively serialized |
| `Decimal` | N (Number) | Converted back to int/float |
Example:
# Python types are automatically converted
item = {
"pk": "user#123",
"name": "John", # str -> S
"age": 30, # int -> N
"balance": 99.99, # float -> N (Decimal)
"active": True, # bool -> BOOL
"tags": ["python", "aws"], # list -> L
"metadata": {"key": "value"}, # dict -> M
}
client.create(item=item)
# When reading, types are converted back
retrieved = client.get(partition_key="pk", partition_value="user#123")
print(type(retrieved["balance"])) # float
print(type(retrieved["age"])) # int
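The float and Decimal rows of the table can be illustrated with a small recursive converter. This is a sketch of the general technique only; `to_dynamo` and `from_dynamo` are hypothetical names, and the SDK's own conversion rules may differ in edge cases.

```python
from decimal import Decimal
from typing import Any


def to_dynamo(value: Any) -> Any:
    """Recursively replace floats with Decimal, which boto3 requires for numbers."""
    if isinstance(value, float):
        return Decimal(str(value))  # str() avoids binary floating point noise
    if isinstance(value, dict):
        return {k: to_dynamo(v) for k, v in value.items()}
    if isinstance(value, list):
        return [to_dynamo(v) for v in value]
    return value


def from_dynamo(value: Any) -> Any:
    """Recursively turn Decimal back into int (whole numbers) or float."""
    if isinstance(value, Decimal):
        return int(value) if value == value.to_integral_value() else float(value)
    if isinstance(value, dict):
        return {k: from_dynamo(v) for k, v in value.items()}
    if isinstance(value, list):
        return [from_dynamo(v) for v in value]
    return value
```

One consequence of this particular scheme is that a whole-number float such as `5.0` comes back as the int `5`, since DynamoDB stores a single number type.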
Error Handling¶
The client handles errors gracefully and returns consistent result types:
# Create operations return (bool, item_or_none)
success, item = client.create(item={...})
if not success:
    # Handle error (logged automatically)
    pass
# Get operations return item or None
item = client.get(partition_key="pk", partition_value="missing")
if item is None:
    print("Not found or error occurred")
# Query/Scan operations return empty results on error
result = client.query(partition_key="pk", partition_value="user#123")
if result["count"] == 0:
    print("No items found (or error occurred)")
Common Error Scenarios¶
| Scenario | Behavior |
|---|---|
| Item not found | Returns None |
| Conditional check failed | Returns (False, None) |
| Permission denied | Returns error result, logs error |
| Network error | Returns error result, logs error |
| Table not found | Returns error result, logs error |
Local Development¶
Use DynamoDB Local for development and testing:
# Run DynamoDB Local with Docker
docker run -d -p 8000:8000 amazon/dynamodb-local
# Create table
aws dynamodb create-table \
--table-name my-table \
--attribute-definitions AttributeName=pk,AttributeType=S AttributeName=sk,AttributeType=S \
--key-schema AttributeName=pk,KeyType=HASH AttributeName=sk,KeyType=RANGE \
--billing-mode PAY_PER_REQUEST \
--endpoint-url http://localhost:8000
# Connect to local DynamoDB
client = DynamoDBClient(
table_name="my-table",
endpoint_url="http://localhost:8000"
)
Real-World Examples¶
Querying Agent Registry by Team Code¶
This example demonstrates how to query an agent registry table to retrieve all agents belonging to a specific team. The table name is loaded from an environment variable.
Table Structure:
The AKORDI_AGENT_TABLE stores agent configurations with:
- team_code (partition key) - Team identifier (e.g., "AT-001")
- id (sort key) - Agent identifier (e.g., "AG-001")
Example Code:
import os
from akordi_agents.config.dynamodb_client import DynamoDBClient
# Get table name from environment variable (raises KeyError if it is unset)
table_name = os.environ["AKORDI_AGENT_TABLE"]
# Initialize the client
client = DynamoDBClient(table_name=table_name)
# Query all agents for a specific team
result = client.query(
partition_key="team_code",
partition_value="AT-001"
)
# Process the results
agents = result["items"]
print(f"Found {result['count']} agents for team AT-001")
for agent in agents:
    print(f"Agent: {agent['id']} - {agent['name']}")
    print(f"  Type: {agent['Type']}")
    print(f"  Active: {agent['active']}")
    print(f"  Lambda: {agent['lambda_arn']}")
    print(f"  Skills: {agent['capability']['skills']}")
Expected Response Structure:
The query returns items with the following structure (already deserialized from DynamoDB format):
{
"items": [
{
"id": "AG-001",
"active": True,
"agent_prompt_code": "AP-001",
"capability": {
"confidence_score": 0.85,
"domain": "documents",
"estimated_latency": 5,
"max_concurrent_tasks": 3,
"skills": [
"document_analysis",
"pdf_analysis",
"risk_extraction",
"file_analysis"
]
},
"createdAt": "2026-01-28T01:04:32.908Z",
"createdBy": "system",
"lambda_arn": "arn:aws:lambda:ap-southeast-2:216989112759:function:image-akordi-agent-api",
"maxTokens": 4000,
"name": "document_analysis",
"request_body": {
"agent_code": {"from": "agent_prompt_code"},
"bucket_name": {"default": "", "from": "bucket_name"},
"chat-history": {"default": [], "from": "chat_history"},
"file_keys": {"default": [], "from": "file_keys"},
"kb_id": {"default": "", "from": "kb_id"},
"query": {"from": "query"}
},
"request_type": "wrapped",
"team_code": "AT-001",
"Type": "Reasoning Agent",
"version": "0.0.1"
}
# ... more agents
],
"count": 1,
"scanned_count": 1,
"last_key": None # None indicates no more pages
}
Filtering Active Agents Only:
from boto3.dynamodb.conditions import Attr
# Query only active agents for a team
result = client.query(
partition_key="team_code",
partition_value="AT-001",
filter_expression=Attr("active").eq(True)
)
active_agents = result["items"]
Getting a Specific Agent:
# Get a specific agent by team_code and id
agent = client.get(
partition_key="team_code",
partition_value="AT-001",
sort_key="id",
sort_value="AG-001"
)
if agent:
    print(f"Found agent: {agent['name']}")
    print(f"Skills: {agent['capability']['skills']}")
else:
    print("Agent not found")
Querying with Projections (Fetch Only Needed Fields):
# Get only essential fields to reduce read costs
result = client.query(
partition_key="team_code",
partition_value="AT-001",
projection=["id", "name", "active", "Type", "lambda_arn"]
)
for agent in result["items"]:
    print(f"{agent['id']}: {agent['name']} ({agent['Type']}) - Active: {agent['active']}")
Using a GSI to Query by Agent Type:
If the table has a Global Secondary Index (GSI) on Type:
# Query all "Reasoning Agent" types across teams
result = client.query(
partition_key="Type",
partition_value="Reasoning Agent",
index_name="type-index"
)
Best Practices¶
1. Use Partition Key Queries Over Scans¶
# Good - uses partition key (efficient)
result = client.query(
partition_key="pk",
partition_value="user#123"
)
# Avoid - scans entire table (expensive)
from boto3.dynamodb.conditions import Attr
result = client.scan(
    filter_expression=Attr("user_id").eq("user#123")
)
2. Use Projections to Reduce Read Costs¶
# Only fetch needed attributes
item = client.get(
partition_key="pk",
partition_value="user#123",
projection=["name", "email"]
)
3. Use Batch Operations for Bulk Writes¶
# Efficient - single batch operation
client.batch_create(items)
# Inefficient - multiple individual calls
for item in items:
    client.create(item=item)
4. Use Pagination for Large Result Sets¶
# Good - process in pages
for page in client.query_paginated(
    partition_key="pk",
    partition_value="user#123",
    page_size=100
):
    process_page(page["items"])
# Avoid - loading all into memory
all_items = client.get_all() # May cause memory issues
5. Use Conditional Writes for Concurrency¶
# Implement optimistic locking
success, _ = client.update(
key={"pk": "user#123", "sk": "profile"},
updates={"balance": new_balance, "version": current_version + 1},
condition_expression="version = :v",
)
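Optimistic locking usually pairs the conditional write with a read-and-retry loop. The sketch below shows that loop in isolation. `update_with_retry`, `read_item`, `conditional_write`, and `change` are hypothetical names; `conditional_write` stands in for a call such as `client.update(...)` guarded by the version condition, and is kept abstract because the documented signatures do not show how the expected version is bound.

```python
from typing import Any, Callable, Dict


def update_with_retry(
    read_item: Callable[[], Dict[str, Any]],
    conditional_write: Callable[[Dict[str, Any], int], bool],
    change: Callable[[Dict[str, Any]], Dict[str, Any]],
    max_attempts: int = 3,
) -> bool:
    """Re-read and retry when another writer bumped the version first."""
    for _ in range(max_attempts):
        current = read_item()
        updates = dict(change(current))  # compute new values from fresh state
        updates["version"] = current["version"] + 1
        # The write must succeed only if the stored version is still the one we read.
        if conditional_write(updates, current["version"]):
            return True
    return False
```

Re-reading inside the loop is the important design choice: retrying with stale values would simply fail the condition again.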
See Also¶
- DynamoDB Prompt Config - Managing prompts in DynamoDB
- Chat History Service - Chat persistence with DynamoDB
- AWS DynamoDB Documentation