📝 Description
Context: apps/aggregator already has: normalization, aggregation strategies, metrics, health checks, and an in-memory debug store exposed at GET /debug/prices. It also has DataReceptionService which connects to the ingestor WebSocket and emits price.received events — but there’s currently no “glue” that listens to those events, normalizes the payload, runs aggregation, and keeps a continuously-updated latest snapshot per symbol.
Goal: Add a small “stream processor” inside the aggregator that:
- listens for incoming price events
- converts/validates them
- normalizes them into the canonical format
- maintains a short rolling buffer of recent normalized prices per symbol
- calls `AggregationService.aggregate(...)` when enough sources are present
- updates the in-memory latest aggregated snapshot (already handled via `DebugService.setLastAggregated` inside `AggregationService`)
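The buffering step in the list above could look roughly like the sketch below. It is only an illustration: the `NormalizedPrice` shape and the `RollingPriceBuffer` class are hypothetical names introduced here, and the real types in apps/aggregator may differ. The `now` parameter is injectable so the time window can be tested without real clocks.

```typescript
// Hypothetical shape; the real NormalizedPrice type in apps/aggregator may differ.
interface NormalizedPrice {
  symbol: string;
  source: string;
  price: number;
  timestamp: number; // unix ms
}

// Hypothetical helper: keeps recent prices per symbol inside a time window.
class RollingPriceBuffer {
  private readonly bySymbol = new Map<string, NormalizedPrice[]>();
  private readonly windowMs: number;

  constructor(windowMs: number) {
    this.windowMs = windowMs;
  }

  // Store a price, then drop entries for that symbol that fell out of the window.
  add(price: NormalizedPrice, now: number = Date.now()): void {
    const entries = this.bySymbol.get(price.symbol) ?? [];
    entries.push(price);
    this.bySymbol.set(price.symbol, this.prune(entries, now));
  }

  // Return the most recent in-window price from each source for a symbol.
  latestPerSource(symbol: string, now: number = Date.now()): NormalizedPrice[] {
    const fresh = this.prune(this.bySymbol.get(symbol) ?? [], now);
    this.bySymbol.set(symbol, fresh);
    const latest = new Map<string, NormalizedPrice>();
    for (const entry of fresh) {
      const seen = latest.get(entry.source);
      if (!seen || entry.timestamp >= seen.timestamp) {
        latest.set(entry.source, entry);
      }
    }
    return Array.from(latest.values());
  }

  // Keep only entries whose timestamp is within windowMs of `now`.
  private prune(entries: NormalizedPrice[], now: number): NormalizedPrice[] {
    const cutoff = now - this.windowMs;
    return entries.filter((e) => e.timestamp >= cutoff);
  }
}
```

Returning one entry per source means a chatty source cannot satisfy the source threshold on its own; whether to buffer per source at all is left open by this issue.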
This makes apps/aggregator actually produce real-time aggregated results that downstream services (API/Frontend) can consume.
Tech stack / conventions: NestJS (existing), TypeScript, @nestjs/event-emitter for @OnEvent(...) handler, reuse existing NormalizationService, AggregationService, and DebugService. Keep it in-memory (no Redis persistence in this issue).
Complexity: ~6/10 — event-driven wiring + buffering + tests; nothing on-chain.
✅ Requirements
- Add an event listener for `price.received` (emitted by `DataReceptionService`) using `@OnEvent('price.received')`.
- Convert the event payload into a `RawPrice`-compatible object (notably: parse ISO `timestamp` into unix ms) and run it through `NormalizationService`.
- Maintain an in-memory buffer per symbol (and optionally per source) for recent `NormalizedPrice` entries:
  - configurable time window (e.g. `AGG_TIME_WINDOW_MS`, default 30s)
  - configurable `minSources` (default 3, matching `AggregationService`)
- When a symbol has enough recent sources, call `AggregationService.aggregate(symbol, prices, options)` and let it update `DebugService`’s `lastAggregated` and `lastNormalized`.
- Fix/complete any missing Nest module wiring needed for this flow (e.g. ensure `EventEmitterModule`, `HttpModule` and providers referenced in `AppModule` are properly imported/registered).
- Add tests proving: an incoming event updates the rolling buffer; aggregation runs when thresholds are met; debug store updates.
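As a rough illustration of the payload conversion and configuration requirements above, here is a minimal sketch. The `PriceReceivedEvent` and `RawPrice` shapes, and the helpers `toRawPrice`, `readPositiveInt`, and `readProcessorConfig`, are assumptions made for this example; the actual event payload and `RawPrice` type in the codebase may have different fields. The environment is passed in as a plain object so the function does not depend on Node typings.

```typescript
// Hypothetical shapes; the actual event payload and RawPrice type may differ.
interface PriceReceivedEvent {
  symbol: string;
  source: string;
  price: number | string;
  timestamp: string; // ISO 8601
}

interface RawPrice {
  symbol: string;
  source: string;
  price: number;
  timestamp: number; // unix ms
}

// Convert an event into a RawPrice-like object, or null if it is not usable.
function toRawPrice(event: PriceReceivedEvent): RawPrice | null {
  const timestamp = Date.parse(event.timestamp); // NaN for unparseable input
  const price = Number(event.price);
  if (!event.symbol || !event.source) return null;
  if (!Number.isFinite(timestamp)) return null;
  if (!Number.isFinite(price) || price <= 0) return null;
  return { symbol: event.symbol, source: event.source, price, timestamp };
}

// Parse a positive integer setting, falling back when it is missing or invalid.
function readPositiveInt(value: string | undefined, fallback: number): number {
  const parsed = Number(value);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// Defaults follow this issue: a 30s window and 3 sources.
function readProcessorConfig(env: Record<string, string | undefined>) {
  return {
    timeWindowMs: readPositiveInt(env["AGG_TIME_WINDOW_MS"], 30000),
    minSources: readPositiveInt(env["AGG_MIN_SOURCES"], 3),
  };
}
```

In a Nest service this would likely be called as `readProcessorConfig(process.env)` or replaced by whatever configuration mechanism the app already uses.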
🎯 Acceptance Criteria
- Aggregator reacts to live `price.received` events and continuously updates the latest aggregated snapshot for each symbol.
- `GET /debug/prices` shows non-empty `aggregated` output after receiving valid events (in tests and/or local run).
- Buffering behavior works: old entries are dropped outside the time window; aggregation requires `minSources`.
- Tests cover the event handler and aggregation trigger logic (mocks acceptable for event emitter and services).
- `apps/aggregator` builds and tests pass.
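The `minSources` criterion above can be isolated into a small, easily mocked function, sketched below. The names `aggregateIfReady` and `AggregateFn` are hypothetical, and the callback stands in for `AggregationService.aggregate`, whose real signature also takes an options argument. The sketch assumes "enough sources" means distinct `source` values among the in-window prices.

```typescript
// Hypothetical shape; the real NormalizedPrice type in apps/aggregator may differ.
interface NormalizedPrice {
  symbol: string;
  source: string;
  price: number;
  timestamp: number; // unix ms
}

// Stand-in for AggregationService.aggregate; a test can pass a recording mock.
type AggregateFn = (symbol: string, prices: NormalizedPrice[]) => void;

// Run aggregation only when prices come from at least minSources distinct sources.
// Returns true when aggregation was triggered.
function aggregateIfReady(
  symbol: string,
  prices: NormalizedPrice[],
  minSources: number,
  aggregate: AggregateFn,
): boolean {
  const distinctSources = new Set(prices.map((p) => p.source));
  if (distinctSources.size < minSources) return false;
  aggregate(symbol, prices);
  return true;
}
```

A unit test can pass a callback that records its arguments and assert that it is not called for two sources and is called once for three.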
📁 Expected files to change/structure
- `apps/aggregator/src/services/price-stream-processor.service.ts` (new): `@OnEvent` handler + buffering + trigger aggregation.
- `apps/aggregator/src/app.module.ts`: ensure all imports/providers are correctly wired for event emitter + reception + processor.
- `apps/aggregator/src/services/*.spec.ts` (new or update): unit tests for processor behavior.
- `apps/aggregator/.env.example` (optional): document `AGG_TIME_WINDOW_MS`, `AGG_MIN_SOURCES`, and ingestor connection vars if needed.
Note: don't worry about the CI workflow for now; it's an issue on our end that we will fix after we merge all issues!
Thank you for taking this issue! You are helping us make RWAs consumer friendly on Stellar.