Skip to content

feat(aggregator): add PriceStreamProcessorService for real-time price event processing#37

Open
salazarsebas wants to merge 1 commit intoNeko-Protocol:mainfrom
salazarsebas:feat/aggregator-stream-processor-33
Open

feat(aggregator): add PriceStreamProcessorService for real-time price event processing#37
salazarsebas wants to merge 1 commit intoNeko-Protocol:mainfrom
salazarsebas:feat/aggregator-stream-processor-33

Conversation

@salazarsebas
Copy link
Contributor

@salazarsebas salazarsebas commented Mar 2, 2026

Summary

  • Adds PriceStreamProcessorService that listens to price.received events from DataReceptionService and processes them through a full normalization → buffering → aggregation pipeline
  • Removes a duplicate ConfigModule.forRoot() call in AppModule
  • Adds AGG_TIME_WINDOW_MS and AGG_MIN_SOURCES configuration to .env.example

Motivation

DataReceptionService connects to the ingestor WebSocket and emits price.received events, but nothing was listening to them. This PR adds the missing glue that turns raw price events into continuously-updated aggregated snapshots available via GET /debug/prices.

Implementation details

The service runs a linear 6-step pipeline on each incoming event:

  1. DTO → RawPrice — converts ISO 8601 timestamp string to Unix ms
  2. Normalize — delegates to NormalizationService.normalize() with error handling
  3. Record → NormalizedPrice — maps originalTimestamptimestamp, NormalizedSource enum → string
  4. Buffer — per-symbol rolling buffer with per-source deduplication and time-window pruning
  5. Debug store — updates DebugService.setLastNormalized() so /debug/prices reflects current state
  6. Aggregate — triggers AggregationService.aggregate() when ≥ minSources distinct sources are present

Configuration

Variable Default Description
AGG_TIME_WINDOW_MS 30000 Rolling time window for buffered prices
AGG_MIN_SOURCES 3 Minimum distinct sources required to trigger aggregation

Files changed

File Change
apps/aggregator/src/services/price-stream-processor.service.ts New — core service
apps/aggregator/src/services/price-stream-processor.service.spec.ts New — 10 test scenarios
apps/aggregator/src/app.module.ts Added provider, removed duplicate ConfigModule.forRoot()
apps/aggregator/.env.example Added AGG_TIME_WINDOW_MS, AGG_MIN_SOURCES

Test plan

  • DTO → RawPrice: ISO timestamp correctly converted to Unix ms
  • NormalizedPriceRecord → NormalizedPrice: originalTimestamp maps to timestamp, source enum maps to string
  • Happy path: valid event → normalized → buffered → debug store updated
  • Normalization failure: error caught and logged, not rethrown
  • Buffer dedup: second price from same source replaces the first
  • Buffer pruning: old entries outside time window are removed
  • Aggregation NOT triggered: fewer than minSources distinct sources
  • Aggregation triggered: threshold met → aggregate() called with correct args
  • Aggregation failure: error from aggregate() caught and logged
  • Configuration: reads from ConfigService with correct defaults

Verification

✓ npm run lint        — clean
✓ npm run check-types — clean
✓ npm run build       — clean
✓ npm run test        — 17 suites, 256 tests passed

Closes #33

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Make Aggregator process incoming prices into “latest aggregated” snapshots

1 participant