Make Aggregator process incoming prices into “latest aggregated” snapshots #33

@grantfox-oss

📝 Description

Context: apps/aggregator already has: normalization, aggregation strategies, metrics, health checks, and an in-memory debug store exposed at GET /debug/prices. It also has DataReceptionService which connects to the ingestor WebSocket and emits price.received events — but there’s currently no “glue” that listens to those events, normalizes the payload, runs aggregation, and keeps a continuously-updated latest snapshot per symbol.

Goal: Add a small “stream processor” inside the aggregator that:

  • listens for incoming price events
  • converts/validates them
  • normalizes them into the canonical format
  • maintains a short rolling buffer of recent normalized prices per symbol
  • calls AggregationService.aggregate(...) when enough sources are present
  • updates the in-memory latest aggregated snapshot (already handled via DebugService.setLastAggregated inside AggregationService)

This makes apps/aggregator actually produce real-time aggregated results that downstream services (API/Frontend) can consume.
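As a rough illustration of the buffering and trigger steps in the goal list, here is a framework-free sketch of a rolling buffer that keeps the latest entry per source and reports when enough recent sources are present. `RollingPriceBuffer` and `BufferedPrice` are hypothetical names, and the field layout is an assumption; the real `NormalizedPrice` type in apps/aggregator may differ.

```typescript
// Hypothetical entry shape; the real NormalizedPrice type may carry more fields.
interface BufferedPrice {
  symbol: string;
  source: string;
  price: number;
  timestamp: number; // unix ms
}

class RollingPriceBuffer {
  // symbol -> source -> latest entry seen from that source
  private readonly bySymbol = new Map<string, Map<string, BufferedPrice>>();
  private readonly windowMs: number;
  private readonly minSources: number;

  constructor(windowMs: number = 30000, minSources: number = 3) {
    this.windowMs = windowMs;
    this.minSources = minSources;
  }

  /**
   * Stores the entry, drops entries older than the time window, and returns
   * the recent prices for the symbol once at least minSources distinct
   * sources are present. Returns null while below the threshold.
   */
  add(entry: BufferedPrice, now: number = Date.now()): BufferedPrice[] | null {
    let sources = this.bySymbol.get(entry.symbol);
    if (!sources) {
      sources = new Map<string, BufferedPrice>();
      this.bySymbol.set(entry.symbol, sources);
    }

    // Keep only the newest entry per source.
    const previous = sources.get(entry.source);
    if (!previous || entry.timestamp >= previous.timestamp) {
      sources.set(entry.source, entry);
    }

    // Rebuild the per-symbol map without entries outside the time window.
    const fresh = new Map<string, BufferedPrice>();
    sources.forEach((p, source) => {
      if (now - p.timestamp <= this.windowMs) fresh.set(source, p);
    });
    this.bySymbol.set(entry.symbol, fresh);

    return fresh.size >= this.minSources ? Array.from(fresh.values()) : null;
  }
}
```

In the real service, the array returned by `add` would be what gets passed to `AggregationService.aggregate(...)`. Keeping one entry per source is a design choice made here so that a single chatty source cannot satisfy `minSources` on its own.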

Tech stack / conventions: NestJS (existing), TypeScript, @nestjs/event-emitter for @OnEvent(...) handler, reuse existing NormalizationService, AggregationService, and DebugService. Keep it in-memory (no Redis persistence in this issue).

Complexity: ~6/10 — event-driven wiring + buffering + tests; nothing on-chain.


✅ Requirements

  • Add an event listener for price.received (emitted by DataReceptionService) using @OnEvent('price.received').
  • Convert the event payload into a RawPrice-compatible object (notably: parse ISO timestamp into unix ms) and run it through NormalizationService.
  • Maintain an in-memory buffer per symbol (and optionally per source) for recent NormalizedPrice entries:
    • configurable time window (e.g. AGG_TIME_WINDOW_MS, default 30s)
    • configurable minSources (default 3, matching AggregationService)
  • When a symbol has enough recent sources, call AggregationService.aggregate(symbol, prices, options) and let it update DebugService’s lastAggregated and lastNormalized.
  • Fix/complete any missing Nest module wiring needed for this flow (e.g. ensure EventEmitterModule, HttpModule and providers referenced in AppModule are properly imported/registered).
  • Add tests proving: an incoming event updates the rolling buffer; aggregation runs when thresholds are met; debug store updates.
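The payload conversion in the second requirement could look roughly like the sketch below. Only the ISO timestamp is confirmed by this issue; the other fields of `PriceReceivedEvent` and the `RawPriceLike` shape are assumptions for illustration, so check them against the actual `price.received` payload and `RawPrice` type before reusing anything.

```typescript
// Assumed payload shape for price.received; only the ISO timestamp is
// stated in the issue, the remaining fields are hypothetical.
interface PriceReceivedEvent {
  symbol: string;
  price: number | string;
  source: string;
  timestamp: string; // ISO 8601, e.g. "2024-01-01T00:00:00.000Z"
}

// Hypothetical stand-in for the existing RawPrice type.
interface RawPriceLike {
  symbol: string;
  price: number;
  source: string;
  timestamp: number; // unix ms
}

/** Returns a RawPrice-compatible object, or null if the payload is invalid. */
function toRawPrice(event: PriceReceivedEvent): RawPriceLike | null {
  if (!event.symbol || !event.source) return null;

  // Date.parse returns NaN for strings it cannot parse.
  const timestamp = Date.parse(event.timestamp);
  const price = Number(event.price);
  if (!Number.isFinite(timestamp) || !Number.isFinite(price) || price <= 0) {
    return null;
  }

  return { symbol: event.symbol, price, source: event.source, timestamp };
}
```

Returning `null` instead of throwing keeps the event handler simple: an invalid payload can be logged and skipped without interrupting the stream.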

🎯 Acceptance Criteria

  • Aggregator reacts to live price.received events and continuously updates the latest aggregated snapshot for each symbol.
  • GET /debug/prices shows non-empty aggregated output after receiving valid events (in tests and/or local run).
  • Buffering behavior works: old entries are dropped outside the time window; aggregation requires minSources.
  • Tests cover the event handler and aggregation trigger logic (mocks acceptable for event emitter and services).
  • apps/aggregator builds and tests pass.
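One way to make the trigger logic testable with mocks, as the criteria above allow, is to have the handler depend on narrow interfaces rather than concrete services. The sketch below is framework-free; `ProcessorSketch`, `ProcessorDeps`, and `makeStubs` are hypothetical names, and the `{ minSources }` options object is an assumption about what `AggregationService.aggregate` accepts. In the real service the method would carry the `@OnEvent('price.received')` decorator and receive the Nest-injected services.

```typescript
interface NormalizedLike {
  symbol: string;
  source: string;
  price: number;
  timestamp: number; // unix ms
}

// Narrow stand-ins for NormalizationService, the rolling buffer, and
// AggregationService.aggregate. All three shapes are assumptions.
interface ProcessorDeps {
  normalize(raw: unknown): NormalizedLike;
  buffer: { add(entry: NormalizedLike, now: number): NormalizedLike[] | null };
  aggregate(symbol: string, prices: NormalizedLike[], options: { minSources: number }): void;
}

class ProcessorSketch {
  private readonly deps: ProcessorDeps;
  private readonly minSources: number;

  constructor(deps: ProcessorDeps, minSources: number = 3) {
    this.deps = deps;
    this.minSources = minSources;
  }

  /** Returns true when aggregation was triggered for this event. */
  handlePriceReceived(raw: unknown, now: number = Date.now()): boolean {
    let normalized: NormalizedLike;
    try {
      normalized = this.deps.normalize(raw);
    } catch {
      return false; // skip payloads the normalizer rejects
    }
    const ready = this.deps.buffer.add(normalized, now);
    if (ready === null) return false;
    this.deps.aggregate(normalized.symbol, ready, { minSources: this.minSources });
    return true;
  }
}

// Hand-written stubs: the fake buffer reports "ready" after N entries and
// the fake aggregator records each call for later assertions.
function makeStubs(readyAfter: number) {
  const seen: NormalizedLike[] = [];
  const calls: { symbol: string; count: number }[] = [];
  const deps: ProcessorDeps = {
    normalize: (raw) => raw as NormalizedLike,
    buffer: {
      add: (entry) => {
        seen.push(entry);
        return seen.length >= readyAfter ? seen.slice() : null;
      },
    },
    aggregate: (symbol, prices) => {
      calls.push({ symbol, count: prices.length });
    },
  };
  return { deps, calls };
}
```

A spec can then feed events through `handlePriceReceived` and assert on `calls`, without starting a Nest application or an event emitter.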

📁 Expected files to change/structure

  • apps/aggregator/src/services/price-stream-processor.service.ts (new) — @OnEvent handler + buffering + trigger aggregation.
  • apps/aggregator/src/app.module.ts — ensure all imports/providers are correctly wired for event emitter + reception + processor.
  • apps/aggregator/src/services/*.spec.ts (new or update) — unit tests for processor behavior.
  • apps/aggregator/.env.example (optional) — document AGG_TIME_WINDOW_MS, AGG_MIN_SOURCES, and ingestor connection vars if needed.
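For the two variables named above, a minimal sketch of reading them with the defaults from the requirements (30s window, 3 sources) might look like this. `readProcessorConfig` and `positiveInt` are hypothetical helpers; the project may prefer `@nestjs/config` or an existing config pattern instead.

```typescript
interface ProcessorConfig {
  timeWindowMs: number;
  minSources: number;
}

/** Reads processor settings from an env-like object, falling back to defaults. */
function readProcessorConfig(env: Record<string, string | undefined>): ProcessorConfig {
  return {
    timeWindowMs: positiveInt(env.AGG_TIME_WINDOW_MS, 30000),
    minSources: positiveInt(env.AGG_MIN_SOURCES, 3),
  };
}

// Accepts only positive integers; anything else (unset, empty, non-numeric,
// zero, negative, fractional) falls back to the default.
function positiveInt(value: string | undefined, fallback: number): number {
  const parsed = Number(value);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}
```

In the service this would typically be called once at startup with `process.env`.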

Note: don't worry about the CI workflow for now. It's an issue on our end that we will fix after all issues are merged!

Thank you for taking this issue! You are helping us make RWAs consumer friendly on Stellar.
