GM Batch Processing POC - Temporal Implementation

This POC demonstrates how to replace the GM batch processor with Temporal workflows, following the architecture discussed in the design session (Dec 18, 2025).

Architecture Overview

This implementation assumes an external process checks every 5 seconds whether a batch needs to run and starts a Temporal Workflow once the batch is ready. Because new files can arrive at any time, the Workflow then fetches new VINs every 5 seconds until none remain.

┌─────────────────────────────────────────────────────────────┐
│                         BFF (Existing)                      │
│  - Receives file uploads from UI                           │
│  - Stores VINs in database                                 │
│  - Triggers Temporal workflow when batch is ready          │
└─────────────────────────────────────────────────────────────┘
                             │
                             │ REST call / Event trigger
                             ▼
┌─────────────────────────────────────────────────────────────┐
│              BatchProcessorWorkflow (Parent)                │
│  - Triggered by BFF when batch is ready                    │
│  - Fetches up to 50 VINs every 5 seconds                   │
│  - Creates child workflow for each VIN                     │
│  - Continues until no more VINs available                  │
└─────────────────────────────────────────────────────────────┘
                             │
                             │ Spawns child workflows
                             ▼
        ┌────────────────────────────────────────┐
        │   VINProcessorWorkflow (Child)         │
        │                                        │
        │  Step 1: Check consent                 │
        │  Step 2: Publish to EventHub           │
        │  Step 3: Wait for domain response      │
        │  Step 4: Update BFF with status        │
        └────────────────────────────────────────┘

Project Structure

gmPoc/
├── shared.py           # Data models and types
├── activities.py       # Temporal activities (mocked)
├── workflows.py        # Parent and child workflow definitions
├── worker.py           # Temporal worker
├── starter.py          # Example of how BFF triggers workflows
├── requirements.txt    # Python dependencies
└── README.md           # This file

Prerequisites

  1. Python 3.8+
  2. Temporal Server (local development)

Install Temporal Server

Using Temporal CLI (recommended):

# Install Temporal CLI
brew install temporal

# Start local Temporal server
temporal server start-dev

The Temporal Web UI will be available at: http://localhost:8233

Installation

  1. Install Python dependencies:
uv sync

Or install with all optional dependencies:

uv sync --extra all
  2. Verify the Temporal server is running:
# Temporal Web UI should be accessible at:
open http://localhost:8233

Running the POC

Step 1: Start the Worker

The worker executes workflows and activities:

uv run python worker.py

You should see:

INFO:__main__:Connected to Temporal server
INFO:__main__:Starting worker on task queue: gm-batch-processing-queue
INFO:__main__:Worker will execute BatchProcessorWorkflow and VINProcessorWorkflow
INFO:__main__:Press Ctrl+C to stop the worker

Step 2: Trigger Batch Processing

In a separate terminal, run the starter to simulate BFF triggering a batch:

uv run python starter.py

This will:

  1. Start a batch processing workflow
  2. Show workflow status
  3. Demonstrate the signal-with-start pattern

Step 3: View in Temporal Web UI

  1. Open http://localhost:8233
  2. Click on "Workflows" in the left sidebar
  3. Find your workflow (e.g., batch-processor-batch-...)
  4. Explore:
    • Workflow execution history
    • Child workflows (one per VIN)
    • Activity executions
    • Event timeline

Key Features Demonstrated

1. Event-Driven Triggering

  • BFF triggers workflow when batch is ready (not continuous polling)
  • Cost-optimized approach vs. checking every 5 seconds when idle
  • Uses Temporal's start_workflow() method

2. Parent-Child Workflow Pattern

  • Parent: Orchestrates batch processing, fetches VINs in chunks
  • Child: Independent processing for each VIN
  • Benefits: Scalability, independent retry, easy monitoring

3. Four-Step VIN Processing Flow

Each VIN goes through:

  1. Consent Check - Verify user permission
  2. Publish to EventHub - Send command to appropriate domain
  3. Wait for Response - Long-running activity with heartbeats
  4. Update Status - Notify BFF of completion

4. Rate Limiting (Comments Show Implementation)

  • Parent workflow controls throughput via 5-second delay
  • Currently: 50 VINs per 5 seconds = 600 VINs/minute
  • Comments show how to adjust based on command_type

5. Error Handling (Comments Show Implementation)

  • Retry policies for transient failures
  • Heartbeat timeouts for stuck activities
  • Graceful handling of no-consent scenarios
  • Comments show where to add RetryPolicy

Extending the POC

1. Integrate with Real BFF

Replace mock in activities.py:

@activity.defn
async def get_next_batch_from_bff(batch_id: str, batch_size: int) -> List[VINData]:
    # Replace mock with actual REST call
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://your-bff-endpoint/api/batch/{batch_id}/next",
            params={"size": batch_size}
        ) as response:
            response.raise_for_status()  # surface HTTP errors to Temporal's retry policy
            data = await response.json()
            return [VINData(**item) for item in data]

2. Integrate with EventHub

Replace mocks in activities.py:

@activity.defn
async def publish_to_eventhub(vin: str, command: str, command_type: CommandType) -> PublishResult:
    import json
    from datetime import datetime, timezone

    from azure.eventhub.aio import EventHubProducerClient
    from azure.eventhub import EventData

    producer = EventHubProducerClient.from_connection_string(
        conn_str="YOUR_CONNECTION_STRING",
        eventhub_name="your-eventhub"
    )

    async with producer:
        event_data = EventData(json.dumps({
            "vin": vin,
            "command": command,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }))
        await producer.send_batch([event_data])

    # Return actual message ID
    return PublishResult(...)

3. Add Rate Limiting

In workflows.py, replace the simple asyncio.sleep(5):

# Dynamic rate limiting based on command type
if all(v.command_type == CommandType.VEHICLE_COMMAND for v in vins):
    delay = 5  # 600/min for vehicle commands
elif all(v.command_type == CommandType.BACKOFFICE_UPDATE for v in vins):
    delay = 1  # 3000/min for back-office updates
else:
    delay = 5  # Conservative default for mixed batches

await asyncio.sleep(delay)

4. Add Error Handling

In workflows.py, add retry policies:

from datetime import timedelta

from temporalio.common import RetryPolicy

consent_result = await workflow.execute_activity(
    check_consent,
    args=[vin_data.vin],
    start_to_close_timeout=timedelta(seconds=30),
    retry_policy=RetryPolicy(
        initial_interval=timedelta(seconds=1),
        maximum_interval=timedelta(seconds=10),
        maximum_attempts=3,
        backoff_coefficient=2.0
    )
)

5. Add Workflow Versioning

When you need to change workflow logic:

from temporalio import workflow

@workflow.defn
class VINProcessorWorkflow:
    @workflow.run
    async def run(self, vin_data: VINData) -> VINProcessingResult:
        # workflow.patched() returns True for new executions and False when
        # replaying history recorded before this patch was deployed
        version = workflow.patched("new-consent-check")

        if version:
            # New logic
            consent_result = await workflow.execute_activity(
                check_consent_v2,  # New version
                ...
            )
        else:
            # Old logic for in-flight workflows
            consent_result = await workflow.execute_activity(
                check_consent,
                ...
            )

6. Integrate BFF Trigger

In your BFF code (after file upload validation):

from temporalio.client import Client

async def handle_batch_upload(file_data, batch_id):
    # 1. Validate and store file data in database
    await store_batch_in_database(file_data, batch_id)

    # 2. Trigger Temporal workflow
    client = await Client.connect("your-temporal-endpoint:7233")

    await client.start_workflow(
        "BatchProcessorWorkflow",
        args=[batch_id],
        id=f"batch-processor-{batch_id}",
        task_queue="gm-batch-processing-queue"
    )

    # 3. Return success to UI
    return {"status": "processing", "batch_id": batch_id}

Monitoring and Observability

Temporal Web UI

  • View all workflows: http://localhost:8233
  • Search by batch_id, VIN, or workflow status
  • Replay failed workflows
  • View execution history and event logs

Logging

All workflows and activities log to console. Configure structured logging:

import logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

Metrics

Temporal provides built-in metrics for:

  • Workflow execution time
  • Activity success/failure rates
  • Task queue depth
  • Worker health

Integrate with Datadog, Prometheus, or other monitoring tools.

Production Considerations

1. Temporal Cloud vs Self-Hosted

  • Temporal Cloud: Managed service, high availability, global namespaces
  • Self-Hosted: More control, but requires infrastructure management

2. Worker Scaling

  • Run multiple workers for high availability
  • Scale workers based on task queue depth
  • Configure max_concurrent_activities based on downstream capacity

3. Security

  • Use mTLS for Temporal Cloud connections
  • Encrypt sensitive data in workflow/activity inputs
  • Use namespaces to isolate environments (dev/staging/prod)

4. Cost Optimization

  • Event-driven triggers (vs continuous polling) ✓ Already implemented
  • Adjust workflow/activity timeouts to match SLAs
  • Use continue-as-new for very long-running workflows

5. Error Handling

  • Implement comprehensive retry policies
  • Set up alerts for workflow failures
  • Use dead-letter queues for unrecoverable errors

Troubleshooting

Worker not connecting to Temporal

# Check Temporal server is running
curl http://localhost:8233

# Check worker logs for connection errors
uv run python worker.py

Workflows not starting

# Verify worker is running on correct task queue
# Check starter.py uses same task queue: "gm-batch-processing-queue"

Activities timing out

  • Increase start_to_close_timeout in workflow
  • Check mock delays in activities.py
  • Verify Temporal server is responsive

Next Steps

  1. Review with team: Walk through code and architecture
  2. Integration planning: Identify which systems to integrate first
  3. Testing strategy: Unit tests, integration tests, load tests
  4. Deployment plan: Dev → Staging → Production rollout
  5. Monitoring setup: Logging, metrics, alerting
  6. Training: Team training on Temporal concepts
