Claim Check Pattern with Temporal
Last updated Oct 10, 2025
The Claim Check pattern enables efficient handling of large payloads by storing them externally and passing only keys through Temporal workflows and activities.
This pattern is particularly useful when:
- Working with large files, images, or datasets
- Processing bulk data operations
- Handling payloads that approach or exceed Temporal's payload size limits (2 MB per payload by default)
- Improving performance by reducing payload serialization overhead
How the Claim Check Pattern Works
The Claim Check pattern is implemented as a PayloadCodec that:
- Encode: Replaces large payloads with unique keys and stores the original data in external storage (Redis, S3, etc.)
- Decode: Retrieves the original payload using the key when needed
This allows Temporal workflows to operate with small, lightweight keys instead of large payloads, while maintaining transparent access to the full data through automatic encoding/decoding.
Claim Check Codec Implementation
The ClaimCheckCodec class implements the PayloadCodec interface to handle
the encoding and decoding of payloads.
import uuid
import redis.asyncio as redis
from typing import Iterable, List
from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec
class ClaimCheckCodec(PayloadCodec):
def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
self.redis_client = redis.Redis(host=redis_host, port=redis_port)
async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
"""Replace large payloads with keys and store original data in Redis."""
out: List[Payload] = []
for payload in payloads:
encoded = await self.encode_payload(payload)
out.append(encoded)
return out
async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
"""Retrieve original payloads from Redis using stored keys."""
out: List[Payload] = []
for payload in payloads:
if payload.metadata.get("temporal.io/claim-check-codec", b"").decode() != "v1":
# Not a claim-checked payload, pass through unchanged
out.append(payload)
continue
redis_key = payload.data.decode("utf-8")
stored_data = await self.redis_client.get(redis_key)
if stored_data is None:
raise ValueError(f"Claim check key not found in Redis: {redis_key}")
original_payload = Payload.FromString(stored_data)
out.append(original_payload)
return out
async def encode_payload(self, payload: Payload) -> Payload:
"""Store payload in Redis and return a key-based payload."""
key = str(uuid.uuid4())
serialized_data = payload.SerializeToString()
# Store the original payload data in Redis
await self.redis_client.set(key, serialized_data)
# Return a lightweight payload containing only the key
return Payload(
metadata={
"encoding": b"claim-checked",
"temporal.io/claim-check-codec": b"v1",
},
data=key.encode("utf-8"),
)
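The codec above offloads every payload it sees. A common refinement is to claim-check only payloads above a size threshold so small values stay inline; the sketch below assumes the ClaimCheckCodec defined above, and the 128 KiB cutoff is an illustrative choice rather than a Temporal requirement. The decode path needs no changes because it already passes through payloads that lack the claim-check marker.

```python
from typing import Iterable, List

from temporalio.api.common.v1 import Payload

# Illustrative cutoff; tune it to your own payload profile.
CLAIM_CHECK_THRESHOLD_BYTES = 128 * 1024


class ThresholdClaimCheckCodec(ClaimCheckCodec):
    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        out: List[Payload] = []
        for payload in payloads:
            if len(payload.data) < CLAIM_CHECK_THRESHOLD_BYTES:
                # Small payload: pass through unchanged. decode() already
                # ignores payloads without the claim-check metadata marker.
                out.append(payload)
            else:
                out.append(await self.encode_payload(payload))
        return out
```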
Claim Check Plugin
The ClaimCheckPlugin integrates the codec with Temporal's client configuration.
import os
from temporalio.client import Plugin, ClientConfig
from temporalio.converter import DataConverter
from claim_check_codec import ClaimCheckCodec
class ClaimCheckPlugin(Plugin):
def __init__(self):
self.redis_host = os.getenv("REDIS_HOST", "localhost")
self.redis_port = int(os.getenv("REDIS_PORT", "6379"))
def get_data_converter(self, config: ClientConfig) -> DataConverter:
"""Configure the data converter with claim check codec."""
default_converter_class = config["data_converter"].payload_converter_class
claim_check_codec = ClaimCheckCodec(self.redis_host, self.redis_port)
return DataConverter(
payload_converter_class=default_converter_class,
payload_codec=claim_check_codec
)
def configure_client(self, config: ClientConfig) -> ClientConfig:
"""Apply the claim check configuration to the client."""
config["data_converter"] = self.get_data_converter(config)
return super().configure_client(config)
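With the plugin defined, attaching it to a client is a single argument, assuming an SDK version that supports client plugins (the plugins argument to Client.connect). The target address below matches the dev server used later in this guide, and the module name in the import is an assumption.

```python
# Minimal sketch: connect a client with the claim check plugin applied.
from temporalio.client import Client

from claim_check_plugin import ClaimCheckPlugin  # module name assumed


async def connect_client() -> Client:
    return await Client.connect(
        "localhost:7233",
        plugins=[ClaimCheckPlugin()],
    )
```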
Example: Two-Stage Large Data Processing Workflow
This example demonstrates the Claim Check pattern with a realistic data processing pipeline that shows how large payloads are handled at multiple stages.
Data Models
from dataclasses import dataclass
from typing import List, Dict, Any
@dataclass
class LargeDataset:
"""Represents a large dataset that would benefit from claim check pattern."""
data: List[Dict[str, Any]]
metadata: Dict[str, Any]
@dataclass
class TransformedDataset:
"""Large dataset after transformation - still large but processed."""
data: List[Dict[str, Any]]
metadata: Dict[str, Any]
transformation_stats: Dict[str, Any]
@dataclass
class SummaryResult:
"""Final summary result - small payload."""
total_items: int
processed_items: int
transformation_stats: Dict[str, Any]
summary_stats: Dict[str, Any]
errors: List[str]
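To exercise the pattern, the starter needs a dataset large enough to trigger claim checking. A hedged sketch of a generator is shown below; the item count and the "id", "value", and "text" fields are illustrative, chosen to match what the activities below look for.

```python
import random


def build_sample_dataset(num_items: int = 10_000) -> LargeDataset:
    """Build a synthetic LargeDataset for demonstration purposes."""
    data = [
        {
            "id": i,
            "value": random.randint(0, 2_000),
            # Repeated text inflates the payload so it benefits from claim check
            "text": "this is a good sample record " * 10,
        }
        for i in range(num_items)
    ]
    return LargeDataset(
        data=data,
        metadata={"source": "synthetic", "item_count": num_items},
    )
```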
Activities
from temporalio import activity
@activity.defn
async def transform_large_dataset(dataset: LargeDataset) -> TransformedDataset:
"""Transform a large dataset - produces another large dataset.
This activity demonstrates how the claim check pattern allows processing
of large datasets without hitting Temporal's payload size limits.
The transformation produces another large dataset that gets passed to
the next activity.
"""
processed_count = 0
errors = []
transformation_stats = {
"total_items": len(dataset.data),
"transformations_applied": []
}
# Create a copy of the data to transform
transformed_data = []
for item in dataset.data:
try:
# Create a new item with transformations
transformed_item = item.copy()
# Apply various transformations
if "value" in item:
transformed_item["processed_value"] = item["value"] * 2
transformed_item["value_category"] = "high" if item["value"] > 1000 else "low"
processed_count += 1
if "text" in item:
# Simulate text processing
words = item["text"].split()
transformed_item["word_count"] = len(words)
transformed_item["avg_word_length"] = sum(len(word) for word in words) / len(words) if words else 0
transformed_item["text_sentiment"] = "positive" if "good" in item["text"].lower() else "neutral"
processed_count += 1
# Add additional computed fields
transformed_item["computed_score"] = (
transformed_item.get("processed_value", 0) * 0.7 +
transformed_item.get("word_count", 0) * 0.3
)
transformed_data.append(transformed_item)
except Exception as e:
errors.append(f"Error transforming item {item.get('id', 'unknown')}: {str(e)}")
# Still add the original item even if transformation failed
transformed_data.append(item)
transformation_stats["processed_items"] = processed_count
transformation_stats["error_count"] = len(errors)
transformation_stats["transformations_applied"] = [
"value_doubling", "category_assignment", "text_analysis",
"sentiment_analysis", "score_computation"
]
# Update metadata
updated_metadata = dataset.metadata.copy()
updated_metadata["transformed_at"] = "2024-01-01T00:00:00Z"
updated_metadata["transformation_version"] = "1.0"
return TransformedDataset(
data=transformed_data,
metadata=updated_metadata,
transformation_stats=transformation_stats
)
@activity.defn
async def generate_summary(transformed_dataset: TransformedDataset) -> SummaryResult:
"""Generate a summary from the transformed dataset.
This activity takes the large transformed dataset and produces a small
summary result, demonstrating the claim check pattern with large activity
input and small output.
"""
data = transformed_dataset.data
total_items = len(data)
# Calculate summary statistics
summary_stats = {
"total_items": total_items,
"value_stats": {
"min_value": min(item.get("value", 0) for item in data),
"max_value": max(item.get("value", 0) for item in data),
"avg_value": sum(item.get("value", 0) for item in data) / total_items if total_items > 0 else 0,
"high_value_count": sum(1 for item in data if item.get("value_category") == "high")
},
"text_stats": {
"total_words": sum(item.get("word_count", 0) for item in data),
"avg_word_count": sum(item.get("word_count", 0) for item in data) / total_items if total_items > 0 else 0,
"avg_word_length": sum(item.get("avg_word_length", 0) for item in data) / total_items if total_items > 0 else 0,
"positive_sentiment_count": sum(1 for item in data if item.get("text_sentiment") == "positive")
},
"score_stats": {
"min_score": min(item.get("computed_score", 0) for item in data),
"max_score": max(item.get("computed_score", 0) for item in data),
"avg_score": sum(item.get("computed_score", 0) for item in data) / total_items if total_items > 0 else 0
}
}
return SummaryResult(
total_items=total_items,
processed_items=transformed_dataset.transformation_stats.get("processed_items", 0),
transformation_stats=transformed_dataset.transformation_stats,
summary_stats=summary_stats,
        errors=transformed_dataset.transformation_stats.get("errors", [])
)
Workflow
from temporalio import workflow
from datetime import timedelta
@workflow.defn
class LargeDataProcessingWorkflow:
"""Workflow that demonstrates the Claim Check pattern with large datasets.
This workflow demonstrates the claim check pattern by:
1. Taking a large dataset as input (large workflow input)
2. Transforming it into another large dataset (large activity input/output)
3. Generating a summary from the transformed data (large activity input, small output)
This shows how the claim check pattern handles large payloads at multiple stages.
"""
@workflow.run
async def run(self, dataset: LargeDataset) -> SummaryResult:
"""Process large dataset using claim check pattern with two-stage processing."""
# Step 1: Transform the large dataset (large input -> large output)
transformed_dataset = await workflow.execute_activity(
transform_large_dataset,
dataset,
start_to_close_timeout=timedelta(minutes=10),
summary="Transform large dataset"
)
# Step 2: Generate summary from transformed data (large input -> small output)
summary_result = await workflow.execute_activity(
generate_summary,
transformed_dataset,
start_to_close_timeout=timedelta(minutes=5),
summary="Generate summary from transformed data"
)
return summary_result
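The worker module referenced in "Running the Example" might look roughly like the sketch below. The task queue name and module paths are assumptions for illustration; the plugin is applied to the client the worker uses, so activity inputs and outputs are claim-checked as well.

```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from activities import generate_summary, transform_large_dataset  # assumed module
from claim_check_plugin import ClaimCheckPlugin  # assumed module
from workflows import LargeDataProcessingWorkflow  # assumed module


async def main() -> None:
    # The plugin installs the claim check codec on this client's data converter
    client = await Client.connect("localhost:7233", plugins=[ClaimCheckPlugin()])
    worker = Worker(
        client,
        task_queue="claim-check-task-queue",  # assumed name
        workflows=[LargeDataProcessingWorkflow],
        activities=[transform_large_dataset, generate_summary],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```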
How This Demonstrates the Claim Check Pattern
This example shows the Claim Check pattern in action across multiple stages:
- Large Workflow Input: The workflow receives a large dataset from the client
- Large Activity Input/Output: The first activity transforms the large dataset, producing another large dataset
- Large Activity Input, Small Output: The second activity takes the transformed data and produces a compact summary
This flow demonstrates how the claim check pattern handles large payloads at multiple stages of processing, making it transparent to your workflow logic while avoiding Temporal's payload size limits.
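A starter that exercises the pipeline end to end might look like the sketch below; the workflow ID, task queue, and module names are assumptions and should match your worker.

```python
import asyncio

from temporalio.client import Client

from claim_check_plugin import ClaimCheckPlugin  # assumed module
from data_models import build_sample_dataset  # assumed: helper from the dataset sketch above
from workflows import LargeDataProcessingWorkflow  # assumed module


async def main() -> None:
    client = await Client.connect("localhost:7233", plugins=[ClaimCheckPlugin()])
    # The large dataset is claim-checked transparently on its way to the workflow
    result = await client.execute_workflow(
        LargeDataProcessingWorkflow.run,
        build_sample_dataset(10_000),
        id="claim-check-demo",                # assumed workflow ID
        task_queue="claim-check-task-queue",  # must match the worker
    )
    print(f"Processed {result.processed_items} of {result.total_items} items")


if __name__ == "__main__":
    asyncio.run(main())
```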
Configuration
Set environment variables to configure the Redis connection:
# Configure Redis connection (optional - defaults to localhost:6379)
export REDIS_HOST=localhost
export REDIS_PORT=6379
Prerequisites
- Redis Server: Required for external storage of large payloads
Running the Example
- Start Redis server:
redis-server
- Start the Temporal Dev Server:
temporal server start-dev
- Run the worker:
uv run python -m worker
- Start execution:
uv run python -m start_workflow
Codec Server for Web UI
When using the Claim Check pattern, the Temporal Web UI will show encoded Redis keys instead of the actual payload data. This makes debugging and monitoring difficult since you can't see what data is being passed through your workflows.
The Problem
Without a codec server, the Web UI displays raw claim check keys like:
abc123-def4-5678-9abc-def012345678
This provides no context about what data is stored or how to access it, making workflow debugging and monitoring challenging.
Our Solution: Lightweight Codec Server
We've designed a codec server that provides helpful information without the risks of reading large payload data:
Design Principles
- No Data Reading: The codec server never reads the actual payload data during decode operations
- On-Demand Access: Full data is available via a separate endpoint when needed
- Simple & Safe: Just provides the Redis key and a link - no assumptions about data content
- Performance First: Zero impact on Web UI performance
What It Shows
Instead of raw keys, the Web UI displays:
"Claim check data (key: abc123-def4-5678-9abc-def012345678) - View at: http://localhost:8081/view/abc123-def4-5678-9abc-def012345678"
This gives you:
- Clear identification: You know this is claim check data
- Redis key: The actual key used for storage
- Direct access: Click the URL to view the full payload data
Running the Codec Server
- Start the codec server:
uv run python -m codec_server
- Configure the Temporal Web UI to use the codec server. For temporal server start-dev, see the Temporal documentation on configuring codec servers for the appropriate configuration method.
- Access the Temporal Web UI and you'll see helpful summaries instead of raw keys.
Why This Approach?
Avoiding Common Pitfalls
❌ What we DON'T do:
- Parse or analyze payload data (could be huge or malformed)
- Attempt to summarize content (assumes data structure)
- Read data during decode operations (performance impact)
✅ What we DO:
- Provide the Redis key for manual inspection
- Offer a direct link to view full data when needed
- Keep the Web UI responsive with minimal information
Benefits
- Performance: No Redis calls during Web UI operations
- Safety: No risk of parsing problematic data
- Flexibility: Works with any data type or size
- Debugging: Full data accessible when needed via the /view/{key} endpoint
Configuration Details
The codec server implements the Temporal codec server protocol with two endpoints:
- /decode: Returns helpful text with the Redis key and view URL
- /view/{key}: Serves the raw payload data for inspection
When you click the view URL, you'll see the complete payload data as stored in Redis, formatted appropriately for text or binary content.
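The server's exact implementation isn't shown here, but a minimal sketch with those two endpoints could look like the following. aiohttp is an assumed framework choice, the port and view URL are illustrative, and CORS handling for the Web UI is omitted for brevity; /decode inspects only the claim-check metadata marker and never reads from Redis.

```python
import base64
import json

import redis.asyncio as redis
from aiohttp import web

VIEW_BASE_URL = "http://localhost:8081/view"  # illustrative
redis_client = redis.Redis(host="localhost", port=6379)


async def decode(request: web.Request) -> web.Response:
    """Replace claim-checked payloads with a human-readable summary."""
    body = await request.json()
    out = []
    for p in body.get("payloads", []):
        metadata = p.get("metadata", {})
        marker = base64.b64decode(metadata.get("temporal.io/claim-check-codec", ""))
        if marker != b"v1":
            out.append(p)  # not claim-checked, pass through unchanged
            continue
        key = base64.b64decode(p["data"]).decode("utf-8")
        message = f"Claim check data (key: {key}) - View at: {VIEW_BASE_URL}/{key}"
        out.append(
            {
                "metadata": {"encoding": base64.b64encode(b"json/plain").decode()},
                "data": base64.b64encode(json.dumps(message).encode()).decode(),
            }
        )
    return web.json_response({"payloads": out})


async def view(request: web.Request) -> web.Response:
    """Serve the raw stored payload for on-demand inspection."""
    stored = await redis_client.get(request.match_info["key"])
    if stored is None:
        return web.Response(status=404, text="Claim check key not found")
    return web.Response(body=stored, content_type="application/octet-stream")


app = web.Application()
app.add_routes([web.post("/decode", decode), web.get("/view/{key}", view)])

if __name__ == "__main__":
    web.run_app(app, port=8081)
```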
Considerations
Performance Trade-offs
- Benefits: Reduces payload size, improves workflow performance, enables handling of large data
- Costs: Additional network calls to external storage, potential latency increase
Storage Backend Options
While this example uses Redis, production systems typically use one of the following; a hedged S3-backed sketch follows the list:
- AWS S3: For AWS environments
- Google Cloud Storage: For GCP environments
- Azure Blob Storage: For Azure environments
- Redis: For development and testing
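To illustrate swapping the backend, the sketch below re-implements the codec's storage calls against S3. aioboto3 and the bucket name are assumptions, and error handling, retries, and client reuse are left out for brevity.

```python
import uuid
from typing import Iterable, List

import aioboto3
from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec

CLAIM_CHECK_BUCKET = "my-claim-check-bucket"  # illustrative


class S3ClaimCheckCodec(PayloadCodec):
    """Same claim check scheme as above, with S3 as the external store."""

    def __init__(self) -> None:
        self.session = aioboto3.Session()

    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        out: List[Payload] = []
        async with self.session.client("s3") as s3:
            for payload in payloads:
                key = str(uuid.uuid4())
                await s3.put_object(
                    Bucket=CLAIM_CHECK_BUCKET,
                    Key=key,
                    Body=payload.SerializeToString(),
                )
                out.append(
                    Payload(
                        metadata={
                            "encoding": b"claim-checked",
                            "temporal.io/claim-check-codec": b"v1",
                        },
                        data=key.encode("utf-8"),
                    )
                )
        return out

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        out: List[Payload] = []
        async with self.session.client("s3") as s3:
            for payload in payloads:
                if payload.metadata.get("temporal.io/claim-check-codec", b"") != b"v1":
                    out.append(payload)
                    continue
                key = payload.data.decode("utf-8")
                response = await s3.get_object(Bucket=CLAIM_CHECK_BUCKET, Key=key)
                out.append(Payload.FromString(await response["Body"].read()))
        return out
```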
Activity Heartbeats
For production workloads processing very large datasets, consider implementing activity heartbeats to prevent timeout issues:
from temporalio import activity

@activity.defn
async def process_large_dataset(dataset: LargeDataset) -> ProcessingResult:
    total_items = len(dataset.data)
    for i, item in enumerate(dataset.data):
        # Send a heartbeat every 100 items so Temporal knows the activity is alive
        # (activity.heartbeat is synchronous in the Python SDK, so it is not awaited)
        if i % 100 == 0:
            activity.heartbeat(f"Processed {i}/{total_items} items")
        # Process item...
This ensures Temporal knows the activity is still making progress during long-running operations.
Error Handling
The codec shown above raises a descriptive error when a claim check key is missing from storage. A production implementation should also handle the following (a connection-failure sketch follows this list):
- Storage connection failures
- Serialization/deserialization errors
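As one example, the helper below is a hedged sketch (not part of the codec above) that wraps the Redis read so connection problems surface as a clear error rather than a bare client exception.

```python
import redis.asyncio as redis
from redis.exceptions import ConnectionError as RedisConnectionError


async def get_claim_checked_bytes(client: redis.Redis, key: str) -> bytes:
    """Fetch a stored payload, distinguishing missing keys from storage outages."""
    try:
        stored = await client.get(key)
    except RedisConnectionError as err:
        raise RuntimeError(
            f"Claim check storage unavailable while fetching {key}"
        ) from err
    if stored is None:
        raise ValueError(f"Claim check key not found in Redis: {key}")
    return stored
```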
Cleanup
Consider implementing cleanup strategies for stored data:
- TTL (Time To Live) for automatic expiration (see the Redis TTL sketch after this list)
- Manual cleanup workflows
- Lifecycle policies for cloud storage
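For the Redis-backed codec above, a TTL can be applied when the payload is stored. The sketch below assumes a seven-day expiry, which is purely illustrative: the TTL must comfortably outlive any workflow (including retries and history replays) that might still need to decode the payload.

```python
CLAIM_CHECK_TTL_SECONDS = 7 * 24 * 60 * 60  # 7 days; assumed, tune to your retention needs


async def store_with_ttl(redis_client, key: str, serialized_data: bytes) -> None:
    # redis-py's `ex` argument sets the expiry in seconds at write time
    await redis_client.set(key, serialized_data, ex=CLAIM_CHECK_TTL_SECONDS)
```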
Best Practices
- Enable globally: The claim check pattern applies to all payloads when enabled, so consider the performance impact across your entire system
- Monitor storage: Track storage usage and costs since all payloads will be stored externally
- Implement cleanup: Prevent storage bloat with appropriate cleanup strategies
- Test thoroughly: Verify the pattern works correctly with your specific data types and doesn't introduce unexpected latency
- Consider alternatives: Evaluate if data compression or other optimizations might be sufficient before implementing claim check