Skip to content

A high-performance, scalable consumer service that downloads Space Invaders game flash data and uploads images to IPFS (InterPlanetary File System) for decentralized storage. Built with TypeScript, Node.js, and optimized for production workloads.

License

Notifications You must be signed in to change notification settings

gitpancake/invader.consumer

Repository files navigation

Space Invaders Consumer

A high-performance, scalable consumer service that downloads Space Invaders game flash data and uploads images to IPFS (InterPlanetary File System) for decentralized storage. Built with TypeScript, Node.js, and optimized for production workloads.

πŸš€ Features

  • High Performance: 3-5x throughput improvement with concurrent processing
  • Memory Optimized: 30-50% memory usage reduction with smart GC management
  • Fault Tolerant: Circuit breakers, retry logic, and graceful error handling
  • Scalable: Configurable concurrency and batch processing
  • Production Ready: Comprehensive monitoring, health checks, and metrics
  • IPFS Integration: Seamless image storage on IPFS via Pinata
  • Proxy Support: Built-in proxy rotation for reliable image downloads

πŸ“‹ Prerequisites

  • Node.js >= 19.9.0
  • PostgreSQL database
  • RabbitMQ message broker
  • Pinata account for IPFS storage

πŸ› οΈ Installation

  1. Clone the repository:
git clone <repository-url>
cd invaders.consumer
  1. Install dependencies:
npm install
# or
yarn install
  1. Set up environment variables:
cp .env.example .env
# Edit .env with your configuration

βš™οΈ Configuration

Required Environment Variables

# RabbitMQ Configuration
RABBITMQ_URL=amqp://localhost:5672
RABBITMQ_QUEUE=flash_queue

# Database Configuration  
DATABASE_URL=postgresql://username:password@localhost:5432/database_name

# Pinata Configuration (IPFS)
PINATA_JWT=your_pinata_jwt_token

# Flash Identification (optional)
EMBEDDINGS_API_URL=https://your-embeddings-service.railway.app```

### Optional Performance Settings

```env
# Processing Configuration
CONSUMER_CONCURRENCY=5          # Concurrent processing threads
BATCH_SIZE=300                  # Database batch update size
CONSUMER_RATE_LIMIT=250         # API requests per minute
CONSUMER_TIMEOUT=30000          # Request timeout in ms

# Memory Management
GC_THRESHOLD=0.85              # Garbage collection threshold
MEMORY_WARNING_THRESHOLD=0.7    # Memory usage warning level

# Database Pool
DB_POOL_MIN=2                  # Minimum pool connections
DB_POOL_MAX=20                 # Maximum pool connections

# Monitoring
MONITORING_ENABLED=true         # Enable health checks and metrics
MONITORING_INTERVAL=60000       # Monitoring check interval (ms)

# Proxy Configuration (optional)
PROXY_LIST=http://proxy1:8080,http://proxy2:8080

Environment Presets

Development

CONSUMER_CONCURRENCY=3
BATCH_SIZE=100
CONSUMER_RATE_LIMIT=100

Production

CONSUMER_CONCURRENCY=5
BATCH_SIZE=300
CONSUMER_RATE_LIMIT=250
DB_POOL_MAX=20

High Volume

CONSUMER_CONCURRENCY=10
BATCH_SIZE=500
CONSUMER_RATE_LIMIT=500
DB_POOL_MAX=50

πŸ—„οΈ Database Setup

Run the provided migrations to set up the required database schema:

# Apply database migrations
psql $DATABASE_URL -f migrations/001_create_flashcastr_ipfs_flashes.sql
psql $DATABASE_URL -f migrations/002_add_ipfs_cid_to_flashes.sql

πŸš€ Usage

Development Mode

# Build and run with file watching
npm run dev

Production Mode

# Build the project
npm run build

# Start the consumer
npm start

Test Mode

# Run in test mode (limited processing)
npm test

πŸ“Š Monitoring & Scripts

Performance Analysis

npm run performance-check

Analyzes database performance, memory usage, and provides optimization recommendations.

Health Monitoring

npm run health-check  

Comprehensive system health verification and component status checking.

Metrics Export

npm run metrics

Exports performance metrics and generates trend analysis reports.

πŸ—οΈ Architecture

Core Components

  • Enhanced Consumer: Main processing engine with performance optimizations
  • Batch Updater: Efficient database operations with transaction support
  • Memory Optimizer: Smart memory management and garbage collection
  • Circuit Breakers: Fault tolerance for external service calls
  • Health Checker: System monitoring and alerting
  • Proxy Rotator: Reliable image downloading with failover

Data Flow

  1. Message Consumption: Receives Flash objects from RabbitMQ
  2. Image Download: Fetches images from Space Invaders API with proxy support
  3. IPFS Upload: Stores images on IPFS via Pinata gateway
  4. Flash Identification: Identifies flashes using CLIP embeddings (optional)
  5. Database Update: Records IPFS CIDs and identifications using batch operations
  6. Monitoring: Tracks performance metrics and system health

Performance Features

  • Concurrent Processing: Multi-threaded image processing
  • Smart Batching: Optimized database operations
  • Memory Streaming: Process large datasets efficiently
  • Circuit Breakers: Prevent cascading failures
  • Rate Limiting: Respect API quotas and limits

πŸ” Flash Identification

The consumer integrates with the invaders.embeddings service to automatically identify Space Invader flashes using CLIP embeddings and FAISS similarity search.

How It Works

  1. After uploading an image to IPFS, the consumer sends it to the embeddings API
  2. The embeddings service extracts the mosaic region using grid detection
  3. CLIP generates a 768-dimensional embedding vector
  4. FAISS searches for the most similar reference flash (~3,900 known flashes)
  5. If similarity >= 80%, the identification is stored in flash_identifications table

Configuration

Set EMBEDDINGS_API_URL to enable flash identification:

EMBEDDINGS_API_URL=https://invaders-embeddings.up.railway.app

If not set, flash identification is disabled and processing continues normally.

Database Schema

Identifications are stored in the flash_identifications table:

Column Type Description
source_ipfs_cid TEXT IPFS hash of the query image
matched_flash_id BIGINT Identified flash ID
matched_flash_name TEXT Flash name (e.g., PA_1234)
similarity FLOAT Cosine similarity (0.0-1.0)
confidence FLOAT Confidence score (0.0-1.0)

πŸ”§ Configuration Management

The system uses type-safe configuration with Zod validation:

import { configManager } from './src/util/config';

// Get processing configuration
const config = configManager.getProcessing();
console.log(`Concurrency: ${config.concurrency}`);

// Apply performance preset
configManager.applyPreset('highVolume');

πŸ“ˆ Performance Metrics

Expected performance improvements over the base implementation:

  • Processing Speed: 3-5x faster through concurrent processing
  • Memory Efficiency: 30-50% reduction in memory usage
  • Database Performance: 40-60% improvement in query performance
  • Uptime: 99%+ availability with circuit breakers
  • Error Recovery: Automatic retry and failover mechanisms

πŸ›‘οΈ Error Handling

Circuit Breaker Pattern

Automatic failure detection and recovery for:

  • Image download operations
  • IPFS upload operations
  • Database connections

Retry Logic

Exponential backoff retry for:

  • Network timeouts
  • Rate limiting responses
  • Transient failures

Graceful Degradation

  • Failed requests are automatically requeued
  • System continues operating during partial failures
  • Comprehensive error logging and alerting

πŸ” Monitoring

Real-time Metrics

  • Processing rate and throughput
  • Memory usage and optimization
  • Error rates and failure patterns
  • Circuit breaker states
  • Database performance

Health Checks

  • Component status verification
  • Resource availability monitoring
  • Performance threshold alerting
  • Automatic recovery detection

Logging

Structured logging with different levels:

  • INFO: Normal operations and milestones
  • WARN: Performance issues and recoverable errors
  • ERROR: Critical failures and system issues

🀝 Contributing

We welcome contributions! Please see CONTRIBUTING.md for guidelines.

Development Setup

  1. Fork and clone the repository
  2. Install dependencies: npm install
  3. Set up environment: cp .env.example .env
  4. Run tests: npm test
  5. Start development: npm run dev

πŸ“œ License

This project is licensed under the MIT License - see the LICENSE file for details.

πŸ†˜ Support

For support and questions:

πŸ“Š Prometheus Metrics

The consumer exposes Prometheus metrics on port 9091:

Endpoint Description
GET /metrics Prometheus metrics endpoint
GET / Also serves metrics (for flexibility)

Key metrics exposed:

  • invaders_consumer_flashes_processed_total - Total flashes processed
  • invaders_consumer_flashes_failed_total - Failed flash processing attempts
  • invaders_consumer_ipfs_uploads_total - Successful IPFS uploads
  • invaders_consumer_ipfs_failures_total - Failed IPFS uploads
  • invaders_consumer_processing_duration_seconds - Processing duration histogram
  • invaders_consumer_queue_depth - Current RabbitMQ queue depth
  • invaders_consumer_circuit_breaker_state - Circuit breaker status (0=closed, 1=open)
  • invaders_consumer_consecutive_failures - Consecutive failure count
  • invaders_consumer_last_processed_timestamp - Timestamp of last processed flash
  • invaders_consumer_uptime_seconds - Service uptime
  • invaders_consumer_memory_bytes - Memory usage by type (heap_used, heap_total, rss, external)

Note: This service runs on DigitalOcean and does not have distributed tracing enabled (unlike the Producer and API on Railway).

🎯 Roadmap

  • WebSocket real-time monitoring dashboard
  • Kubernetes deployment manifests
  • Advanced alerting integrations
  • Multi-region IPFS gateway support
  • Enhanced metrics exporters (Prometheus, Grafana)

Built with ❀️ for the Space Invaders community

About

A high-performance, scalable consumer service that downloads Space Invaders game flash data and uploads images to IPFS (InterPlanetary File System) for decentralized storage. Built with TypeScript, Node.js, and optimized for production workloads.

Resources

License

Contributing

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors 2

  •  
  •