This POC demonstrates how to replace the GM batch processor with Temporal workflows, following the architecture discussed in the design session (Dec 18, 2025).
The implementation assumes an external process checks every 5 seconds whether a batch needs to run, and starts a Temporal Workflow once the batch is ready. Since new files can be placed at any time, this workflow fetches new VINs every 5 seconds until there are none left to fetch.
```
┌─────────────────────────────────────────────────────────────┐
│ BFF (Existing)                                              │
│ - Receives file uploads from UI                             │
│ - Stores VINs in database                                   │
│ - Triggers Temporal workflow when batch is ready            │
└─────────────────────────────────────────────────────────────┘
                               │
                               │ REST call / Event trigger
                               ▼
┌─────────────────────────────────────────────────────────────┐
│ BatchProcessorWorkflow (Parent)                             │
│ - Triggered by BFF when batch is ready                      │
│ - Fetches up to 50 VINs every 5 seconds                     │
│ - Creates child workflow for each VIN                       │
│ - Continues until no more VINs available                    │
└─────────────────────────────────────────────────────────────┘
                               │
                               │ Spawns child workflows
                               ▼
          ┌────────────────────────────────────────┐
          │ VINProcessorWorkflow (Child)           │
          │                                        │
          │ Step 1: Check consent                  │
          │ Step 2: Publish to EventHub            │
          │ Step 3: Wait for domain response       │
          │ Step 4: Update BFF with status         │
          └────────────────────────────────────────┘
```
Project layout:

```
gmPoc/
├── shared.py           # Data models and types
├── activities.py       # Temporal activities (mocked)
├── workflows.py        # Parent and child workflow definitions
├── worker.py           # Temporal worker
├── starter.py          # Example of how BFF triggers workflows
├── requirements.txt    # Python dependencies
└── README.md           # This file
```
Prerequisites:

- Python 3.8+
- Temporal Server (local development)
Using the Temporal CLI (recommended):

```bash
# Install Temporal CLI
brew install temporal

# Start a local Temporal server
temporal server start-dev
```

The Temporal Web UI will be available at http://localhost:8233.
Install Python dependencies:

```bash
uv sync
```

Or install with all optional dependencies:

```bash
uv sync --extra all
```

Verify the Temporal server is running:

```bash
# Temporal Web UI should be accessible at:
open http://localhost:8233
```

The worker executes workflows and activities:
```bash
uv run python worker.py
```

You should see:

```
INFO:__main__:Connected to Temporal server
INFO:__main__:Starting worker on task queue: gm-batch-processing-queue
INFO:__main__:Worker will execute BatchProcessorWorkflow and VINProcessorWorkflow
INFO:__main__:Press Ctrl+C to stop the worker
```
In a separate terminal, run the starter to simulate the BFF triggering a batch:

```bash
uv run python starter.py
```

This will:
- Start a batch processing workflow
- Show workflow status
- Demonstrate the signal-with-start pattern (sketched below)
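For reference, a condensed sketch of how signal-with-start looks in the Python SDK; the signal name `add_vins` and its payload are illustrative, not part of the POC:

```python
from temporalio.client import Client

async def add_vins_or_start(batch_id: str, new_vins: list[str]) -> None:
    client = await Client.connect("localhost:7233")
    # start_signal delivers the signal atomically with workflow start;
    # if the workflow is already running, only the signal is delivered
    await client.start_workflow(
        "BatchProcessorWorkflow",
        args=[batch_id],
        id=f"batch-processor-{batch_id}",
        task_queue="gm-batch-processing-queue",
        start_signal="add_vins",        # hypothetical signal name
        start_signal_args=[new_vins],
    )
```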
To inspect the run in the Temporal Web UI:

- Open http://localhost:8233
- Click on "Workflows" in the left sidebar
- Find your workflow (e.g., `batch-processor-batch-...`)
- Explore:
  - Workflow execution history
  - Child workflows (one per VIN)
  - Activity executions
  - Event timeline
Key design points:

- BFF triggers the workflow when a batch is ready (not continuous polling)
- Cost-optimized approach vs. checking every 5 seconds when idle
- Uses Temporal's `start_workflow()` method
Parent-child workflow pattern:

- Parent: Orchestrates batch processing, fetches VINs in chunks
- Child: Independent processing for each VIN
- Benefits: Scalability, independent retry, easy monitoring (see the sketch below)
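A condensed sketch of that pattern, simplified from what workflows.py implements (imports and error handling abbreviated; details may differ):

```python
import asyncio
from datetime import timedelta

from temporalio import workflow

from activities import get_next_batch_from_bff  # activity from this POC

@workflow.defn
class BatchProcessorWorkflow:
    @workflow.run
    async def run(self, batch_id: str) -> int:
        # VINProcessorWorkflow (child) is defined alongside in workflows.py
        handles = []
        while True:
            # Fetch the next chunk of up to 50 VINs from the BFF
            vins = await workflow.execute_activity(
                get_next_batch_from_bff,
                args=[batch_id, 50],
                start_to_close_timeout=timedelta(seconds=30),
            )
            if not vins:
                break  # no more VINs left to fetch
            # One child workflow per VIN; each retries and fails independently
            for vin_data in vins:
                handles.append(
                    await workflow.start_child_workflow(
                        VINProcessorWorkflow.run,
                        vin_data,
                        id=f"vin-processor-{vin_data.vin}",
                    )
                )
            await asyncio.sleep(5)  # throttle: 50 VINs per 5 seconds
        # Wait for all children before completing the batch
        await asyncio.gather(*handles)
        return len(handles)
```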
Each VIN goes through:
- Consent Check - Verify user permission
- Publish to EventHub - Send command to appropriate domain
- Wait for Response - Long-running activity with heartbeats (sketched after this list)
- Update Status - Notify BFF of completion
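Step 3 relies on activity heartbeats so Temporal can detect a stuck worker. A minimal sketch, where `poll_domain_for_response` is a hypothetical helper standing in for the real lookup (the caller must also set a `heartbeat_timeout` for the heartbeats to have effect):

```python
import asyncio

from temporalio import activity

@activity.defn
async def wait_for_domain_response(vin: str) -> str:
    # Poll until the domain responds, heartbeating each iteration so a
    # heartbeat_timeout can detect a stuck or crashed worker
    while True:
        response = await poll_domain_for_response(vin)  # hypothetical helper
        if response is not None:
            return response
        activity.heartbeat(f"still waiting on {vin}")
        await asyncio.sleep(2)
```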
Rate limiting:

- Parent workflow controls throughput via a 5-second delay
- Currently: 50 VINs per 5 seconds = 600 VINs/minute
- Comments show how to adjust based on `command_type`
Error handling:

- Retry policies for transient failures
- Heartbeat timeouts for stuck activities
- Graceful handling of no-consent scenarios
- Comments show where to add `RetryPolicy`
Replace the mock in activities.py:

```python
from typing import List

import aiohttp
from temporalio import activity

from shared import VINData  # data model from shared.py

@activity.defn
async def get_next_batch_from_bff(batch_id: str, batch_size: int) -> List[VINData]:
    # Replace the mock with an actual REST call to the BFF
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://your-bff-endpoint/api/batch/{batch_id}/next",
            params={"size": batch_size},
        ) as response:
            data = await response.json()
            return [VINData(**item) for item in data]
```

Likewise, replace the EventHub mock in activities.py:
```python
import json
from datetime import datetime

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
from temporalio import activity

from shared import CommandType, PublishResult  # data models from shared.py

@activity.defn
async def publish_to_eventhub(vin: str, command: str, command_type: CommandType) -> PublishResult:
    producer = EventHubProducerClient.from_connection_string(
        conn_str="YOUR_CONNECTION_STRING",
        eventhub_name="your-eventhub",
    )
    async with producer:
        event_data = EventData(json.dumps({
            "vin": vin,
            "command": command,
            "timestamp": datetime.utcnow().isoformat(),
        }))
        await producer.send_batch([event_data])
    # Return actual message ID
    return PublishResult(...)
```

In workflows.py, replace the simple `asyncio.sleep(5)`:
```python
# Dynamic rate limiting based on command type
# (inside the parent workflow's fetch loop)
if all(v.command_type == CommandType.VEHICLE_COMMAND for v in vins):
    delay = 5   # 600/min for vehicle commands
elif all(v.command_type == CommandType.BACKOFFICE_UPDATE for v in vins):
    delay = 1   # 3000/min for back-office updates
else:
    delay = 5   # Conservative default for mixed batches
await asyncio.sleep(delay)
```

In workflows.py, add retry policies:
```python
from datetime import timedelta

from temporalio.common import RetryPolicy

# inside VINProcessorWorkflow.run
consent_result = await workflow.execute_activity(
    check_consent,
    args=[vin_data.vin],
    start_to_close_timeout=timedelta(seconds=30),
    retry_policy=RetryPolicy(
        initial_interval=timedelta(seconds=1),
        maximum_interval=timedelta(seconds=10),
        maximum_attempts=3,
        backoff_coefficient=2.0,
    ),
)
```

When you need to change workflow logic:
```python
from temporalio import workflow

from shared import VINData, VINProcessingResult  # data models from shared.py

@workflow.defn
class VINProcessorWorkflow:
    @workflow.run
    async def run(self, vin_data: VINData) -> VINProcessingResult:
        # workflow.patched() returns True for new executions and False for
        # workflows started before the patch was deployed, keeping
        # in-flight workflows deterministic
        if workflow.patched("new-consent-check"):
            # New logic
            consent_result = await workflow.execute_activity(
                check_consent_v2,  # New version
                ...
            )
        else:
            # Old logic for in-flight workflows
            consent_result = await workflow.execute_activity(
                check_consent,
                ...
            )
```

In your BFF code (after file upload validation):
```python
from temporalio.client import Client

async def handle_batch_upload(file_data, batch_id):
    # 1. Validate and store file data in database
    await store_batch_in_database(file_data, batch_id)

    # 2. Trigger Temporal workflow
    client = await Client.connect("your-temporal-endpoint:7233")
    await client.start_workflow(
        "BatchProcessorWorkflow",
        args=[batch_id],
        id=f"batch-processor-{batch_id}",
        task_queue="gm-batch-processing-queue",
    )

    # 3. Return success to UI
    return {"status": "processing", "batch_id": batch_id}
```

Temporal Web UI:

- View all workflows: http://localhost:8233
- Search by batch_id, VIN, or workflow status
- Replay failed workflows
- View execution history and event logs
All workflows and activities log to console. Configure structured logging:

```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
```

Temporal provides built-in metrics for:
- Workflow execution time
- Activity success/failure rates
- Task queue depth
- Worker health
Integrate with Datadog, Prometheus, or other monitoring tools.
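As one example, a sketch of exposing SDK metrics to Prometheus via the Python SDK's telemetry runtime (the bind address is an arbitrary choice):

```python
from temporalio.client import Client
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig

async def connect_with_metrics() -> Client:
    # Serve client/worker metrics at http://127.0.0.1:9090/metrics
    runtime = Runtime(
        telemetry=TelemetryConfig(
            metrics=PrometheusConfig(bind_address="127.0.0.1:9090")
        )
    )
    return await Client.connect("localhost:7233", runtime=runtime)
```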
Deployment options:

- Temporal Cloud: Managed service, high availability, global namespaces
- Self-Hosted: More control, but requires infrastructure management
Scaling:

- Run multiple workers for high availability
- Scale workers based on task queue depth
- Configure `max_concurrent_activities` based on downstream capacity (see the sketch below)
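A sketch of what that looks like in worker.py; the activity list is abbreviated, and the concurrency limit is an arbitrary starting point:

```python
from temporalio.client import Client
from temporalio.worker import Worker

from activities import get_next_batch_from_bff, publish_to_eventhub  # abbreviated
from workflows import BatchProcessorWorkflow, VINProcessorWorkflow

async def run_worker() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="gm-batch-processing-queue",
        workflows=[BatchProcessorWorkflow, VINProcessorWorkflow],
        activities=[get_next_batch_from_bff, publish_to_eventhub],
        # Cap concurrent activity executions to what the downstream
        # systems (BFF, EventHub) can absorb
        max_concurrent_activities=50,
    )
    await worker.run()
```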
Security:

- Use mTLS for Temporal Cloud connections (sketched below)
- Encrypt sensitive data in workflow/activity inputs
- Use namespaces to isolate environments (dev/staging/prod)
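A connection sketch for Temporal Cloud with mTLS; the endpoint, namespace, and certificate paths are placeholders:

```python
from temporalio.client import Client, TLSConfig

async def connect_to_cloud() -> Client:
    with open("client.pem", "rb") as f:
        client_cert = f.read()
    with open("client.key", "rb") as f:
        client_key = f.read()
    return await Client.connect(
        "your-namespace.tmprl.cloud:7233",   # placeholder endpoint
        namespace="your-namespace",          # placeholder namespace
        tls=TLSConfig(
            client_cert=client_cert,
            client_private_key=client_key,
        ),
    )
```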
Performance and reliability:

- Event-driven triggers (vs. continuous polling) ✓ Already implemented
- Adjust workflow/activity timeouts to match SLAs
- Use continue-as-new for very long-running workflows (see the sketch after this list)
- Implement comprehensive retry policies
- Set up alerts for workflow failures
- Use dead-letter queues for unrecoverable errors
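A sketch of continue-as-new for the parent workflow, assuming an iteration-count threshold (the threshold and the `fetch_and_process_chunk` helper are hypothetical):

```python
from temporalio import workflow

@workflow.defn
class BatchProcessorWorkflow:
    @workflow.run
    async def run(self, batch_id: str) -> None:
        iterations = 0
        while True:
            vins = await fetch_and_process_chunk(batch_id)  # hypothetical helper
            if not vins:
                return  # batch drained
            iterations += 1
            # Restart with a fresh event history before it grows too
            # large; in-progress state travels via the arguments
            if iterations >= 500:
                workflow.continue_as_new(batch_id)
```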
Troubleshooting:

If the worker won't connect:

```bash
# Check Temporal server is running
curl http://localhost:8233

# Check worker logs for connection errors
python worker.py
```

If workflows don't start:

```bash
# Verify worker is running on correct task queue
# Check starter.py uses same task queue: "gm-batch-processing-queue"
```

If activities time out:

- Increase `start_to_close_timeout` in the workflow
- Check mock delays in activities.py
- Verify the Temporal server is responsive
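One more quick check, using the Temporal CLI installed during setup, is to confirm the worker is polling the expected task queue:

```bash
# Lists pollers on the task queue; the worker should appear here
temporal task-queue describe --task-queue gm-batch-processing-queue
```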
Next steps:

- Review with team: Walk through code and architecture
- Integration planning: Identify which systems to integrate first
- Testing strategy: Unit tests, integration tests, load tests
- Deployment plan: Dev → Staging → Production rollout
- Monitoring setup: Logging, metrics, alerting
- Training: Team training on Temporal concepts