diff --git a/apps/aggregator/.env.example b/apps/aggregator/.env.example index 67e0b3c..e95658d 100644 --- a/apps/aggregator/.env.example +++ b/apps/aggregator/.env.example @@ -10,3 +10,7 @@ INGESTOR_HTTP_URL=http://localhost:3000 # Signer Service (for publishing aggregated data) SIGNER_URL=http://localhost:3002 + +# Aggregation Configuration +AGG_TIME_WINDOW_MS=30000 +AGG_MIN_SOURCES=3 diff --git a/apps/aggregator/src/app.module.ts b/apps/aggregator/src/app.module.ts index f527e73..a4b527a 100644 --- a/apps/aggregator/src/app.module.ts +++ b/apps/aggregator/src/app.module.ts @@ -11,6 +11,7 @@ import { DebugModule } from './debug/debug.module'; import { HttpModule } from '@nestjs/axios'; import { EventEmitterModule } from '@nestjs/event-emitter'; import { DataReceptionService } from './services/data-reception.service'; +import { PriceStreamProcessorService } from './services/price-stream-processor.service'; @Module({ imports: [ @@ -19,15 +20,13 @@ import { DataReceptionService } from './services/data-reception.service'; HealthModule, MetricsModule, DebugModule, - ConfigModule.forRoot({ - isGlobal: true, - }), HttpModule, EventEmitterModule.forRoot(), ], controllers: [], providers: [ DataReceptionService, + PriceStreamProcessorService, AggregationService, WeightedAverageAggregator, MedianAggregator, diff --git a/apps/aggregator/src/services/price-stream-processor.service.spec.ts b/apps/aggregator/src/services/price-stream-processor.service.spec.ts new file mode 100644 index 0000000..6cbe6ec --- /dev/null +++ b/apps/aggregator/src/services/price-stream-processor.service.spec.ts @@ -0,0 +1,319 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ConfigService } from '@nestjs/config'; +import { PriceStreamProcessorService } from './price-stream-processor.service'; +import { NormalizationService } from './normalization.service'; +import { AggregationService } from './aggregation.service'; +import { DebugService } from '../debug/debug.service'; +import { PriceInputDto } from '../dto/price-input.dto'; +import { + NormalizedPriceRecord, + NormalizedSource, +} from '../interfaces/normalized-price.interface'; + +describe('PriceStreamProcessorService', () => { + let service: PriceStreamProcessorService; + let normalizationService: jest.Mocked; + let aggregationService: jest.Mocked; + let debugService: jest.Mocked; + let configService: jest.Mocked; + + const NOW = 1700000000000; + + const makeDto = (overrides: Partial = {}): PriceInputDto => { + const dto = new PriceInputDto(); + dto.symbol = 'AAPL'; + dto.price = 150.0; + dto.source = 'alpha_vantage'; + dto.timestamp = '2024-01-15T14:30:00.000Z'; + Object.assign(dto, overrides); + return dto; + }; + + const makeNormalizedRecord = ( + overrides: Partial = {}, + ): NormalizedPriceRecord => ({ + symbol: 'AAPL', + price: 150.0, + timestamp: '2024-01-15T14:30:00.000Z', + originalTimestamp: NOW - 1000, + source: NormalizedSource.ALPHA_VANTAGE, + metadata: { + originalSource: 'alpha_vantage', + originalSymbol: 'AAPL', + normalizedAt: new Date().toISOString(), + normalizerVersion: '1.0.0', + wasTransformed: false, + transformations: [], + }, + ...overrides, + }); + + beforeEach(async () => { + jest.spyOn(Date, 'now').mockReturnValue(NOW); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + PriceStreamProcessorService, + { + provide: NormalizationService, + useValue: { normalize: jest.fn() }, + }, + { + provide: AggregationService, + useValue: { aggregate: jest.fn() }, + }, + { + provide: DebugService, + useValue: { + setLastNormalized: jest.fn(), + setLastAggregated: jest.fn(), + }, + }, + { + provide: ConfigService, + useValue: { + get: jest.fn((key: string, defaultValue?: unknown) => { + const config: Record = { + AGG_TIME_WINDOW_MS: 30000, + AGG_MIN_SOURCES: 3, + }; + return config[key] ?? defaultValue; + }), + }, + }, + ], + }).compile(); + + service = module.get(PriceStreamProcessorService); + normalizationService = module.get(NormalizationService); + aggregationService = module.get(AggregationService); + debugService = module.get(DebugService); + configService = module.get(ConfigService); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + describe('DTO → RawPrice conversion', () => { + it('should convert ISO timestamp string to Unix ms', () => { + const record = makeNormalizedRecord(); + normalizationService.normalize.mockReturnValue(record); + + const dto = makeDto({ timestamp: '2024-01-15T14:30:00.000Z' }); + service.handlePriceReceived(dto); + + expect(normalizationService.normalize).toHaveBeenCalledWith( + expect.objectContaining({ + symbol: 'AAPL', + price: 150.0, + source: 'alpha_vantage', + timestamp: new Date('2024-01-15T14:30:00.000Z').getTime(), + }), + ); + }); + }); + + describe('NormalizedPriceRecord → NormalizedPrice conversion', () => { + it('should map originalTimestamp to timestamp and source enum to string', () => { + const record = makeNormalizedRecord({ + originalTimestamp: NOW - 2000, + source: NormalizedSource.YAHOO_FINANCE, + }); + normalizationService.normalize.mockReturnValue(record); + + const dto = makeDto({ source: 'yahoo_finance' }); + service.handlePriceReceived(dto); + + expect(debugService.setLastNormalized).toHaveBeenCalledWith( + 'AAPL', + expect.arrayContaining([ + expect.objectContaining({ + timestamp: NOW - 2000, + source: 'yahoo_finance', + }), + ]), + ); + }); + }); + + describe('happy path', () => { + it('should normalize, buffer, and update debug store', () => { + const record = makeNormalizedRecord(); + normalizationService.normalize.mockReturnValue(record); + + service.handlePriceReceived(makeDto()); + + expect(normalizationService.normalize).toHaveBeenCalledTimes(1); + expect(debugService.setLastNormalized).toHaveBeenCalledWith( + 'AAPL', + expect.arrayContaining([ + expect.objectContaining({ + symbol: 'AAPL', + price: 150.0, + }), + ]), + ); + }); + }); + + describe('normalization failure', () => { + it('should catch error and not rethrow', () => { + normalizationService.normalize.mockImplementation(() => { + throw new Error('No normalizer found'); + }); + + expect(() => service.handlePriceReceived(makeDto())).not.toThrow(); + expect(debugService.setLastNormalized).not.toHaveBeenCalled(); + }); + }); + + describe('buffer dedup', () => { + it('should replace existing entry from the same source', () => { + const record1 = makeNormalizedRecord({ + price: 150.0, + originalTimestamp: NOW - 2000, + }); + const record2 = makeNormalizedRecord({ + price: 151.0, + originalTimestamp: NOW - 1000, + }); + + normalizationService.normalize + .mockReturnValueOnce(record1) + .mockReturnValueOnce(record2); + + service.handlePriceReceived(makeDto({ price: 150.0 })); + service.handlePriceReceived(makeDto({ price: 151.0 })); + + // The second call should replace the first — buffer has 1 entry + const lastCall = + debugService.setLastNormalized.mock.calls[ + debugService.setLastNormalized.mock.calls.length - 1 + ]; + const buffer = lastCall[1]; + + expect(buffer).toHaveLength(1); + expect(buffer[0].price).toBe(151.0); + }); + }); + + describe('buffer pruning', () => { + it('should remove entries outside the time window', () => { + // First price: old timestamp (outside 30s window) + const oldRecord = makeNormalizedRecord({ + originalTimestamp: NOW - 60000, + source: NormalizedSource.ALPHA_VANTAGE, + }); + // Second price: recent timestamp from different source + const recentRecord = makeNormalizedRecord({ + originalTimestamp: NOW - 1000, + source: NormalizedSource.FINNHUB, + }); + + normalizationService.normalize + .mockReturnValueOnce(oldRecord) + .mockReturnValueOnce(recentRecord); + + service.handlePriceReceived(makeDto({ source: 'alpha_vantage' })); + service.handlePriceReceived(makeDto({ source: 'finnhub' })); + + const lastCall = + debugService.setLastNormalized.mock.calls[ + debugService.setLastNormalized.mock.calls.length - 1 + ]; + const buffer = lastCall[1]; + + // Old entry should be pruned; only the recent one remains + expect(buffer).toHaveLength(1); + expect(buffer[0].source).toBe('finnhub'); + }); + }); + + describe('aggregation NOT triggered', () => { + it('should not call aggregate when fewer than minSources distinct sources', () => { + const record = makeNormalizedRecord(); + normalizationService.normalize.mockReturnValue(record); + + // Only 1 source — minSources is 3 + service.handlePriceReceived(makeDto()); + + expect(aggregationService.aggregate).not.toHaveBeenCalled(); + }); + }); + + describe('aggregation triggered', () => { + it('should call aggregate when minSources threshold is met', () => { + const sources = [ + NormalizedSource.ALPHA_VANTAGE, + NormalizedSource.FINNHUB, + NormalizedSource.YAHOO_FINANCE, + ]; + + sources.forEach((source, i) => { + normalizationService.normalize.mockReturnValueOnce( + makeNormalizedRecord({ + originalTimestamp: NOW - (i + 1) * 1000, + source, + }), + ); + service.handlePriceReceived(makeDto({ source: source as string })); + }); + + expect(aggregationService.aggregate).toHaveBeenCalledTimes(1); + expect(aggregationService.aggregate).toHaveBeenCalledWith( + 'AAPL', + expect.arrayContaining([ + expect.objectContaining({ source: 'alpha_vantage' }), + expect.objectContaining({ source: 'finnhub' }), + expect.objectContaining({ source: 'yahoo_finance' }), + ]), + { minSources: 3, timeWindowMs: 30000 }, + ); + }); + }); + + describe('aggregation failure', () => { + it('should catch aggregate errors and not rethrow', () => { + const sources = [ + NormalizedSource.ALPHA_VANTAGE, + NormalizedSource.FINNHUB, + NormalizedSource.YAHOO_FINANCE, + ]; + + aggregationService.aggregate.mockImplementation(() => { + throw new Error('Insufficient recent sources'); + }); + + sources.forEach((source, i) => { + normalizationService.normalize.mockReturnValueOnce( + makeNormalizedRecord({ + originalTimestamp: NOW - (i + 1) * 1000, + source, + }), + ); + }); + + // Should not throw + expect(() => { + sources.forEach((source) => { + service.handlePriceReceived(makeDto({ source: source as string })); + }); + }).not.toThrow(); + }); + }); + + describe('configuration', () => { + it('should read AGG_TIME_WINDOW_MS and AGG_MIN_SOURCES from ConfigService with defaults', () => { + expect(configService.get).toHaveBeenCalledWith( + 'AGG_TIME_WINDOW_MS', + 30000, + ); + expect(configService.get).toHaveBeenCalledWith('AGG_MIN_SOURCES', 3); + }); + }); +}); diff --git a/apps/aggregator/src/services/price-stream-processor.service.ts b/apps/aggregator/src/services/price-stream-processor.service.ts new file mode 100644 index 0000000..2d4cdd4 --- /dev/null +++ b/apps/aggregator/src/services/price-stream-processor.service.ts @@ -0,0 +1,122 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; +import { ConfigService } from '@nestjs/config'; +import { RawPrice } from '@oracle-stocks/shared'; +import { PriceInputDto } from '../dto/price-input.dto'; +import { + NormalizedPrice, + NormalizedPriceRecord, +} from '../interfaces/normalized-price.interface'; +import { NormalizationService } from './normalization.service'; +import { AggregationService } from './aggregation.service'; +import { DebugService } from '../debug/debug.service'; + +@Injectable() +export class PriceStreamProcessorService { + private readonly logger = new Logger(PriceStreamProcessorService.name); + private readonly priceBuffer = new Map(); + private readonly timeWindowMs: number; + private readonly minSources: number; + + constructor( + private readonly normalizationService: NormalizationService, + private readonly aggregationService: AggregationService, + private readonly debugService: DebugService, + private readonly configService: ConfigService, + ) { + this.timeWindowMs = this.configService.get( + 'AGG_TIME_WINDOW_MS', + 30000, + ); + this.minSources = this.configService.get('AGG_MIN_SOURCES', 3); + } + + @OnEvent('price.received') + handlePriceReceived(dto: PriceInputDto): void { + // Step 1: Convert DTO → RawPrice + const rawPrice = this.toRawPrice(dto); + + // Step 2: Normalize + let record: NormalizedPriceRecord; + try { + record = this.normalizationService.normalize(rawPrice); + } catch (error) { + this.logger.warn( + `Normalization failed for ${dto.symbol} from ${dto.source}: ${(error as Error).message}`, + ); + return; + } + + // Step 3: Convert NormalizedPriceRecord → NormalizedPrice + const normalizedPrice = this.toNormalizedPrice(record); + + // Step 4: Buffer, dedup, and prune + this.addToBuffer(normalizedPrice); + + // Step 5: Update debug store with current buffer + const buffer = this.priceBuffer.get(normalizedPrice.symbol) ?? []; + this.debugService.setLastNormalized(normalizedPrice.symbol, [...buffer]); + + // Step 6: Try aggregation + this.tryAggregate(normalizedPrice.symbol); + } + + private toRawPrice(dto: PriceInputDto): RawPrice { + return { + symbol: dto.symbol, + price: dto.price, + source: dto.source, + timestamp: new Date(dto.timestamp).getTime(), + }; + } + + private toNormalizedPrice(record: NormalizedPriceRecord): NormalizedPrice { + return { + symbol: record.symbol, + price: record.price, + timestamp: record.originalTimestamp, + source: record.source as string, + }; + } + + private addToBuffer(price: NormalizedPrice): void { + let buffer = this.priceBuffer.get(price.symbol); + if (!buffer) { + buffer = []; + this.priceBuffer.set(price.symbol, buffer); + } + + // Dedup: replace any existing entry from the same source + const existingIndex = buffer.findIndex((p) => p.source === price.source); + if (existingIndex !== -1) { + buffer[existingIndex] = price; + } else { + buffer.push(price); + } + + // Prune: remove entries outside the time window + const cutoff = Date.now() - this.timeWindowMs; + const pruned = buffer.filter((p) => p.timestamp >= cutoff); + this.priceBuffer.set(price.symbol, pruned); + } + + private tryAggregate(symbol: string): void { + const buffer = this.priceBuffer.get(symbol) ?? []; + const distinctSources = new Set(buffer.map((p) => p.source)).size; + + if (distinctSources < this.minSources) { + return; + } + + try { + this.aggregationService.aggregate(symbol, [...buffer], { + minSources: this.minSources, + timeWindowMs: this.timeWindowMs, + }); + } catch (error) { + this.logger.warn( + `Aggregation failed for ${symbol}: ${(error as Error).message}`, + ); + } + } +} diff --git a/package-lock.json b/package-lock.json index 6073363..f2d8aff 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1891,19 +1891,6 @@ "@tybys/wasm-util": "^0.10.0" } }, - "node_modules/@nestjs/axios": { - "version": "3.1.3", - "resolved": "https://registry.npmjs.org/@nestjs/axios/-/axios-3.1.3.tgz", - "integrity": "sha512-RZ/63c1tMxGLqyG3iOCVt7A72oy4x1eM6QEhd4KzCYpaVWW0igq0WSREeRoEZhIxRcZfDfIIkvsOMiM7yfVGZQ==", - "license": "MIT", - "optional": true, - "peer": true, - "peerDependencies": { - "@nestjs/common": "^7.0.0 || ^8.0.0 || ^9.0.0 || ^10.0.0", - "axios": "^1.3.1", - "rxjs": "^6.0.0 || ^7.0.0" - } - }, "node_modules/@nestjs/cli": { "version": "10.4.9", "resolved": "https://registry.npmjs.org/@nestjs/cli/-/cli-10.4.9.tgz",