
DynamoDB Client

A comprehensive, reusable DynamoDB client that provides a clean interface for common database operations.

Overview

The DynamoDBClient class is a wrapper around AWS boto3's DynamoDB resource that simplifies common operations:

  • CRUD Operations: Create, Read, Update, Delete with consistent return types
  • Batch Operations: Efficient bulk writes and deletes
  • Pagination: Built-in support for paginated queries and scans
  • Conditional Writes: Prevent overwrites and implement optimistic locking
  • Type Serialization: Automatic conversion between Python and DynamoDB types

Installation

The DynamoDB client is included in the Akordi Agents SDK:

pip install akordi-agents

Prerequisites

  • AWS credentials configured (via environment variables, AWS CLI, or IAM role)
  • Appropriate IAM permissions for DynamoDB operations

Required IAM permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem",
                "dynamodb:DeleteItem",
                "dynamodb:Query",
                "dynamodb:Scan",
                "dynamodb:BatchWriteItem",
                "dynamodb:DescribeTable"
            ],
            "Resource": "arn:aws:dynamodb:*:*:table/your-table-name*"
        }
    ]
}

Quick Start

from akordi_agents.config.dynamodb_client import DynamoDBClient

# Initialize the client
client = DynamoDBClient(table_name="my-table")

# Create an item
success, item = client.create(
    item={
        "pk": "user#123",
        "sk": "profile",
        "name": "John Doe",
        "email": "john@example.com"
    },
    prevent_overwrite_key="pk"
)

# Get an item
item = client.get(
    partition_key="pk",
    partition_value="user#123",
    sort_key="sk",
    sort_value="profile"
)

# Update an item
success, attrs = client.update(
    key={"pk": "user#123", "sk": "profile"},
    updates={"name": "Jane Doe"}
)

# Delete an item
success, old_item = client.delete(
    key={"pk": "user#123", "sk": "profile"},
    return_old=True
)

API Reference

Constructor

DynamoDBClient(
    table_name: str,
    region_name: Optional[str] = None,
    endpoint_url: Optional[str] = None,
)

Parameter | Type | Default | Description
table_name | str | Required | Name of the DynamoDB table
region_name | str | None | AWS region. Defaults to AWS_REGION env var or ap-southeast-2
endpoint_url | str | None | Custom endpoint URL for local development (e.g., DynamoDB Local)

Example - Local Development:

# Connect to DynamoDB Local
client = DynamoDBClient(
    table_name="my-table",
    endpoint_url="http://localhost:8000"
)
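
The region fallback described in the parameter table can be pictured as a simple resolution order. This is a sketch of the documented behavior, not the client's source; `resolve_region` and `DEFAULT_REGION` are hypothetical names used only for illustration.

```python
import os
from typing import Optional

# Fallback region named in the parameter table above.
DEFAULT_REGION = "ap-southeast-2"


def resolve_region(region_name: Optional[str] = None) -> str:
    """Prefer the explicit argument, then AWS_REGION, then the default."""
    return region_name or os.environ.get("AWS_REGION") or DEFAULT_REGION
```

Passing `region_name` explicitly is the most predictable option when the same code runs in several environments.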

Create Operations

create

Create a new item in the table.

def create(
    self,
    item: Dict[str, Any],
    condition_expression: Optional[str] = None,
    prevent_overwrite_key: Optional[str] = None,
) -> Tuple[bool, Optional[Dict[str, Any]]]

Parameter | Type | Default | Description
item | Dict[str, Any] | Required | Item data including primary key
condition_expression | str | None | DynamoDB condition expression
prevent_overwrite_key | str | None | Key attribute to check for existence

Returns: Tuple[bool, Optional[Dict]] - (success, created_item or None)

Examples:

# Basic create
success, item = client.create(
    item={
        "pk": "user#123",
        "sk": "profile",
        "name": "John Doe",
        "email": "john@example.com",
        "created_at": "2024-01-15T10:30:00Z"
    }
)

# Prevent overwriting existing items
success, item = client.create(
    item={"pk": "user#123", "sk": "profile", "name": "John"},
    prevent_overwrite_key="pk"
)

if not success:
    print("Item already exists!")

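# With custom condition expression
# Note: ":v" is a placeholder that needs a bound value. The create() signature
# above shows no parameter for supplying it, so confirm how your version of the
# client accepts expression attribute values before relying on this form.
success, item = client.create(
    item={"pk": "user#123", "sk": "profile", "version": 1},
    condition_expression="attribute_not_exists(pk) OR version < :v",
)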

batch_create

Batch write multiple items efficiently (max 25 per batch, handled automatically).

def batch_create(
    self,
    items: List[Dict[str, Any]],
    batch_size: int = 25,
) -> Tuple[int, int]

Parameter | Type | Default | Description
items | List[Dict] | Required | List of items to create
batch_size | int | 25 | Items per batch (max 25)

Returns: Tuple[int, int] - (successful_count, failed_count)

Example:

items = [
    {"pk": f"user#{i}", "sk": "profile", "name": f"User {i}"}
    for i in range(100)
]

successful, failed = client.batch_create(items)
print(f"Created {successful} items, {failed} failed")
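
The 25-item ceiling comes from DynamoDB's BatchWriteItem limit. The splitting that "handled automatically" implies might look like the sketch below; `chunk_items` is a hypothetical helper, and the client's real internals (including how it retries unprocessed items) are not shown in this document.

```python
from typing import Any, Dict, Iterator, List

# BatchWriteItem accepts at most 25 put/delete requests per call.
MAX_BATCH_SIZE = 25


def chunk_items(
    items: List[Dict[str, Any]],
    batch_size: int = MAX_BATCH_SIZE,
) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most batch_size items (capped at 25)."""
    size = max(1, min(batch_size, MAX_BATCH_SIZE))
    for start in range(0, len(items), size):
        yield items[start:start + size]


items = [{"pk": f"user#{i}", "sk": "profile"} for i in range(100)]
batches = list(chunk_items(items))
print(len(batches), len(batches[0]))  # 4 25
```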

Read Operations

get

Fetch a single item by primary key.

def get(
    self,
    partition_key: str,
    partition_value: Any,
    sort_key: Optional[str] = None,
    sort_value: Optional[Any] = None,
    consistent_read: bool = False,
    projection: Optional[List[str]] = None,
) -> Optional[Dict[str, Any]]

Parameter | Type | Default | Description
partition_key | str | Required | Name of partition key attribute
partition_value | Any | Required | Value of partition key
sort_key | str | None | Name of sort key attribute
sort_value | Any | None | Value of sort key
consistent_read | bool | False | Use strongly consistent read
projection | List[str] | None | Attributes to return (None = all)

Returns: Optional[Dict] - Item dict or None if not found

Examples:

# Get with partition key only (table without sort key)
item = client.get(
    partition_key="id",
    partition_value="item-123"
)

# Get with partition key and sort key
item = client.get(
    partition_key="pk",
    partition_value="user#123",
    sort_key="sk",
    sort_value="profile"
)

# Strongly consistent read (for critical operations)
item = client.get(
    partition_key="pk",
    partition_value="user#123",
    sort_key="sk",
    sort_value="profile",
    consistent_read=True
)

# Fetch only specific attributes (smaller response; DynamoDB still bills the
# read on the full item size)
item = client.get(
    partition_key="pk",
    partition_value="user#123",
    sort_key="sk",
    sort_value="profile",
    projection=["name", "email"]
)

# Handle missing items
if item:
    print(f"Found: {item['name']}")
else:
    print("Item not found")

query

Query items by partition key with optional sort key conditions.

def query(
    self,
    partition_key: str,
    partition_value: Any,
    sort_key_condition: Optional[Tuple[str, str, Any]] = None,
    filter_expression: Optional[Any] = None,
    index_name: Optional[str] = None,
    limit: Optional[int] = None,
    scan_forward: bool = True,
    projection: Optional[List[str]] = None,
    exclusive_start_key: Optional[Dict] = None,
) -> Dict[str, Any]

Parameter | Type | Default | Description
partition_key | str | Required | Partition key name
partition_value | Any | Required | Partition key value
sort_key_condition | Tuple | None | (sk_name, operator, value)
filter_expression | Any | None | Post-query filter
index_name | str | None | GSI or LSI name
limit | int | None | Max items to return
scan_forward | bool | True | True for ascending, False for descending
projection | List[str] | None | Attributes to return
exclusive_start_key | Dict | None | Pagination cursor

Sort Key Operators:

Operator | Description | Example Value
eq | Equal | "profile"
lt | Less than | "2024-01-01"
lte | Less than or equal | "2024-01-01"
gt | Greater than | "2024-01-01"
gte | Greater than or equal | "2024-01-01"
begins_with | Starts with | "order#"
between | Range | ("2024-01-01", "2024-12-31")
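
To make the operator semantics concrete, the sketch below evaluates each operator against plain Python values. It only mirrors what the table describes (with `between` treated as inclusive on both ends, as in DynamoDB); `sort_key_matches` is a hypothetical helper and is not part of the client.

```python
from typing import Any, Callable, Dict

# One predicate per operator name from the table above.
_OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "eq": lambda value, operand: value == operand,
    "lt": lambda value, operand: value < operand,
    "lte": lambda value, operand: value <= operand,
    "gt": lambda value, operand: value > operand,
    "gte": lambda value, operand: value >= operand,
    "begins_with": lambda value, operand: value.startswith(operand),
    "between": lambda value, operand: operand[0] <= value <= operand[1],
}


def sort_key_matches(value: Any, operator: str, operand: Any) -> bool:
    """Return True if a sort key value satisfies (operator, operand)."""
    if operator not in _OPERATORS:
        raise ValueError(f"Unsupported sort key operator: {operator}")
    return _OPERATORS[operator](value, operand)


keys = ["order#1", "order#2", "profile"]
print([k for k in keys if sort_key_matches(k, "begins_with", "order#")])
```

ISO 8601 date strings sort lexicographically in chronological order, which is why string comparisons such as `between` work for date ranges.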

Returns:

{
    "items": List[Dict],      # Matching items
    "count": int,             # Number of items returned
    "scanned_count": int,     # Items scanned (before filter)
    "last_key": Optional[Dict] # Pagination cursor (None if no more)
}

Examples:

# Get all items for a partition key
result = client.query(
    partition_key="pk",
    partition_value="user#123"
)
print(f"Found {result['count']} items")

# Query with sort key condition
result = client.query(
    partition_key="pk",
    partition_value="user#123",
    sort_key_condition=("sk", "begins_with", "order#")
)

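# Query items in a date range
# Note: a sort key condition only works on the sort key of the table or index
# being queried, so this form assumes created_at is that sort key.
result = client.query(
    partition_key="pk",
    partition_value="user#123",
    sort_key_condition=("created_at", "between", ("2024-01-01", "2024-06-30"))
)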

# Get most recent items first (descending)
result = client.query(
    partition_key="pk",
    partition_value="user#123",
    scan_forward=False,
    limit=10
)

# Query a Global Secondary Index (GSI)
result = client.query(
    partition_key="email",
    partition_value="john@example.com",
    index_name="email-index"
)

# Filter results after query
from boto3.dynamodb.conditions import Attr

result = client.query(
    partition_key="pk",
    partition_value="user#123",
    filter_expression=Attr("status").eq("active")
)

# Manual pagination
last_key = None
all_items = []

while True:
    result = client.query(
        partition_key="pk",
        partition_value="user#123",
        limit=100,
        exclusive_start_key=last_key
    )
    all_items.extend(result["items"])
    last_key = result["last_key"]
    if not last_key:
        break

query_paginated

Generator that yields pages of query results.

def query_paginated(
    self,
    partition_key: str,
    partition_value: Any,
    page_size: int = 100,
    **kwargs,
) -> Generator[Dict[str, Any], None, None]

Yields:

{
    "items": List[Dict],
    "count": int,
    "last_key": Optional[Dict],
    "page_number": int
}

Example:

# Process all items in pages
for page in client.query_paginated(
    partition_key="pk",
    partition_value="user#123",
    page_size=50
):
    print(f"Processing page {page['page_number']} with {page['count']} items")
    for item in page["items"]:
        process_item(item)
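
A generator like this can be built on top of any function that accepts a cursor and returns the `query()` result shape. The sketch below shows one possible construction, using an in-memory stand-in instead of a real table. `paginate`, `fake_query`, and `ROWS` are hypothetical names, and numbering pages from 1 is an assumption rather than documented behavior.

```python
from typing import Any, Callable, Dict, Generator, Optional

Page = Dict[str, Any]


def paginate(
    fetch_page: Callable[[Optional[Dict[str, Any]]], Page],
    max_pages: Optional[int] = None,
) -> Generator[Page, None, None]:
    """Call fetch_page(cursor) repeatedly until no cursor is returned."""
    last_key: Optional[Dict[str, Any]] = None
    page_number = 0
    while max_pages is None or page_number < max_pages:
        result = fetch_page(last_key)
        page_number += 1
        yield {
            "items": result["items"],
            "count": result["count"],
            "last_key": result["last_key"],
            "page_number": page_number,
        }
        last_key = result["last_key"]
        if not last_key:
            break


# In-memory stand-in for client.query(): five rows, two per page.
ROWS = [{"sk": f"order#{i}"} for i in range(5)]


def fake_query(exclusive_start_key: Optional[Dict[str, Any]] = None, limit: int = 2) -> Page:
    start = exclusive_start_key["offset"] if exclusive_start_key else 0
    items = ROWS[start:start + limit]
    next_offset = start + limit
    last_key = {"offset": next_offset} if next_offset < len(ROWS) else None
    return {"items": items, "count": len(items), "last_key": last_key}


pages = list(paginate(fake_query))
print([page["count"] for page in pages])  # [2, 2, 1]
```

Against the real client, `fetch_page` could be a lambda that forwards the cursor to `client.query(..., exclusive_start_key=key)` using the parameters documented above.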

scan

Scan the entire table or index. Use sparingly: a scan reads every item and consumes read capacity for all of them, even when a filter discards most of the results.

def scan(
    self,
    filter_expression: Optional[Any] = None,
    projection: Optional[List[str]] = None,
    limit: Optional[int] = None,
    exclusive_start_key: Optional[Dict] = None,
    index_name: Optional[str] = None,
) -> Dict[str, Any]

Returns: Same structure as query()

Example:

from boto3.dynamodb.conditions import Attr

# Scan with filter
result = client.scan(
    filter_expression=Attr("status").eq("inactive")
)

# Scan a specific index
result = client.scan(
    index_name="status-index",
    filter_expression=Attr("last_login").lt("2023-01-01")
)

scan_paginated

Generator that yields pages of scan results.

def scan_paginated(
    self,
    page_size: int = 100,
    max_pages: Optional[int] = None,
    **kwargs,
) -> Generator[Dict[str, Any], None, None]

Parameter | Type | Default | Description
page_size | int | 100 | Items per page
max_pages | int | None | Maximum pages to return

Example:

# Scan first 5 pages only
for page in client.scan_paginated(page_size=100, max_pages=5):
    print(f"Page {page['page_number']}: {page['count']} items")

get_all

Fetch all items from table (handles pagination internally).

def get_all(
    self,
    filter_expression: Optional[Any] = None,
    max_items: Optional[int] = None,
) -> List[Dict[str, Any]]

Warning: this method collects every matching item into a single in-memory list. On large tables, set max_items or use scan_paginated instead.

Example:

# Get all items (careful with large tables!)
all_items = client.get_all()

# Get up to 1000 items
items = client.get_all(max_items=1000)

# Get all items matching a filter
from boto3.dynamodb.conditions import Attr
active_users = client.get_all(
    filter_expression=Attr("status").eq("active"),
    max_items=500
)

Update Operations

update

Update an existing item.

def update(
    self,
    key: Dict[str, Any],
    updates: Dict[str, Any],
    condition_expression: Optional[str] = None,
    return_values: str = "UPDATED_NEW",
) -> Tuple[bool, Optional[Dict[str, Any]]]

Parameter | Type | Default | Description
key | Dict | Required | Primary key dict
updates | Dict | Required | Attributes to update
condition_expression | str | None | Conditional update expression
return_values | str | "UPDATED_NEW" | What to return

Return Value Options:

Value | Description
"NONE" | Return nothing
"UPDATED_OLD" | Old values of updated attributes
"UPDATED_NEW" | New values of updated attributes
"ALL_OLD" | All old attributes
"ALL_NEW" | All new attributes

Examples:

# Simple update
success, attrs = client.update(
    key={"pk": "user#123", "sk": "profile"},
    updates={
        "name": "Jane Doe",
        "updated_at": "2024-01-15T10:30:00Z"
    }
)

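# Conditional update (optimistic locking)
# Note: ":expected_version" is a placeholder for the version you read earlier.
# The update() signature above shows no parameter for binding it, so confirm
# how your version of the client supplies expression attribute values.
success, attrs = client.update(
    key={"pk": "user#123", "sk": "profile"},
    updates={"balance": 150, "version": 2},
    condition_expression="version = :expected_version",
)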

if not success:
    print("Update failed - item was modified by another process")

# Get all attributes after update
success, all_attrs = client.update(
    key={"pk": "user#123", "sk": "profile"},
    updates={"last_login": "2024-01-15"},
    return_values="ALL_NEW"
)

increment

Atomically increment a numeric attribute.

def increment(
    self,
    key: Dict[str, Any],
    attribute: str,
    amount: int = 1,
) -> Tuple[bool, Optional[int]]

Returns: Tuple[bool, Optional[int]] - (success, new_value)

Examples:

# Increment view count
success, new_count = client.increment(
    key={"pk": "article#456", "sk": "metadata"},
    attribute="view_count"
)
print(f"New view count: {new_count}")

# Decrement (negative amount)
success, new_count = client.increment(
    key={"pk": "product#789", "sk": "inventory"},
    attribute="stock",
    amount=-1
)

# Increment by specific amount
success, new_count = client.increment(
    key={"pk": "user#123", "sk": "stats"},
    attribute="points",
    amount=100
)
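
For reference, an atomic counter is usually expressed in DynamoDB with an `ADD` update expression, which applies the change server-side without a read-modify-write cycle. The sketch below builds the keyword arguments that boto3's `Table.update_item` accepts for that. `build_increment_params` is a hypothetical helper, and whether this client uses `ADD` internally is an assumption.

```python
from typing import Any, Dict


def build_increment_params(
    key: Dict[str, Any],
    attribute: str,
    amount: int = 1,
) -> Dict[str, Any]:
    """Build update_item kwargs that add `amount` to a numeric attribute."""
    return {
        "Key": key,
        # ADD treats a missing numeric attribute as 0 before adding.
        "UpdateExpression": "ADD #attr :amount",
        # The alias avoids clashes with DynamoDB reserved words.
        "ExpressionAttributeNames": {"#attr": attribute},
        "ExpressionAttributeValues": {":amount": amount},
        # Return only the attribute that changed.
        "ReturnValues": "UPDATED_NEW",
    }


params = build_increment_params(
    key={"pk": "article#456", "sk": "metadata"},
    attribute="view_count",
)
print(params["UpdateExpression"])
```

A negative `amount` produces a decrement with the same expression.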

append_to_list

Append values to a list attribute.

def append_to_list(
    self,
    key: Dict[str, Any],
    attribute: str,
    values: List[Any],
) -> Tuple[bool, Optional[List]]

Returns: Tuple[bool, Optional[List]] - (success, updated_list)

Example:

# Add tags to an item
success, tags = client.append_to_list(
    key={"pk": "article#456", "sk": "metadata"},
    attribute="tags",
    values=["python", "aws"]
)

# Add to activity log
success, log = client.append_to_list(
    key={"pk": "user#123", "sk": "activity"},
    attribute="events",
    values=[{"action": "login", "timestamp": "2024-01-15T10:30:00Z"}]
)
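
In raw DynamoDB terms, appending to a list is typically written with the `list_append` function, combined with `if_not_exists` so the first append also works when the attribute is missing. The sketch below builds the corresponding `Table.update_item` keyword arguments. `build_append_params` is a hypothetical helper, and the client's actual expression is not shown in this document.

```python
from typing import Any, Dict, List


def build_append_params(
    key: Dict[str, Any],
    attribute: str,
    values: List[Any],
) -> Dict[str, Any]:
    """Build update_item kwargs that append `values` to a list attribute."""
    return {
        "Key": key,
        # Start from an empty list if the attribute does not exist yet.
        "UpdateExpression": (
            "SET #attr = list_append(if_not_exists(#attr, :empty), :values)"
        ),
        "ExpressionAttributeNames": {"#attr": attribute},
        "ExpressionAttributeValues": {":empty": [], ":values": values},
        "ReturnValues": "UPDATED_NEW",
    }


append_params = build_append_params(
    key={"pk": "article#456", "sk": "metadata"},
    attribute="tags",
    values=["python", "aws"],
)
print(append_params["UpdateExpression"])
```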

Delete Operations

delete

Delete an item by primary key.

def delete(
    self,
    key: Dict[str, Any],
    condition_expression: Optional[str] = None,
    return_old: bool = False,
) -> Tuple[bool, Optional[Dict[str, Any]]]

Parameter | Type | Default | Description
key | Dict | Required | Primary key dict
condition_expression | str | None | Conditional delete
return_old | bool | False | Return deleted item

Examples:

# Simple delete
success, _ = client.delete(
    key={"pk": "user#123", "sk": "profile"}
)

# Delete and return old item
success, old_item = client.delete(
    key={"pk": "user#123", "sk": "profile"},
    return_old=True
)

if success and old_item:
    print(f"Deleted user: {old_item['name']}")

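# Conditional delete
# Note: ":status" is a placeholder that needs a bound value, and "status" is a
# DynamoDB reserved word that normally requires a "#status" alias. The delete()
# signature above shows no parameters for either, so confirm how your version
# of the client handles them.
success, _ = client.delete(
    key={"pk": "user#123", "sk": "profile"},
    condition_expression="attribute_exists(pk) AND status = :status",
)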
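
String condition expressions fail at request time if a `:value` or `#name` placeholder has nothing bound to it. A small pre-flight check such as the one below can catch that before the call is made. `missing_placeholders` is a hypothetical helper written for this illustration and is not part of the client.

```python
import re
from typing import Any, Dict, List, Optional


def missing_placeholders(
    expression: str,
    names: Optional[Dict[str, str]] = None,
    values: Optional[Dict[str, Any]] = None,
) -> List[str]:
    """Return the placeholders used in `expression` that have no binding."""
    names = names or {}
    values = values or {}
    used_values = set(re.findall(r":\w+", expression))
    used_names = set(re.findall(r"#\w+", expression))
    return sorted((used_values - set(values)) | (used_names - set(names)))


# Fully bound: the reserved word "status" is aliased and ":status" has a value.
print(missing_placeholders(
    "attribute_exists(pk) AND #status = :status",
    names={"#status": "status"},
    values={":status": "inactive"},
))  # []

# Unbound: nothing supplies ":v".
print(missing_placeholders("version = :v"))  # [':v']
```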

batch_delete

Batch delete multiple items efficiently.

def batch_delete(
    self,
    keys: List[Dict[str, Any]],
    batch_size: int = 25,
) -> Tuple[int, int]

Returns: Tuple[int, int] - (successful_count, failed_count)

Example:

# Delete multiple items
keys_to_delete = [
    {"pk": "user#1", "sk": "profile"},
    {"pk": "user#2", "sk": "profile"},
    {"pk": "user#3", "sk": "profile"},
]

successful, failed = client.batch_delete(keys_to_delete)
print(f"Deleted {successful}, failed {failed}")

Utility Methods

exists

Check if an item exists.

def exists(
    self,
    partition_key: str,
    partition_value: Any,
    sort_key: Optional[str] = None,
    sort_value: Optional[Any] = None,
) -> bool

Example:

if client.exists(
    partition_key="pk",
    partition_value="user#123",
    sort_key="sk",
    sort_value="profile"
):
    print("User exists!")
else:
    print("User not found")

count

Count items matching criteria.

def count(
    self,
    partition_key: Optional[str] = None,
    partition_value: Optional[Any] = None,
    filter_expression: Optional[Any] = None,
) -> int

Examples:

# Count items for a partition key (efficient - uses query)
user_items = client.count(
    partition_key="pk",
    partition_value="user#123"
)

# Count all items in table (expensive - uses scan)
total_items = client.count()

# Count with filter
from boto3.dynamodb.conditions import Attr
active_count = client.count(
    partition_key="pk",
    partition_value="user#123",
    filter_expression=Attr("status").eq("active")
)

get_table_info

Get table metadata (cached after first call).

def get_table_info(self) -> Optional[Dict[str, Any]]

Returns:

{
    "name": str,                    # Table name
    "status": str,                  # e.g., "ACTIVE"
    "item_count": int,              # Approximate item count
    "key_schema": Dict[str, str],   # {attribute_name: key_type}
    "creation_date": str            # ISO timestamp
}

Example:

info = client.get_table_info()
if info is not None:  # the return type is Optional, so guard against None
    print(f"Table: {info['name']}")
    print(f"Status: {info['status']}")
    print(f"Items: {info['item_count']}")
    print(f"Key Schema: {info['key_schema']}")

Type Serialization

The client automatically handles type conversion between Python and DynamoDB:

Python Type | DynamoDB Type | Notes
str | S (String) | Direct mapping
int | N (Number) | Direct mapping
float | N (Number) | Converted to Decimal
bool | BOOL | Direct mapping
None | NULL | Direct mapping
list | L (List) | Recursively serialized
dict | M (Map) | Recursively serialized
Decimal | N (Number) | Converted back to int/float

Example:

# Python types are automatically converted
item = {
    "pk": "user#123",
    "name": "John",                    # str -> S
    "age": 30,                         # int -> N
    "balance": 99.99,                  # float -> N (Decimal)
    "active": True,                    # bool -> BOOL
    "tags": ["python", "aws"],         # list -> L
    "metadata": {"key": "value"},      # dict -> M
}

client.create(item=item)

# When reading, types are converted back
retrieved = client.get(partition_key="pk", partition_value="user#123")
print(type(retrieved["balance"]))  # float
print(type(retrieved["age"]))      # int
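
The float handling matters because boto3 rejects Python floats and expects `Decimal` for DynamoDB numbers. The sketch below shows one way the round trip described in the table could work. `to_dynamo` and `from_dynamo` are hypothetical names, and the client's real conversion rules may differ, for example in how whole-number floats are treated.

```python
from decimal import Decimal
from typing import Any


def to_dynamo(value: Any) -> Any:
    """Recursively replace floats with Decimal before writing."""
    if isinstance(value, float):
        # Going through str avoids binary float noise such as 99.9899999...
        return Decimal(str(value))
    if isinstance(value, dict):
        return {k: to_dynamo(v) for k, v in value.items()}
    if isinstance(value, list):
        return [to_dynamo(v) for v in value]
    return value


def from_dynamo(value: Any) -> Any:
    """Recursively turn Decimal back into int (if whole) or float."""
    if isinstance(value, Decimal):
        return int(value) if value == value.to_integral_value() else float(value)
    if isinstance(value, dict):
        return {k: from_dynamo(v) for k, v in value.items()}
    if isinstance(value, list):
        return [from_dynamo(v) for v in value]
    return value


stored = to_dynamo({"age": 30, "balance": 99.99, "scores": [1.5, 2]})
print(stored)
print(from_dynamo(stored))
```

In this sketch a whole-number float such as `30.0` comes back as the int `30`, which is worth keeping in mind when code depends on the exact numeric type.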

Error Handling

The client logs errors and returns a consistent failure value for each kind of operation. Note that for reads, a failure value looks the same as "not found":

# Create operations return (bool, item_or_none)
success, item = client.create(item={...})
if not success:
    # Handle error (logged automatically)
    pass

# Get operations return item or None
item = client.get(partition_key="pk", partition_value="missing")
if item is None:
    print("Not found or error occurred")

# Query/Scan operations return empty results on error
result = client.query(partition_key="pk", partition_value="user#123")
if result["count"] == 0:
    print("No items found (or error occurred)")

Common Error Scenarios

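Scenario | Behavior
Item not found | Returns None
Conditional check failed | Returns (False, None)
Permission denied | Returns the operation's failure value ((False, None), None, or an empty result) and logs the error
Network error | Returns the operation's failure value and logs the error
Table not found | Returns the operation's failure value and logs the error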
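
Because write operations signal failure through the `(success, value)` tuple, callers that prefer exceptions can wrap the result. The helper below is a sketch of that pattern; `unwrap` and `DynamoWriteError` are hypothetical names that are not provided by the client.

```python
from typing import Optional, Tuple, TypeVar

T = TypeVar("T")


class DynamoWriteError(RuntimeError):
    """Raised when a (success, value) result reports failure."""


def unwrap(result: Tuple[bool, Optional[T]], action: str) -> Optional[T]:
    """Return the value from a (success, value) tuple or raise on failure."""
    success, value = result
    if not success:
        raise DynamoWriteError(f"{action} failed; check the client's log output")
    return value


# A successful result passes the value through unchanged.
item = unwrap((True, {"pk": "user#123"}), action="create")
print(item)

# A failed result raises instead of being silently ignored.
try:
    unwrap((False, None), action="create")
except DynamoWriteError as exc:
    print(exc)
```

With the real client this would be used as `unwrap(client.create(item=...), action="create")`.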

Local Development

Use DynamoDB Local for development and testing:

# Run DynamoDB Local with Docker
docker run -d -p 8000:8000 amazon/dynamodb-local

# Create table
aws dynamodb create-table \
    --table-name my-table \
    --attribute-definitions AttributeName=pk,AttributeType=S AttributeName=sk,AttributeType=S \
    --key-schema AttributeName=pk,KeyType=HASH AttributeName=sk,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST \
    --endpoint-url http://localhost:8000

Then connect from Python:

# Connect to local DynamoDB
client = DynamoDBClient(
    table_name="my-table",
    endpoint_url="http://localhost:8000"
)

Real-World Examples

Querying Agent Registry by Team Code

This example demonstrates how to query an agent registry table to retrieve all agents belonging to a specific team. The table name is loaded from an environment variable.

Table Structure:

The AKORDI_AGENT_TABLE stores agent configurations with:

  • team_code (partition key) - Team identifier (e.g., "AT-001")
  • id (sort key) - Agent identifier (e.g., "AG-001")

Example Code:

import os
from akordi_agents.config.dynamodb_client import DynamoDBClient

# Get table name from environment variable (raises KeyError if it is not set,
# instead of passing None as the table name)
table_name = os.environ["AKORDI_AGENT_TABLE"]

# Initialize the client
client = DynamoDBClient(table_name=table_name)

# Query all agents for a specific team
result = client.query(
    partition_key="team_code",
    partition_value="AT-001"
)

# Process the results
agents = result["items"]
print(f"Found {result['count']} agents for team AT-001")

for agent in agents:
    print(f"Agent: {agent['id']} - {agent['name']}")
    print(f"  Type: {agent['Type']}")
    print(f"  Active: {agent['active']}")
    print(f"  Lambda: {agent['lambda_arn']}")
    print(f"  Skills: {agent['capability']['skills']}")

Expected Response Structure:

The query returns items with the following structure (already deserialized from DynamoDB format):

{
    "items": [
        {
            "id": "AG-001",
            "active": True,
            "agent_prompt_code": "AP-001",
            "capability": {
                "confidence_score": 0.85,
                "domain": "documents",
                "estimated_latency": 5,
                "max_concurrent_tasks": 3,
                "skills": [
                    "document_analysis",
                    "pdf_analysis",
                    "risk_extraction",
                    "file_analysis"
                ]
            },
            "createdAt": "2026-01-28T01:04:32.908Z",
            "createdBy": "system",
            "lambda_arn": "arn:aws:lambda:ap-southeast-2:216989112759:function:image-akordi-agent-api",
            "maxTokens": 4000,
            "name": "document_analysis",
            "request_body": {
                "agent_code": {"from": "agent_prompt_code"},
                "bucket_name": {"default": "", "from": "bucket_name"},
                "chat-history": {"default": [], "from": "chat_history"},
                "file_keys": {"default": [], "from": "file_keys"},
                "kb_id": {"default": "", "from": "kb_id"},
                "query": {"from": "query"}
            },
            "request_type": "wrapped",
            "team_code": "AT-001",
            "Type": "Reasoning Agent",
            "version": "0.0.1"
        }
        # ... more agents
    ],
    "count": 1,
    "scanned_count": 1,
    "last_key": None  # None indicates no more pages
}

Filtering Active Agents Only:

from boto3.dynamodb.conditions import Attr

# Query only active agents for a team
result = client.query(
    partition_key="team_code",
    partition_value="AT-001",
    filter_expression=Attr("active").eq(True)
)

active_agents = result["items"]

Getting a Specific Agent:

# Get a specific agent by team_code and id
agent = client.get(
    partition_key="team_code",
    partition_value="AT-001",
    sort_key="id",
    sort_value="AG-001"
)

if agent:
    print(f"Found agent: {agent['name']}")
    print(f"Skills: {agent['capability']['skills']}")
else:
    print("Agent not found")

Querying with Projections (Fetch Only Needed Fields):

# Get only essential fields to keep the response small
# (DynamoDB still bases read capacity on the full item size)
result = client.query(
    partition_key="team_code",
    partition_value="AT-001",
    projection=["id", "name", "active", "Type", "lambda_arn"]
)

for agent in result["items"]:
    print(f"{agent['id']}: {agent['name']} ({agent['Type']}) - Active: {agent['active']}")

Using a GSI to Query by Agent Type:

If the table has a Global Secondary Index (GSI) on Type:

# Query all "Reasoning Agent" types across teams
result = client.query(
    partition_key="Type",
    partition_value="Reasoning Agent",
    index_name="type-index"
)

Best Practices

1. Use Partition Key Queries Over Scans

from boto3.dynamodb.conditions import Attr

# Good - uses partition key (efficient)
result = client.query(
    partition_key="pk",
    partition_value="user#123"
)

# Avoid - scans entire table (expensive)
result = client.scan(
    filter_expression=Attr("user_id").eq("user#123")
)

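2. Use Projections to Reduce Response Size

# Only fetch needed attributes. This trims the payload; DynamoDB still
# charges read capacity based on the full item size.
item = client.get(
    partition_key="pk",
    partition_value="user#123",
    sort_key="sk",
    sort_value="profile",
    projection=["name", "email"]
)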

3. Use Batch Operations for Bulk Writes

# Efficient - single batch operation
client.batch_create(items)

# Inefficient - multiple individual calls
for item in items:
    client.create(item=item)

4. Use Pagination for Large Result Sets

# Good - process in pages
for page in client.query_paginated(
    partition_key="pk",
    partition_value="user#123",
    page_size=100
):
    process_page(page["items"])

# Avoid - loading all into memory
all_items = client.get_all()  # May cause memory issues

5. Use Conditional Writes for Concurrency

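# Implement optimistic locking: read the item first, then write only if the
# version is unchanged. current_version and new_balance come from that read.
# ":v" is a placeholder for current_version; the update() signature documented
# above shows no parameter for binding it, so confirm how your client supplies it.
success, _ = client.update(
    key={"pk": "user#123", "sk": "profile"},
    updates={"balance": new_balance, "version": current_version + 1},
    condition_expression="version = :v",
)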
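
The read, check, and retry cycle behind optimistic locking can be demonstrated without a table. The sketch below uses an in-memory stand-in whose `update_if_version` plays the role of a conditional update. `VersionedStore` and `add_to_balance` are hypothetical names, and the retry count is an arbitrary choice.

```python
from typing import Any, Dict


class VersionedStore:
    """In-memory stand-in for a table that supports conditional updates."""

    def __init__(self) -> None:
        self._items: Dict[str, Dict[str, Any]] = {}

    def put(self, key: str, item: Dict[str, Any]) -> None:
        self._items[key] = dict(item)

    def get(self, key: str) -> Dict[str, Any]:
        return dict(self._items[key])  # copy, like a fresh read

    def update_if_version(
        self, key: str, updates: Dict[str, Any], expected_version: int
    ) -> bool:
        """Apply updates only if the stored version matches the expected one."""
        current = self._items[key]
        if current["version"] != expected_version:
            return False  # someone else wrote first
        current.update(updates)
        return True


def add_to_balance(store: VersionedStore, key: str, amount: int, max_attempts: int = 3) -> bool:
    """Re-read and retry when the conditional write loses a race."""
    for _ in range(max_attempts):
        item = store.get(key)
        updated = store.update_if_version(
            key,
            {"balance": item["balance"] + amount, "version": item["version"] + 1},
            expected_version=item["version"],
        )
        if updated:
            return True
    return False


store = VersionedStore()
store.put("user#123", {"balance": 100, "version": 1})
stale = store.get("user#123")            # a second writer reads version 1
add_to_balance(store, "user#123", 50)    # first writer succeeds, version becomes 2
print(store.update_if_version("user#123", {"balance": 0}, stale["version"]))  # False
```

The stale writer's update is rejected, so it has to re-read the item before trying again, which is the guarantee the version condition provides.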
