diff --git a/README.md b/README.md index 67005db..7875b8c 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ npm install dxtrade-api - [x] Position metrics (per-position P&L) - [x] Account metrics, trade journal & trade history - [x] Symbol search & instrument info -- [x] OHLC / price bar data +- [x] OHLC / price bar data (one-shot & streaming) - [x] PnL assessments - [x] Multi-broker support (FTMO, Eightcap, Lark Funding) - [x] Persistent WebSocket with `connect()` @@ -34,7 +34,7 @@ npm install dxtrade-api - [x] Full TypeScript support - [ ] Batch orders - [ ] Modify existing orders -- [ ] Real-time price streaming +- [x] Real-time OHLC streaming ## Quick Start @@ -48,8 +48,8 @@ const client = new DxtradeClient({ accountId: "optional_account_id", }); -// connect() = auth + persistent WebSocket (recommended) -await client.connect(); +// await client.auth(); // Auth only +await client.connect(); // Auth + persistent WebSocket (recommended) const suggestions = await client.symbols.search("EURUSD"); const symbol = suggestions[0]; @@ -63,17 +63,17 @@ const order = await client.orders.submit({ }); console.log(`Order ${order.orderId}: ${order.status}`); -client.disconnect(); +client.disconnect(); // disconnect stream -- only needed if client.connect() ``` ## Connection Modes ```ts -// Persistent WebSocket (recommended) — reuses one WS for all data, enables streaming +// 1. Persistent WebSocket (recommended) — reuses one WS for all data, enables streaming await client.connect(); client.disconnect(); // when done -// Lightweight — auth only, each data call opens a temporary WebSocket +// 2. Lightweight — auth only, each data call opens a temporary WebSocket await client.auth(); ``` @@ -113,11 +113,10 @@ BROKER.FTMO // "https://dxtrade.ftmo.com" ### Positions -- `client.positions.get()` — Get all open positions +- `client.positions.get()` — Get all open positions with P&L metrics merged (margin, plOpen, marketValue, etc.) - `client.positions.close(params)` — Close a position (supports partial closes via the quantity field) - `client.positions.closeAll()` — Close all open positions with market orders -- `client.positions.metrics()` — Get position-level P&L metrics -- `client.positions.stream(callback)` — Stream real-time position updates (requires `connect()`). Returns an unsubscribe function. +- `client.positions.stream(callback)` — Stream real-time position updates with live P&L (requires `connect()`). Returns an unsubscribe function. ### Orders @@ -145,6 +144,7 @@ BROKER.FTMO // "https://dxtrade.ftmo.com" ### OHLC - `client.ohlc.get(params)` — Fetch OHLC price bars for a symbol (resolution, range, maxBars, priceField) +- `client.ohlc.stream(params, callback)` — Stream real-time OHLC bar updates (requires `connect()`). Returns a promise that resolves with an unsubscribe function after the snapshot is received. ### Assessments @@ -196,6 +196,8 @@ npm run example:symbols:info:btc npm run example:instruments:get npm run example:instruments:get:forex npm run example:ohlc:get +npm run example:ohlc:stream +npm run example:ohlc:stream:btc npm run example:assessments:get npm run example:assessments:get:btc ``` diff --git a/examples/ohlc.stream.ts b/examples/ohlc.stream.ts new file mode 100644 index 0000000..4b237e8 --- /dev/null +++ b/examples/ohlc.stream.ts @@ -0,0 +1,28 @@ +import "dotenv/config"; +import { DxtradeClient, BROKER } from "../src"; + +const client = new DxtradeClient({ + username: process.env.DXTRADE_USERNAME!, + password: process.env.DXTRADE_PASSWORD!, + broker: process.env.DXTRADE_BROKER! || BROKER.FTMO, + accountId: process.env.DXTRADE_ACCOUNT_ID, + debug: process.env.DXTRADE_DEBUG || false, +}); + +const symbol = process.argv[2] ?? "EURUSD"; + +(async () => { + await client.connect(); + console.log(`Streaming OHLC for ${symbol}...\n`); + + const unsubscribe = await client.ohlc.stream({ symbol }, (bars) => { + console.log(`${bars.length} bars:`, bars.slice(-3)); + }); + + // Stream for 60 seconds then clean up + setTimeout(() => { + console.log("Unsubscribing and disconnecting..."); + unsubscribe(); + client.disconnect(); + }, 60_000); +})().catch(console.error); diff --git a/examples/positions.metrics.ts b/examples/positions.metrics.ts deleted file mode 100644 index d28acf1..0000000 --- a/examples/positions.metrics.ts +++ /dev/null @@ -1,17 +0,0 @@ -import "dotenv/config"; -import { DxtradeClient, BROKER } from "../src"; - -const client = new DxtradeClient({ - username: process.env.DXTRADE_USERNAME!, - password: process.env.DXTRADE_PASSWORD!, - broker: process.env.DXTRADE_BROKER! || BROKER.FTMO, - accountId: process.env.DXTRADE_ACCOUNT_ID, - debug: process.env.DXTRADE_DEBUG || false, -}); - -(async () => { - await client.auth(); - const metrics = await client.positions.metrics(); - - console.log("Position metrics:", metrics); -})().catch(console.error); diff --git a/llms.txt b/llms.txt index 98f5567..e7c243f 100644 --- a/llms.txt +++ b/llms.txt @@ -32,11 +32,10 @@ await client.connect(); // auth + persistent WebSocket (recommended) - client.switchAccount(accountId: string) — Switch to a specific trading account ### Positions -- client.positions.get() — Get all open positions, returns Position.Get[] +- client.positions.get() — Get all open positions with P&L metrics merged (margin, plOpen, marketValue, etc.), returns Position.Full[] - client.positions.close(params: Position.Close) — Close a position (supports partial closes via the quantity field) - client.positions.closeAll() — Close all open positions with market orders -- client.positions.metrics() — Get position-level P&L metrics, returns Position.Metrics[] -- client.positions.stream(callback: (positions: Position.Get[]) => void) — Stream real-time position updates, requires connect(). Returns unsubscribe function. +- client.positions.stream(callback: (positions: Position.Full[]) => void) — Stream real-time position updates with live P&L (requires connect()). Returns unsubscribe function. ### Orders - client.orders.get() — Get all pending/open orders, returns Order.Get[] @@ -63,6 +62,7 @@ await client.connect(); // auth + persistent WebSocket (recommended) - client.ohlc.get(params: OHLC.Params) — Fetch OHLC price bars for a symbol, returns OHLC.Bar[] Required params: symbol (string) Optional params: resolution (seconds, default 300), range (seconds, default 345600), maxBars (default 3500), priceField ("bid" | "ask", default "bid") +- client.ohlc.stream(params: OHLC.Params, callback: (bars: OHLC.Bar[]) => void) — Stream real-time OHLC bar updates (requires connect()). Returns Promise<() => void>. Callback receives snapshot bars first, then live updates. ### Assessments - client.assessments.get(params: Assessments.Params) — Fetch PnL assessments for a date range diff --git a/package.json b/package.json index 5106049..547a555 100644 --- a/package.json +++ b/package.json @@ -28,7 +28,6 @@ "example:positions:get": "tsx examples/positions.get.ts", "example:positions:close": "tsx examples/positions.close.ts", "example:positions:close-all": "tsx examples/positions.close-all.ts", - "example:positions:metrics": "tsx examples/positions.metrics.ts", "example:positions:stream": "tsx examples/positions.stream.ts", "example:orders:submit": "tsx examples/orders.submit.ts", "example:account:metrics": "tsx examples/account.metrics.ts", @@ -39,6 +38,8 @@ "example:instruments:get": "tsx examples/instruments.get.ts", "example:instruments:get:forex": "tsx examples/instruments.get.ts FOREX", "example:ohlc:get": "tsx examples/ohlc.get.ts", + "example:ohlc:stream": "tsx examples/ohlc.stream.ts", + "example:ohlc:stream:btc": "tsx examples/ohlc.stream.ts BTCUSD", "example:assessments:get": "tsx examples/assessments.get.ts", "example:assessments:get:btc": "tsx examples/assessments.get.ts BTCUSD", "============= Git =============": "", diff --git a/src/client.ts b/src/client.ts index fa4a674..8ba4c1e 100644 --- a/src/client.ts +++ b/src/client.ts @@ -11,7 +11,6 @@ import { getAccountMetrics, getTradeHistory, getPositions, - getPositionMetrics, closePosition, closeAllPositions, streamPositions, @@ -20,6 +19,7 @@ import { getSymbolLimits, getSymbolSuggestions, getOHLC, + streamOHLC, getSymbolInfo, getOrders, cancelOrder, @@ -31,8 +31,8 @@ import { class PositionsDomain { constructor(private _ctx: ClientContext) {} - /** Get all open positions via WebSocket. */ - get(): Promise { + /** Get all open positions with P&L metrics merged. */ + get(): Promise { return getPositions(this._ctx); } @@ -46,13 +46,8 @@ class PositionsDomain { return closeAllPositions(this._ctx); } - /** Get position-level P&L metrics via WebSocket. */ - metrics(): Promise { - return getPositionMetrics(this._ctx); - } - - /** Stream real-time position updates. Requires connect(). Returns unsubscribe function. */ - stream(callback: (positions: Position.Get[]) => void): () => void { + /** Stream real-time position updates with P&L metrics. Requires connect(). Returns unsubscribe function. */ + stream(callback: (positions: Position.Full[]) => void): () => void { return streamPositions(this._ctx, callback); } } @@ -153,6 +148,11 @@ class OhlcDomain { get(params: OHLC.Params): Promise { return getOHLC(this._ctx, params); } + + /** Stream real-time OHLC bar updates. Requires connect(). Returns unsubscribe function. */ + stream(params: OHLC.Params, callback: (bars: OHLC.Bar[]) => void): Promise<() => void> { + return streamOHLC(this._ctx, params, callback); + } } class AssessmentsDomain { @@ -193,7 +193,7 @@ export class DxtradeClient { public readonly symbols: SymbolsDomain; /** Instrument operations: get (with optional filtering). */ public readonly instruments: InstrumentsDomain; - /** OHLC price bar operations: get. */ + /** OHLC price bar operations: get, stream. */ public readonly ohlc: OhlcDomain; /** PnL assessment operations: get. */ public readonly assessments: AssessmentsDomain; diff --git a/src/constants/enums.ts b/src/constants/enums.ts index dff4cf0..eb5f05b 100644 --- a/src/constants/enums.ts +++ b/src/constants/enums.ts @@ -92,5 +92,6 @@ export enum WS_MESSAGE { export namespace WS_MESSAGE { export enum SUBTOPIC { BIG_CHART_COMPONENT = "BigChartComponentPresenter-4", + OHLC_STREAM = "OHLCStreamPresenter-0", } } diff --git a/src/domains/ohlc/ohlc.ts b/src/domains/ohlc/ohlc.ts index f3b088f..1d731b3 100644 --- a/src/domains/ohlc/ohlc.ts +++ b/src/domains/ohlc/ohlc.ts @@ -4,6 +4,109 @@ import { Cookies, authHeaders, retryRequest, parseWsData, shouldLog, debugLog } import type { ClientContext } from "@/client.types"; import type { OHLC } from "."; +export async function streamOHLC( + ctx: ClientContext, + params: OHLC.Params, + callback: (bars: OHLC.Bar[]) => void, +): Promise<() => void> { + if (!ctx.wsManager) { + ctx.throwError( + ERROR.STREAM_REQUIRES_CONNECT, + "Streaming requires a persistent WebSocket. Use connect() instead of auth().", + ); + } + + const { symbol, resolution = 60, range = 432_000, maxBars = 3500, priceField = "bid" } = params; + const subtopic = WS_MESSAGE.SUBTOPIC.OHLC_STREAM; + const headers = authHeaders(ctx.csrf!, Cookies.serialize(ctx.cookies)); + const snapshotBars: OHLC.Bar[] = []; + let snapshotDone = false; + let resolveSnapshot: (() => void) | null = null; + + const onChartFeed = (body: Record) => { + if (body?.subtopic !== subtopic) return; + const data = body.data as OHLC.Bar[] | undefined; + if (!Array.isArray(data)) return; + + if (!snapshotDone) { + snapshotBars.push(...data); + if (body.snapshotEnd) { + snapshotDone = true; + callback([...snapshotBars]); + resolveSnapshot?.(); + } + } else { + callback(data); + } + }; + + ctx.wsManager.on(WS_MESSAGE.CHART_FEED_SUBTOPIC, onChartFeed); + + try { + await retryRequest( + { + method: "PUT", + url: endpoints.subscribeInstruments(ctx.broker), + data: { instruments: [symbol] }, + headers, + }, + ctx.retries, + ); + await retryRequest( + { + method: "PUT", + url: endpoints.charts(ctx.broker), + data: { + chartIds: [], + requests: [ + { + aggregationPeriodSeconds: resolution, + extendedSession: true, + forexPriceField: priceField, + id: 0, + maxBarsCount: maxBars, + range, + studySubscription: [], + subtopic, + symbol, + }, + ], + }, + headers, + }, + ctx.retries, + ); + } catch (error: unknown) { + ctx.wsManager.removeListener(WS_MESSAGE.CHART_FEED_SUBTOPIC, onChartFeed); + const message = error instanceof Error ? error.message : "Unknown error"; + ctx.throwError(ERROR.OHLC_ERROR, `OHLC stream subscription error: ${message}`); + } + + await new Promise((resolve, reject) => { + if (snapshotDone) return resolve(); + + const timer = setTimeout(() => { + if (snapshotBars.length > 0) { + snapshotDone = true; + callback([...snapshotBars]); + resolve(); + } else { + ctx.wsManager?.removeListener(WS_MESSAGE.CHART_FEED_SUBTOPIC, onChartFeed); + reject(new DxtradeError(ERROR.OHLC_TIMEOUT, "OHLC stream snapshot timed out")); + } + }, 30_000); + + resolveSnapshot = () => { + clearTimeout(timer); + resolve(); + }; + }); + + return () => { + ctx.wsManager?.removeListener(WS_MESSAGE.CHART_FEED_SUBTOPIC, onChartFeed); + }; +} + export async function getOHLC(ctx: ClientContext, params: OHLC.Params, timeout = 30_000): Promise { ctx.ensureSession(); diff --git a/src/domains/position/position.ts b/src/domains/position/position.ts index cd4f2eb..c1343f7 100644 --- a/src/domains/position/position.ts +++ b/src/domains/position/position.ts @@ -4,7 +4,25 @@ import { Cookies, parseWsData, shouldLog, debugLog, retryRequest, authHeaders } import type { ClientContext } from "@/client.types"; import type { Position } from "."; -export function streamPositions(ctx: ClientContext, callback: (positions: Position.Get[]) => void): () => void { +function mergePositionsWithMetrics(positions: Position.Get[], metrics: Position.Metrics[]): Position.Full[] { + const metricsMap = new Map(metrics.map((m) => [m.uid, m])); + return positions.map((pos) => { + const m = metricsMap.get(pos.uid); + return { + ...pos, + margin: m?.margin ?? 0, + plOpen: m?.plOpen ?? 0, + plClosed: m?.plClosed ?? 0, + totalCommissions: m?.totalCommissions ?? 0, + totalFinancing: m?.totalFinancing ?? 0, + plRate: m?.plRate ?? 0, + averagePrice: m?.averagePrice ?? 0, + marketValue: m?.marketValue ?? 0, + }; + }); +} + +export function streamPositions(ctx: ClientContext, callback: (positions: Position.Full[]) => void): () => void { if (!ctx.wsManager) { ctx.throwError( ERROR.STREAM_REQUIRES_CONNECT, @@ -12,24 +30,37 @@ export function streamPositions(ctx: ClientContext, callback: (positions: Positi ); } - const listener = (body: Position.Get[]) => callback(body); - ctx.wsManager.on(WS_MESSAGE.POSITIONS, listener); + const emit = () => { + const positions = ctx.wsManager!.getCached(WS_MESSAGE.POSITIONS); + const metrics = ctx.wsManager!.getCached(WS_MESSAGE.POSITION_METRICS); + if (positions && metrics) { + callback(mergePositionsWithMetrics(positions, metrics)); + } + }; - const cached = ctx.wsManager.getCached(WS_MESSAGE.POSITIONS); - if (cached !== undefined) { - callback(cached); - } + const onPositions = () => emit(); + const onMetrics = () => emit(); + + ctx.wsManager.on(WS_MESSAGE.POSITIONS, onPositions); + ctx.wsManager.on(WS_MESSAGE.POSITION_METRICS, onMetrics); + + emit(); return () => { - ctx.wsManager?.removeListener(WS_MESSAGE.POSITIONS, listener); + ctx.wsManager?.removeListener(WS_MESSAGE.POSITIONS, onPositions); + ctx.wsManager?.removeListener(WS_MESSAGE.POSITION_METRICS, onMetrics); }; } -export async function getPositions(ctx: ClientContext): Promise { +export async function getPositions(ctx: ClientContext): Promise { ctx.ensureSession(); if (ctx.wsManager) { - return ctx.wsManager.waitFor(WS_MESSAGE.POSITIONS); + const [positions, metrics] = await Promise.all([ + ctx.wsManager.waitFor(WS_MESSAGE.POSITIONS), + ctx.wsManager.waitFor(WS_MESSAGE.POSITION_METRICS), + ]); + return mergePositionsWithMetrics(positions, metrics); } const wsUrl = endpoints.websocket(ctx.broker, ctx.atmosphereId); @@ -37,6 +68,8 @@ export async function getPositions(ctx: ClientContext): Promise return new Promise((resolve, reject) => { const ws = new WebSocket(wsUrl, { headers: { Cookie: cookieStr } }); + let positions: Position.Get[] | null = null; + let metrics: Position.Metrics[] | null = null; const timer = setTimeout(() => { ws.close(); @@ -49,54 +82,22 @@ export async function getPositions(ctx: ClientContext): Promise if (typeof msg === "string") return; if (msg.type === WS_MESSAGE.POSITIONS) { - clearTimeout(timer); - ws.close(); - resolve(msg.body as Position.Get[]); + positions = msg.body as Position.Get[]; } - }); - - ws.on("error", (error) => { - clearTimeout(timer); - ws.close(); - reject(new DxtradeError(ERROR.ACCOUNT_POSITIONS_ERROR, `Account positions error: ${error.message}`)); - }); - }); -} - -export async function getPositionMetrics(ctx: ClientContext, timeout = 30_000): Promise { - ctx.ensureSession(); - - if (ctx.wsManager) { - return ctx.wsManager.waitFor(WS_MESSAGE.POSITION_METRICS, timeout); - } - - const wsUrl = endpoints.websocket(ctx.broker, ctx.atmosphereId); - const cookieStr = Cookies.serialize(ctx.cookies); - - return new Promise((resolve, reject) => { - const ws = new WebSocket(wsUrl, { headers: { Cookie: cookieStr } }); - - const timer = setTimeout(() => { - ws.close(); - reject(new DxtradeError(ERROR.POSITION_METRICS_TIMEOUT, "Position metrics timed out")); - }, timeout); - - ws.on("message", (data) => { - const msg = parseWsData(data); - if (shouldLog(msg, ctx.debug)) debugLog(msg); - - if (typeof msg === "string") return; if (msg.type === WS_MESSAGE.POSITION_METRICS) { + metrics = msg.body as Position.Metrics[]; + } + if (positions && metrics) { clearTimeout(timer); ws.close(); - resolve(msg.body as Position.Metrics[]); + resolve(mergePositionsWithMetrics(positions, metrics)); } }); ws.on("error", (error) => { clearTimeout(timer); ws.close(); - reject(new DxtradeError(ERROR.POSITION_METRICS_ERROR, `Position metrics error: ${error.message}`)); + reject(new DxtradeError(ERROR.ACCOUNT_POSITIONS_ERROR, `Account positions error: ${error.message}`)); }); }); } diff --git a/src/domains/position/position.types.ts b/src/domains/position/position.types.ts index f657b66..0bb0916 100644 --- a/src/domains/position/position.types.ts +++ b/src/domains/position/position.types.ts @@ -16,14 +16,30 @@ export namespace Position { } export interface Metrics { - positionCode: string; - openPl: number; - openPlPerLot: number; - currentPrice: number; - convertedOpenPl: number; + uid: string; + accountId: string; + margin: number; + plOpen: number; + plClosed: number; + totalCommissions: number; + totalFinancing: number; + plRate: number; + averagePrice: number; + marketValue: number; [key: string]: unknown; } + export interface Full extends Get { + margin: number; + plOpen: number; + plClosed: number; + totalCommissions: number; + totalFinancing: number; + plRate: number; + averagePrice: number; + marketValue: number; + } + export interface Close { legs: { instrumentId: number; diff --git a/tests/positions.test.ts b/tests/positions.test.ts index 9204794..dcea4d0 100644 --- a/tests/positions.test.ts +++ b/tests/positions.test.ts @@ -2,7 +2,7 @@ import { describe, it, expect, vi, beforeEach } from "vitest"; import { EventEmitter } from "events"; import { DxtradeError } from "@/constants/errors"; import { WS_MESSAGE } from "@/constants/enums"; -import { getPositionMetrics, closeAllPositions } from "@/domains/position"; +import { getPositions, closeAllPositions } from "@/domains/position"; import { createMockContext } from "./helpers"; // --- Mocks --- @@ -32,65 +32,108 @@ beforeEach(() => { vi.clearAllMocks(); }); +// --- Helpers --- + +const mockPositions = [ + { + uid: "u1", + accountId: "ACC-123", + positionKey: { instrumentId: 3438, positionCode: "POS-1" }, + quantity: 1000, + cost: 1000, + costBasis: 1000, + openCost: 1000, + marginRate: 0.01, + time: 0, + modifiedTime: 0, + userLogin: "test", + takeProfit: null, + stopLoss: null, + }, +]; + +const mockMetrics = [ + { + uid: "u1", + accountId: "ACC-123", + margin: 10, + plOpen: 5.5, + plClosed: 0, + totalCommissions: -0.03, + totalFinancing: -0.1, + plRate: 1.105, + averagePrice: 1.1, + marketValue: 1100, + }, +]; + +function emitBothMessages() { + const posPayload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITIONS, body: mockPositions }); + wsInstance.emit("message", Buffer.from(`${posPayload.length}|${posPayload}`)); + + const metPayload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITION_METRICS, body: mockMetrics }); + wsInstance.emit("message", Buffer.from(`${metPayload.length}|${metPayload}`)); +} + // --- Tests --- -describe("getPositionMetrics", () => { - it("should return metrics from WebSocket POSITION_METRICS message", async () => { +describe("getPositions", () => { + it("should return merged positions with metrics", async () => { const ctx = createMockContext(); - const mockMetrics = [ - { positionCode: "POS-1", openPl: 150.5, openPlPerLot: 15.05, currentPrice: 1.1234, convertedOpenPl: 150.5 }, - { positionCode: "POS-2", openPl: -42.0, openPlPerLot: -4.2, currentPrice: 65000.0, convertedOpenPl: -42.0 }, - ]; - - const promise = getPositionMetrics(ctx); - const payload = JSON.stringify({ accountId: "ACC-123", type: WS_MESSAGE.POSITION_METRICS, body: mockMetrics }); - wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); + const promise = getPositions(ctx); + emitBothMessages(); const result = await promise; - expect(result).toEqual(mockMetrics); + expect(result).toHaveLength(1); + expect(result[0].uid).toBe("u1"); + expect(result[0].quantity).toBe(1000); + expect(result[0].plOpen).toBe(5.5); + expect(result[0].margin).toBe(10); + expect(result[0].positionKey.instrumentId).toBe(3438); expect(wsInstance.close).toHaveBeenCalled(); }); - it("should ignore non-matching WS message types", async () => { + it("should wait for both POSITIONS and POSITION_METRICS before resolving", async () => { const ctx = createMockContext(); - const mockMetrics = [ - { positionCode: "POS-1", openPl: 100, openPlPerLot: 10, currentPrice: 1.1, convertedOpenPl: 100 }, - ]; - const promise = getPositionMetrics(ctx); + const promise = getPositions(ctx); + + // Send only POSITIONS first — should NOT resolve yet + const posPayload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITIONS, body: mockPositions }); + wsInstance.emit("message", Buffer.from(`${posPayload.length}|${posPayload}`)); - // Send an unrelated message first - const otherPayload = JSON.stringify({ accountId: null, type: WS_MESSAGE.ACCOUNT_METRICS, body: {} }); - wsInstance.emit("message", Buffer.from(`${otherPayload.length}|${otherPayload}`)); + // Verify not resolved by checking close wasn't called + expect(wsInstance.close).not.toHaveBeenCalled(); - // Then send the real one - const payload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITION_METRICS, body: mockMetrics }); - wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); + // Now send POSITION_METRICS + const metPayload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITION_METRICS, body: mockMetrics }); + wsInstance.emit("message", Buffer.from(`${metPayload.length}|${metPayload}`)); const result = await promise; - expect(result).toEqual(mockMetrics); + expect(result).toHaveLength(1); + expect(wsInstance.close).toHaveBeenCalled(); }); it("should reject on WS error", async () => { const ctx = createMockContext(); - const promise = getPositionMetrics(ctx); + const promise = getPositions(ctx); wsInstance.emit("error", new Error("ws failed")); await expect(promise).rejects.toThrow(DxtradeError); - await expect(promise).rejects.toThrow("Position metrics error: ws failed"); + await expect(promise).rejects.toThrow("Account positions error: ws failed"); }); it("should reject on timeout", async () => { vi.useFakeTimers(); const ctx = createMockContext(); - const promise = getPositionMetrics(ctx, 2000); - vi.advanceTimersByTime(2001); + const promise = getPositions(ctx); + vi.advanceTimersByTime(30_001); await expect(promise).rejects.toThrow(DxtradeError); - await expect(promise).rejects.toThrow("Position metrics timed out"); + await expect(promise).rejects.toThrow("Account positions timed out"); vi.useRealTimers(); }); @@ -98,7 +141,7 @@ describe("getPositionMetrics", () => { it("should throw NO_SESSION when not authenticated", async () => { const ctx = createMockContext({ csrf: null }); - await expect(getPositionMetrics(ctx)).rejects.toThrow("No active session"); + await expect(getPositions(ctx)).rejects.toThrow("No active session"); }); }); @@ -106,22 +149,8 @@ describe("closeAllPositions", () => { it("should close each position with a market order", async () => { const ctx = createMockContext(); - const mockPositions = [ - { - uid: "u1", - accountId: "ACC-123", - positionKey: { instrumentId: 3438, positionCode: "POS-1" }, - quantity: 1000, - cost: 1000, - costBasis: 1000, - openCost: 1000, - marginRate: 0.01, - time: 0, - modifiedTime: 0, - userLogin: "test", - takeProfit: null, - stopLoss: null, - }, + const twoPositions = [ + ...mockPositions, { uid: "u2", accountId: "ACC-123", @@ -139,10 +168,18 @@ describe("closeAllPositions", () => { }, ]; + const twoMetrics = [ + ...mockMetrics, + { uid: "u2", accountId: "ACC-123", margin: 5, plOpen: -2, plClosed: 0, totalCommissions: 0, totalFinancing: 0, plRate: 0, averagePrice: 0, marketValue: 0 }, + ]; + // getPositions uses WS, closePosition uses retryRequest setTimeout(() => { - const payload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITIONS, body: mockPositions }); - wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); + const posPayload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITIONS, body: twoPositions }); + wsInstance.emit("message", Buffer.from(`${posPayload.length}|${posPayload}`)); + + const metPayload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITION_METRICS, body: twoMetrics }); + wsInstance.emit("message", Buffer.from(`${metPayload.length}|${metPayload}`)); }, 200); mockRetryRequest.mockResolvedValue({ status: 200 }); @@ -171,8 +208,11 @@ describe("closeAllPositions", () => { const ctx = createMockContext(); setTimeout(() => { - const payload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITIONS, body: [] }); - wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); + const posPayload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITIONS, body: [] }); + wsInstance.emit("message", Buffer.from(`${posPayload.length}|${posPayload}`)); + + const metPayload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITION_METRICS, body: [] }); + wsInstance.emit("message", Buffer.from(`${metPayload.length}|${metPayload}`)); }, 200); await closeAllPositions(ctx); diff --git a/tests/stream-ohlc.test.ts b/tests/stream-ohlc.test.ts new file mode 100644 index 0000000..99e2da9 --- /dev/null +++ b/tests/stream-ohlc.test.ts @@ -0,0 +1,194 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { EventEmitter } from "events"; +import { WS_MESSAGE } from "@/constants/enums"; +import { DxtradeError } from "@/constants/errors"; +import { streamOHLC } from "@/domains/ohlc"; +import { createMockContext } from "./helpers"; +import type { WsManager } from "@/utils/ws-manager"; + +// --- Mocks --- + +vi.mock("axios", () => ({ + default: vi.fn().mockResolvedValue({ data: {} }), + isAxiosError: () => false, +})); + +beforeEach(() => { + vi.clearAllMocks(); +}); + +// --- Helpers --- + +const mockBars = [ + { timestamp: 1000, open: 1.1, high: 1.2, low: 1.0, close: 1.15, volume: 100, vwap: 1.12, time: 1000 }, + { timestamp: 2000, open: 1.15, high: 1.25, low: 1.1, close: 1.2, volume: 200, vwap: 1.17, time: 2000 }, +]; + +const liveBars = [ + { timestamp: 3000, open: 1.2, high: 1.3, low: 1.15, close: 1.25, volume: 150, vwap: 1.22, time: 3000 }, +]; + +function createMockWsManager(): WsManager { + const emitter = new EventEmitter(); + + return Object.assign(emitter, { + connect: vi.fn(), + close: vi.fn(), + get isConnected() { + return true; + }, + waitFor: vi.fn(), + getCached: vi.fn(), + _cache: new Map(), + }) as unknown as WsManager; +} + +function emitChartFeed(wsManager: WsManager, body: Record) { + (wsManager as unknown as EventEmitter).emit(WS_MESSAGE.CHART_FEED_SUBTOPIC, body); +} + +// --- Tests --- + +describe("streamOHLC", () => { + it("should emit snapshot bars after snapshotEnd", async () => { + const wsManager = createMockWsManager(); + const ctx = createMockContext({ wsManager }); + const callback = vi.fn(); + + const promise = streamOHLC(ctx, { symbol: "EURUSD" }, callback); + + // Simulate snapshot data arriving + emitChartFeed(wsManager, { + subtopic: WS_MESSAGE.SUBTOPIC.OHLC_STREAM, + data: mockBars, + snapshotEnd: false, + }); + + // Simulate snapshotEnd + emitChartFeed(wsManager, { + subtopic: WS_MESSAGE.SUBTOPIC.OHLC_STREAM, + data: [], + snapshotEnd: true, + }); + + const unsubscribe = await promise; + + expect(callback).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledWith(mockBars); + expect(typeof unsubscribe).toBe("function"); + }); + + it("should emit live bar updates after snapshot", async () => { + const wsManager = createMockWsManager(); + const ctx = createMockContext({ wsManager }); + const callback = vi.fn(); + + const promise = streamOHLC(ctx, { symbol: "EURUSD" }, callback); + + // Complete snapshot + emitChartFeed(wsManager, { + subtopic: WS_MESSAGE.SUBTOPIC.OHLC_STREAM, + data: mockBars, + snapshotEnd: true, + }); + + await promise; + callback.mockClear(); + + // Simulate live update + emitChartFeed(wsManager, { + subtopic: WS_MESSAGE.SUBTOPIC.OHLC_STREAM, + data: liveBars, + }); + + expect(callback).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledWith(liveBars); + }); + + it("should stop receiving updates after unsubscribe", async () => { + const wsManager = createMockWsManager(); + const ctx = createMockContext({ wsManager }); + const callback = vi.fn(); + + const promise = streamOHLC(ctx, { symbol: "EURUSD" }, callback); + + emitChartFeed(wsManager, { + subtopic: WS_MESSAGE.SUBTOPIC.OHLC_STREAM, + data: mockBars, + snapshotEnd: true, + }); + + const unsubscribe = await promise; + callback.mockClear(); + + unsubscribe(); + + emitChartFeed(wsManager, { + subtopic: WS_MESSAGE.SUBTOPIC.OHLC_STREAM, + data: liveBars, + }); + + expect(callback).not.toHaveBeenCalled(); + }); + + it("should throw STREAM_REQUIRES_CONNECT when wsManager is null", async () => { + const ctx = createMockContext({ wsManager: null }); + + await expect(streamOHLC(ctx, { symbol: "EURUSD" }, vi.fn())).rejects.toThrow(DxtradeError); + await expect(streamOHLC(ctx, { symbol: "EURUSD" }, vi.fn())).rejects.toThrow("connect()"); + }); + + it("should ignore messages with different subtopic", async () => { + const wsManager = createMockWsManager(); + const ctx = createMockContext({ wsManager }); + const callback = vi.fn(); + + const promise = streamOHLC(ctx, { symbol: "EURUSD" }, callback); + + // Emit message with wrong subtopic — should be ignored + emitChartFeed(wsManager, { + subtopic: WS_MESSAGE.SUBTOPIC.BIG_CHART_COMPONENT, + data: mockBars, + snapshotEnd: true, + }); + + expect(callback).not.toHaveBeenCalled(); + + // Now emit correct subtopic to complete the promise + emitChartFeed(wsManager, { + subtopic: WS_MESSAGE.SUBTOPIC.OHLC_STREAM, + data: mockBars, + snapshotEnd: true, + }); + + await promise; + expect(callback).toHaveBeenCalledTimes(1); + }); + + it("should accumulate bars across multiple messages before snapshotEnd", async () => { + const wsManager = createMockWsManager(); + const ctx = createMockContext({ wsManager }); + const callback = vi.fn(); + + const promise = streamOHLC(ctx, { symbol: "EURUSD" }, callback); + + // First batch + emitChartFeed(wsManager, { + subtopic: WS_MESSAGE.SUBTOPIC.OHLC_STREAM, + data: [mockBars[0]], + snapshotEnd: false, + }); + + // Second batch + emitChartFeed(wsManager, { + subtopic: WS_MESSAGE.SUBTOPIC.OHLC_STREAM, + data: [mockBars[1]], + snapshotEnd: true, + }); + + await promise; + + expect(callback).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledWith(mockBars); + }); +}); diff --git a/tests/stream-positions.test.ts b/tests/stream-positions.test.ts index db20b7e..920f3b5 100644 --- a/tests/stream-positions.test.ts +++ b/tests/stream-positions.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, vi, beforeEach } from "vitest"; import { EventEmitter } from "events"; -import { WS_MESSAGE, ERROR } from "@/constants/enums"; +import { WS_MESSAGE } from "@/constants/enums"; import { DxtradeError } from "@/constants/errors"; import { streamPositions, getPositions } from "@/domains/position"; import { createMockContext } from "./helpers"; @@ -29,6 +29,39 @@ beforeEach(() => { // --- Helpers --- +const mockPositions = [ + { + uid: "u1", + accountId: "ACC-123", + positionKey: { instrumentId: 3438, positionCode: "POS-1" }, + quantity: 1000, + cost: 1000, + costBasis: 1, + openCost: 1000, + marginRate: 0.01, + time: 0, + modifiedTime: 0, + userLogin: "test", + takeProfit: null, + stopLoss: null, + }, +]; + +const mockMetrics = [ + { + uid: "u1", + accountId: "ACC-123", + margin: 10, + plOpen: 5.5, + plClosed: 0, + totalCommissions: -0.03, + totalFinancing: -0.1, + plRate: 1.105, + averagePrice: 1.1, + marketValue: 1100, + }, +]; + function createMockWsManager(initialCache?: Record): WsManager { const emitter = new EventEmitter(); const cache = new Map(initialCache ? Object.entries(initialCache) : []); @@ -54,45 +87,63 @@ function createMockWsManager(initialCache?: Record): WsManager // --- Tests --- describe("streamPositions", () => { - it("should invoke callback on position updates", () => { - const wsManager = createMockWsManager(); + it("should emit merged positions on POSITION_METRICS update", () => { + const wsManager = createMockWsManager({ + [WS_MESSAGE.POSITIONS]: mockPositions, + }); const ctx = createMockContext({ wsManager }); const callback = vi.fn(); streamPositions(ctx, callback); - const positions = [{ positionCode: "POS-1", quantity: 100 }]; - (wsManager as unknown as EventEmitter).emit(WS_MESSAGE.POSITIONS, positions); + // Clear the initial cached emission + callback.mockClear(); + + // Simulate a POSITION_METRICS update — cache it so getCached returns it + (wsManager as any)._cache.set(WS_MESSAGE.POSITION_METRICS, mockMetrics); + (wsManager as unknown as EventEmitter).emit(WS_MESSAGE.POSITION_METRICS, mockMetrics); - expect(callback).toHaveBeenCalledWith(positions); + expect(callback).toHaveBeenCalledTimes(1); + const result = callback.mock.calls[0][0]; + expect(result[0].uid).toBe("u1"); + expect(result[0].quantity).toBe(1000); + expect(result[0].plOpen).toBe(5.5); }); - it("should stop receiving updates after unsubscribe", () => { - const wsManager = createMockWsManager(); + it("should immediately emit cached merged data on subscribe", () => { + const wsManager = createMockWsManager({ + [WS_MESSAGE.POSITIONS]: mockPositions, + [WS_MESSAGE.POSITION_METRICS]: mockMetrics, + }); const ctx = createMockContext({ wsManager }); const callback = vi.fn(); - const unsubscribe = streamPositions(ctx, callback); - - (wsManager as unknown as EventEmitter).emit(WS_MESSAGE.POSITIONS, []); - expect(callback).toHaveBeenCalledTimes(1); - - unsubscribe(); + streamPositions(ctx, callback); - (wsManager as unknown as EventEmitter).emit(WS_MESSAGE.POSITIONS, []); expect(callback).toHaveBeenCalledTimes(1); + const result = callback.mock.calls[0][0]; + expect(result[0].plOpen).toBe(5.5); + expect(result[0].quantity).toBe(1000); }); - it("should immediately emit cached positions on subscribe", () => { - const cachedPositions = [{ positionCode: "POS-1", quantity: 100 }]; - const wsManager = createMockWsManager({ [WS_MESSAGE.POSITIONS]: cachedPositions }); + it("should stop receiving updates after unsubscribe", () => { + const wsManager = createMockWsManager({ + [WS_MESSAGE.POSITIONS]: mockPositions, + [WS_MESSAGE.POSITION_METRICS]: mockMetrics, + }); const ctx = createMockContext({ wsManager }); const callback = vi.fn(); - streamPositions(ctx, callback); + const unsubscribe = streamPositions(ctx, callback); + callback.mockClear(); + (wsManager as unknown as EventEmitter).emit(WS_MESSAGE.POSITION_METRICS, mockMetrics); + expect(callback).toHaveBeenCalledTimes(1); + + unsubscribe(); + + (wsManager as unknown as EventEmitter).emit(WS_MESSAGE.POSITION_METRICS, mockMetrics); expect(callback).toHaveBeenCalledTimes(1); - expect(callback).toHaveBeenCalledWith(cachedPositions); }); it("should throw STREAM_REQUIRES_CONNECT when wsManager is null", () => { @@ -104,29 +155,36 @@ describe("streamPositions", () => { }); describe("getPositions with wsManager", () => { - it("should use wsManager.waitFor when available", async () => { - const wsManager = createMockWsManager(); - const mockPositions = [{ positionCode: "POS-1", quantity: 100 }]; - (wsManager.waitFor as ReturnType).mockResolvedValue(mockPositions); + it("should use wsManager.waitFor and merge results", async () => { + const wsManager = createMockWsManager({ + [WS_MESSAGE.POSITIONS]: mockPositions, + [WS_MESSAGE.POSITION_METRICS]: mockMetrics, + }); const ctx = createMockContext({ wsManager }); const result = await getPositions(ctx); - expect(wsManager.waitFor).toHaveBeenCalledWith(WS_MESSAGE.POSITIONS); - expect(result).toEqual(mockPositions); + expect(result).toHaveLength(1); + expect(result[0].uid).toBe("u1"); + expect(result[0].quantity).toBe(1000); + expect(result[0].plOpen).toBe(5.5); + expect(result[0].margin).toBe(10); }); it("should fall back to WebSocket when wsManager is null", async () => { const ctx = createMockContext({ wsManager: null }); - const mockPositions = [{ positionCode: "POS-1", quantity: 100 }]; const promise = getPositions(ctx); - const payload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITIONS, body: mockPositions }); - wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); + const posPayload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITIONS, body: mockPositions }); + wsInstance.emit("message", Buffer.from(`${posPayload.length}|${posPayload}`)); + + const metPayload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITION_METRICS, body: mockMetrics }); + wsInstance.emit("message", Buffer.from(`${metPayload.length}|${metPayload}`)); const result = await promise; - expect(result).toEqual(mockPositions); + expect(result).toHaveLength(1); + expect(result[0].plOpen).toBe(5.5); expect(wsInstance.close).toHaveBeenCalled(); }); });