A high-performance, scalable consumer service that downloads Space Invaders game flash data and uploads images to IPFS (InterPlanetary File System) for decentralized storage. Built with TypeScript, Node.js, and optimized for production workloads.
- High Performance: 3-5x throughput improvement with concurrent processing
- Memory Optimized: 30-50% memory usage reduction with smart GC management
- Fault Tolerant: Circuit breakers, retry logic, and graceful error handling
- Scalable: Configurable concurrency and batch processing
- Production Ready: Comprehensive monitoring, health checks, and metrics
- IPFS Integration: Seamless image storage on IPFS via Pinata
- Proxy Support: Built-in proxy rotation for reliable image downloads
- Node.js >= 19.9.0
- PostgreSQL database
- RabbitMQ message broker
- Pinata account for IPFS storage
- Clone the repository:
git clone <repository-url>
cd invaders.consumer- Install dependencies:
npm install
# or
yarn install- Set up environment variables:
cp .env.example .env
# Edit .env with your configuration# RabbitMQ Configuration
RABBITMQ_URL=amqp://localhost:5672
RABBITMQ_QUEUE=flash_queue
# Database Configuration
DATABASE_URL=postgresql://username:password@localhost:5432/database_name
# Pinata Configuration (IPFS)
PINATA_JWT=your_pinata_jwt_token
# Flash Identification (optional)
EMBEDDINGS_API_URL=https://your-embeddings-service.railway.app```
### Optional Performance Settings
```env
# Processing Configuration
CONSUMER_CONCURRENCY=5 # Concurrent processing threads
BATCH_SIZE=300 # Database batch update size
CONSUMER_RATE_LIMIT=250 # API requests per minute
CONSUMER_TIMEOUT=30000 # Request timeout in ms
# Memory Management
GC_THRESHOLD=0.85 # Garbage collection threshold
MEMORY_WARNING_THRESHOLD=0.7 # Memory usage warning level
# Database Pool
DB_POOL_MIN=2 # Minimum pool connections
DB_POOL_MAX=20 # Maximum pool connections
# Monitoring
MONITORING_ENABLED=true # Enable health checks and metrics
MONITORING_INTERVAL=60000 # Monitoring check interval (ms)
# Proxy Configuration (optional)
PROXY_LIST=http://proxy1:8080,http://proxy2:8080CONSUMER_CONCURRENCY=3
BATCH_SIZE=100
CONSUMER_RATE_LIMIT=100CONSUMER_CONCURRENCY=5
BATCH_SIZE=300
CONSUMER_RATE_LIMIT=250
DB_POOL_MAX=20CONSUMER_CONCURRENCY=10
BATCH_SIZE=500
CONSUMER_RATE_LIMIT=500
DB_POOL_MAX=50Run the provided migrations to set up the required database schema:
# Apply database migrations
psql $DATABASE_URL -f migrations/001_create_flashcastr_ipfs_flashes.sql
psql $DATABASE_URL -f migrations/002_add_ipfs_cid_to_flashes.sql# Build and run with file watching
npm run dev# Build the project
npm run build
# Start the consumer
npm start# Run in test mode (limited processing)
npm testnpm run performance-checkAnalyzes database performance, memory usage, and provides optimization recommendations.
npm run health-check Comprehensive system health verification and component status checking.
npm run metricsExports performance metrics and generates trend analysis reports.
- Enhanced Consumer: Main processing engine with performance optimizations
- Batch Updater: Efficient database operations with transaction support
- Memory Optimizer: Smart memory management and garbage collection
- Circuit Breakers: Fault tolerance for external service calls
- Health Checker: System monitoring and alerting
- Proxy Rotator: Reliable image downloading with failover
- Message Consumption: Receives Flash objects from RabbitMQ
- Image Download: Fetches images from Space Invaders API with proxy support
- IPFS Upload: Stores images on IPFS via Pinata gateway
- Flash Identification: Identifies flashes using CLIP embeddings (optional)
- Database Update: Records IPFS CIDs and identifications using batch operations
- Monitoring: Tracks performance metrics and system health
- Concurrent Processing: Multi-threaded image processing
- Smart Batching: Optimized database operations
- Memory Streaming: Process large datasets efficiently
- Circuit Breakers: Prevent cascading failures
- Rate Limiting: Respect API quotas and limits
The consumer integrates with the invaders.embeddings service to automatically identify Space Invader flashes using CLIP embeddings and FAISS similarity search.
- After uploading an image to IPFS, the consumer sends it to the embeddings API
- The embeddings service extracts the mosaic region using grid detection
- CLIP generates a 768-dimensional embedding vector
- FAISS searches for the most similar reference flash (~3,900 known flashes)
- If similarity >= 80%, the identification is stored in
flash_identificationstable
Set EMBEDDINGS_API_URL to enable flash identification:
EMBEDDINGS_API_URL=https://invaders-embeddings.up.railway.appIf not set, flash identification is disabled and processing continues normally.
Identifications are stored in the flash_identifications table:
| Column | Type | Description |
|---|---|---|
| source_ipfs_cid | TEXT | IPFS hash of the query image |
| matched_flash_id | BIGINT | Identified flash ID |
| matched_flash_name | TEXT | Flash name (e.g., PA_1234) |
| similarity | FLOAT | Cosine similarity (0.0-1.0) |
| confidence | FLOAT | Confidence score (0.0-1.0) |
The system uses type-safe configuration with Zod validation:
import { configManager } from './src/util/config';
// Get processing configuration
const config = configManager.getProcessing();
console.log(`Concurrency: ${config.concurrency}`);
// Apply performance preset
configManager.applyPreset('highVolume');Expected performance improvements over the base implementation:
- Processing Speed: 3-5x faster through concurrent processing
- Memory Efficiency: 30-50% reduction in memory usage
- Database Performance: 40-60% improvement in query performance
- Uptime: 99%+ availability with circuit breakers
- Error Recovery: Automatic retry and failover mechanisms
Automatic failure detection and recovery for:
- Image download operations
- IPFS upload operations
- Database connections
Exponential backoff retry for:
- Network timeouts
- Rate limiting responses
- Transient failures
- Failed requests are automatically requeued
- System continues operating during partial failures
- Comprehensive error logging and alerting
- Processing rate and throughput
- Memory usage and optimization
- Error rates and failure patterns
- Circuit breaker states
- Database performance
- Component status verification
- Resource availability monitoring
- Performance threshold alerting
- Automatic recovery detection
Structured logging with different levels:
- INFO: Normal operations and milestones
- WARN: Performance issues and recoverable errors
- ERROR: Critical failures and system issues
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
- Fork and clone the repository
- Install dependencies:
npm install - Set up environment:
cp .env.example .env - Run tests:
npm test - Start development:
npm run dev
This project is licensed under the MIT License - see the LICENSE file for details.
For support and questions:
- Open an issue on GitHub
- Check the Performance Improvements documentation
- Review configuration examples above
The consumer exposes Prometheus metrics on port 9091:
| Endpoint | Description |
|---|---|
GET /metrics |
Prometheus metrics endpoint |
GET / |
Also serves metrics (for flexibility) |
Key metrics exposed:
invaders_consumer_flashes_processed_total- Total flashes processedinvaders_consumer_flashes_failed_total- Failed flash processing attemptsinvaders_consumer_ipfs_uploads_total- Successful IPFS uploadsinvaders_consumer_ipfs_failures_total- Failed IPFS uploadsinvaders_consumer_processing_duration_seconds- Processing duration histograminvaders_consumer_queue_depth- Current RabbitMQ queue depthinvaders_consumer_circuit_breaker_state- Circuit breaker status (0=closed, 1=open)invaders_consumer_consecutive_failures- Consecutive failure countinvaders_consumer_last_processed_timestamp- Timestamp of last processed flashinvaders_consumer_uptime_seconds- Service uptimeinvaders_consumer_memory_bytes- Memory usage by type (heap_used, heap_total, rss, external)
Note: This service runs on DigitalOcean and does not have distributed tracing enabled (unlike the Producer and API on Railway).
- WebSocket real-time monitoring dashboard
- Kubernetes deployment manifests
- Advanced alerting integrations
- Multi-region IPFS gateway support
- Enhanced metrics exporters (Prometheus, Grafana)
Built with β€οΈ for the Space Invaders community