From 3a201521a24e1a504e7c22260570f99940e59d12 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 12 Feb 2026 23:17:04 +0100 Subject: [PATCH 1/2] feat: Added websocket stream, added new stream connection with max 1 ws. improved error logging --- README.md | 106 ++++--- ...{account-metrics.ts => account.metrics.ts} | 4 +- ...de-history.ts => account.trade-history.ts} | 4 +- ...de-journal.ts => account.trade-journal.ts} | 4 +- ...{get-assessments.ts => assessments.get.ts} | 4 +- examples/connect.ts | 3 +- examples/debug.ts | 14 - .../{instruments.ts => instruments.get.ts} | 10 +- examples/{ohlc.ts => ohlc.get.ts} | 4 +- examples/{orders.ts => orders.submit.ts} | 14 +- ...ll-positions.ts => positions.close-all.ts} | 6 +- .../{close-position.ts => positions.close.ts} | 4 +- examples/{positions.ts => positions.get.ts} | 4 +- ...sition-metrics.ts => positions.metrics.ts} | 4 +- examples/positions.stream.ts | 27 ++ examples/{symbol-info.ts => symbols.info.ts} | 8 +- llms.txt | 68 +++-- package.json | 34 +-- src/client.ts | 278 +++++++++++------- src/client.types.ts | 2 + src/constants/endpoints.ts | 2 +- src/constants/enums.ts | 7 + src/domains/account/account.ts | 13 +- src/domains/instrument/instrument.ts | 2 +- src/domains/order/order.ts | 4 + src/domains/position/position.ts | 32 ++ src/domains/session/session.ts | 20 +- src/domains/symbol/symbol.ts | 2 +- src/utils/index.ts | 1 + src/utils/retry.ts | 5 +- src/utils/ws-manager.ts | 80 +++++ tests/account.test.ts | 2 +- tests/helpers.ts | 1 + tests/orders.test.ts | 36 ++- tests/positions.test.ts | 8 +- tests/stream-positions.test.ts | 132 +++++++++ tests/ws-manager.test.ts | 143 +++++++++ 37 files changed, 830 insertions(+), 262 deletions(-) rename examples/{account-metrics.ts => account.metrics.ts} (84%) rename examples/{trade-history.ts => account.trade-history.ts} (85%) rename examples/{trade-journal.ts => account.trade-journal.ts} (86%) rename examples/{get-assessments.ts => assessments.get.ts} (88%) delete mode 100644 examples/debug.ts rename examples/{instruments.ts => instruments.get.ts} (63%) rename examples/{ohlc.ts => ohlc.get.ts} (88%) rename examples/{orders.ts => orders.submit.ts} (83%) rename examples/{close-all-positions.ts => positions.close-all.ts} (81%) rename examples/{close-position.ts => positions.close.ts} (91%) rename examples/{positions.ts => positions.get.ts} (85%) rename examples/{position-metrics.ts => positions.metrics.ts} (84%) create mode 100644 examples/positions.stream.ts rename examples/{symbol-info.ts => symbols.info.ts} (79%) create mode 100644 src/utils/ws-manager.ts create mode 100644 tests/stream-positions.test.ts create mode 100644 tests/ws-manager.test.ts diff --git a/README.md b/README.md index 165c134..67005db 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,8 @@ npm install dxtrade-api - [x] OHLC / price bar data - [x] PnL assessments - [x] Multi-broker support (FTMO, Eightcap, Lark Funding) +- [x] Persistent WebSocket with `connect()` +- [x] Real-time position streaming - [x] Full TypeScript support - [ ] Batch orders - [ ] Modify existing orders @@ -46,12 +48,13 @@ const client = new DxtradeClient({ accountId: "optional_account_id", }); +// connect() = auth + persistent WebSocket (recommended) await client.connect(); -const suggestions = await client.getSymbolSuggestions("EURUSD"); +const suggestions = await client.symbols.search("EURUSD"); const symbol = suggestions[0]; -const order = await client.submitOrder({ +const order = await client.orders.submit({ symbol: symbol.name, side: SIDE.BUY, quantity: 0.01, @@ -60,6 +63,18 @@ const order = await client.submitOrder({ }); console.log(`Order ${order.orderId}: ${order.status}`); +client.disconnect(); +``` + +## Connection Modes + +```ts +// Persistent WebSocket (recommended) — reuses one WS for all data, enables streaming +await client.connect(); +client.disconnect(); // when done + +// Lightweight — auth only, each data call opens a temporary WebSocket +await client.auth(); ``` ## Configuration @@ -89,42 +104,51 @@ BROKER.FTMO // "https://dxtrade.ftmo.com" ### Session -- `client.connect()` — Login, fetch CSRF, WebSocket handshake, optional account switch +- `client.connect()` — Auth + persistent WebSocket. Recommended for most use cases. +- `client.auth()` — Lightweight: login, fetch CSRF, WebSocket handshake, optional account switch. No persistent WS. +- `client.disconnect()` — Close the persistent WebSocket connection - `client.login()` — Authenticate with broker - `client.fetchCsrf()` — Fetch CSRF token from broker page - `client.switchAccount(accountId)` — Switch to a specific account -### Market Data +### Positions + +- `client.positions.get()` — Get all open positions +- `client.positions.close(params)` — Close a position (supports partial closes via the quantity field) +- `client.positions.closeAll()` — Close all open positions with market orders +- `client.positions.metrics()` — Get position-level P&L metrics +- `client.positions.stream(callback)` — Stream real-time position updates (requires `connect()`). Returns an unsubscribe function. -- `client.getSymbolSuggestions(text)` — Search for symbols -- `client.getSymbolInfo(symbol)` — Get instrument info (volume limits, lot size) -- `client.getSymbolLimits()` — Get order size limits and stop/limit distances for all symbols -- `client.getInstruments(params?)` — Get all available instruments, optionally filtered by partial match (e.g. `{ type: "FOREX" }`) -- `client.getOHLC(params)` — Fetch OHLC price bars for a symbol (resolution, range, maxBars, priceField) +### Orders -### Trading +- `client.orders.get()` — Get all pending/open orders +- `client.orders.submit(params)` — Submit an order and wait for WebSocket confirmation +- `client.orders.cancel(orderChainId)` — Cancel a single pending order +- `client.orders.cancelAll()` — Cancel all pending orders -- `client.submitOrder(params)` — Submit an order and wait for WebSocket confirmation -- `client.getOrders()` — Get all pending/open orders via WebSocket -- `client.cancelOrder(orderChainId)` — Cancel a single pending order -- `client.cancelAllOrders()` — Cancel all pending orders +### Account -### Positions +- `client.account.metrics()` — Get account metrics (equity, balance, margin, open P&L, etc.) +- `client.account.tradeJournal({ from, to })` — Fetch trade journal entries for a date range (Unix timestamps) +- `client.account.tradeHistory({ from, to })` — Fetch trade history for a date range (Unix timestamps) -- `client.getPositions()` — Get all open positions via WebSocket -- `client.closePosition(params)` — Close a position (supports partial closes via the quantity field) -- `client.closeAllPositions()` — Close all open positions with market orders -- `client.getPositionMetrics()` — Get position-level P&L metrics via WebSocket +### Symbols -### Account +- `client.symbols.search(text)` — Search for symbols +- `client.symbols.info(symbol)` — Get instrument info (volume limits, lot size) +- `client.symbols.limits()` — Get order size limits and stop/limit distances for all symbols + +### Instruments + +- `client.instruments.get(params?)` — Get all available instruments, optionally filtered by partial match (e.g. `{ type: "FOREX" }`) + +### OHLC -- `client.getAccountMetrics()` — Get account metrics (equity, balance, margin, open P&L, etc.) -- `client.getTradeJournal({ from, to })` — Fetch trade journal entries for a date range (Unix timestamps) -- `client.getTradeHistory({ from, to })` — Fetch trade history for a date range (Unix timestamps) +- `client.ohlc.get(params)` — Fetch OHLC price bars for a symbol (resolution, range, maxBars, priceField) -### Analytics +### Assessments -- `client.getAssessments(params)` — Fetch PnL assessments for a date range +- `client.assessments.get(params)` — Fetch PnL assessments for a date range ## Enums @@ -157,23 +181,23 @@ const client = new DxtradeClient({ ```bash cp .env.example .env # fill in credentials npm run example:connect -npm run example:order -npm run example:orders -npm run example:positions -npm run example:close-position -npm run example:close-all-positions -npm run example:position-metrics -npm run example:assessments -npm run example:assessments:btc -npm run example:account -npm run example:instruments -npm run example:ohlc -npm run example:instruments:forex -npm run example:symbol -npm run example:symbol:btc -npm run example:trade-journal -npm run example:trade-history npm run example:debug +npm run example:positions:get +npm run example:positions:close +npm run example:positions:close-all +npm run example:positions:metrics +npm run example:positions:stream +npm run example:orders:submit +npm run example:account:metrics +npm run example:account:trade-journal +npm run example:account:trade-history +npm run example:symbols:info +npm run example:symbols:info:btc +npm run example:instruments:get +npm run example:instruments:get:forex +npm run example:ohlc:get +npm run example:assessments:get +npm run example:assessments:get:btc ``` ## DXtrade API Docs diff --git a/examples/account-metrics.ts b/examples/account.metrics.ts similarity index 84% rename from examples/account-metrics.ts rename to examples/account.metrics.ts index 8f8141f..9e35c60 100644 --- a/examples/account-metrics.ts +++ b/examples/account.metrics.ts @@ -10,8 +10,8 @@ const client = new DxtradeClient({ }); (async () => { - await client.connect(); - const metrics = await client.getAccountMetrics(); + await client.auth(); + const metrics = await client.account.metrics(); console.log("Account metrics:", metrics); })().catch(console.error); diff --git a/examples/trade-history.ts b/examples/account.trade-history.ts similarity index 85% rename from examples/trade-history.ts rename to examples/account.trade-history.ts index 9906313..729b775 100644 --- a/examples/trade-history.ts +++ b/examples/account.trade-history.ts @@ -10,12 +10,12 @@ const client = new DxtradeClient({ }); (async () => { - await client.connect(); + await client.auth(); const from = new Date(new Date().setMonth(new Date().getMonth() - 1)).getTime(); const to = Date.now(); - const history = await client.getTradeHistory({ from, to }); + const history = await client.account.tradeHistory({ from, to }); console.log("Trade history:", history); })().catch(console.error); diff --git a/examples/trade-journal.ts b/examples/account.trade-journal.ts similarity index 86% rename from examples/trade-journal.ts rename to examples/account.trade-journal.ts index a2edd3c..f56949e 100644 --- a/examples/trade-journal.ts +++ b/examples/account.trade-journal.ts @@ -10,12 +10,12 @@ const client = new DxtradeClient({ }); (async () => { - await client.connect(); + await client.auth(); // convert the dates to normal dates const from = new Date(new Date().setMonth(new Date().getMonth() - 1)).getTime(); const to = Date.now(); - const journal = await client.getTradeJournal({ from, to }); + const journal = await client.account.tradeJournal({ from, to }); console.log("Trade journal:", journal); })().catch(console.error); diff --git a/examples/get-assessments.ts b/examples/assessments.get.ts similarity index 88% rename from examples/get-assessments.ts rename to examples/assessments.get.ts index 326e0f0..0480446 100644 --- a/examples/get-assessments.ts +++ b/examples/assessments.get.ts @@ -10,12 +10,12 @@ const client = new DxtradeClient({ }); (async () => { - await client.connect(); + await client.auth(); const now = Date.now(); const oneWeekAgo = now - 7 * 24 * 60 * 60 * 1000; - const assessments = await client.getAssessments({ + const assessments = await client.assessments.get({ from: oneWeekAgo, to: now, instrument: "EURUSD", diff --git a/examples/connect.ts b/examples/connect.ts index 8ad6d85..d775f18 100644 --- a/examples/connect.ts +++ b/examples/connect.ts @@ -16,5 +16,6 @@ const client = new DxtradeClient({ (async () => { await client.connect(); - console.log("Connected successfully"); + console.log("Connected successfully (persistent WS open)"); + client.disconnect(); })().catch(console.error); diff --git a/examples/debug.ts b/examples/debug.ts deleted file mode 100644 index 4933ad1..0000000 --- a/examples/debug.ts +++ /dev/null @@ -1,14 +0,0 @@ -import "dotenv/config"; -import { DxtradeClient, BROKER } from "../src"; - -const client = new DxtradeClient({ - username: process.env.DXTRADE_USERNAME!, - password: process.env.DXTRADE_PASSWORD!, - broker: process.env.DXTRADE_BROKER! || BROKER.FTMO, - accountId: process.env.DXTRADE_ACCOUNT_ID, - debug: true, -}); - -(async () => { - await client.connect(); -})().catch(console.error); diff --git a/examples/instruments.ts b/examples/instruments.get.ts similarity index 63% rename from examples/instruments.ts rename to examples/instruments.get.ts index c74196c..6ab3436 100644 --- a/examples/instruments.ts +++ b/examples/instruments.get.ts @@ -10,16 +10,16 @@ const client = new DxtradeClient({ }); (async () => { - await client.connect(); - console.log("Connected — fetching instruments\n"); + await client.auth(); + console.log("Authenticated — fetching instruments\n"); // Get all instruments - const instruments = await client.getInstruments(); - console.log("instruments", "[\n", instruments[0], `\n...and ${instruments.length - 1} more`, "\n]"); + const instruments = await client.instruments.get(); + console.log("instruments: ", "[\n", instruments[0], `\n...and ${instruments.length - 1} more`, "\n]"); console.log("\n===================================\n"); // Get filtered instruments - const instrumentFiltered = await client.getInstruments({ symbol: "BTCUSD" }); + const instrumentFiltered = await client.instruments.get({ symbol: "BTCUSD" }); console.log("instrumentFiltered: ", instrumentFiltered); })().catch(console.error); diff --git a/examples/ohlc.ts b/examples/ohlc.get.ts similarity index 88% rename from examples/ohlc.ts rename to examples/ohlc.get.ts index 12f67cc..beb750a 100644 --- a/examples/ohlc.ts +++ b/examples/ohlc.get.ts @@ -12,9 +12,9 @@ const client = new DxtradeClient({ const symbol = process.argv[2] ?? "EURUSD"; (async () => { - await client.connect(); + await client.auth(); - const bars = await client.getOHLC({ symbol }); + const bars = await client.ohlc.get({ symbol }); console.log("Last 5 bars:", "[\n", ...bars.slice(-5), `\n...and ${bars.length - 5} more`, "\n]"); console.log(`Fetched ${bars.length} bars for ${symbol}`); diff --git a/examples/orders.ts b/examples/orders.submit.ts similarity index 83% rename from examples/orders.ts rename to examples/orders.submit.ts index a39efcb..da2aa32 100644 --- a/examples/orders.ts +++ b/examples/orders.submit.ts @@ -12,17 +12,17 @@ const client = new DxtradeClient({ const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); (async () => { - await client.connect(); + await client.auth(); - const suggestions = await client.getSymbolSuggestions("EURUSD"); + const suggestions = await client.symbols.search("EURUSD"); const symbol = suggestions[0]; console.log(`Found symbol: ${symbol.name} (id: ${symbol.id})`); - const info = await client.getSymbolInfo(symbol.name); + const info = await client.symbols.info(symbol.name); console.log(`Min volume: ${info.minVolume}, Lot size: ${info.lotSize}`); // 1. Submit a market order - const order = await client.submitOrder({ + const order = await client.orders.submit({ symbol: symbol.name, side: SIDE.BUY, quantity: info.minVolume, @@ -34,13 +34,13 @@ const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); // 2. Wait 2 seconds, then close the position await sleep(2000); console.log("\nClosing position..."); - await client.closeAllPositions(); + await client.positions.closeAll(); console.log("All positions closed"); // 3. Wait 2 seconds, then submit a limit order and immediately cancel it await sleep(2000); console.log("\nPlacing limit order..."); - const limitOrder = await client.submitOrder({ + const limitOrder = await client.orders.submit({ symbol: symbol.name, side: SIDE.BUY, quantity: info.minVolume, @@ -51,6 +51,6 @@ const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); console.log(`Limit order placed: ${limitOrder.orderId} — status: ${limitOrder.status}`); console.log("Cancelling order..."); - await client.cancelAllOrders(); + await client.orders.cancelAll(); console.log("All orders cancelled"); })().catch(console.error); diff --git a/examples/close-all-positions.ts b/examples/positions.close-all.ts similarity index 81% rename from examples/close-all-positions.ts rename to examples/positions.close-all.ts index ea6fe06..f543332 100644 --- a/examples/close-all-positions.ts +++ b/examples/positions.close-all.ts @@ -10,12 +10,12 @@ const client = new DxtradeClient({ }); (async () => { - await client.connect(); + await client.auth(); - const positions = await client.getPositions(); + const positions = await client.positions.get(); console.log(`Closing ${positions.length} position(s)...`); - await client.closeAllPositions(); + await client.positions.closeAll(); console.log("All positions closed"); })().catch(console.error); diff --git a/examples/close-position.ts b/examples/positions.close.ts similarity index 91% rename from examples/close-position.ts rename to examples/positions.close.ts index 15912d7..98c9844 100644 --- a/examples/close-position.ts +++ b/examples/positions.close.ts @@ -10,10 +10,10 @@ const client = new DxtradeClient({ }); (async () => { - await client.connect(); + await client.auth(); // TODO:: improve parameters! maybe even have a "closeWholePosition" function - const positions = await client.closePosition({ + const positions = await client.positions.close({ legs: [ { instrumentId: 3438, diff --git a/examples/positions.ts b/examples/positions.get.ts similarity index 85% rename from examples/positions.ts rename to examples/positions.get.ts index c20512b..15a950d 100644 --- a/examples/positions.ts +++ b/examples/positions.get.ts @@ -10,8 +10,8 @@ const client = new DxtradeClient({ }); (async () => { - await client.connect(); - const positions = await client.getPositions(); + await client.auth(); + const positions = await client.positions.get(); console.log("Positions: ", positions); })().catch(console.error); diff --git a/examples/position-metrics.ts b/examples/positions.metrics.ts similarity index 84% rename from examples/position-metrics.ts rename to examples/positions.metrics.ts index 5733cf6..d28acf1 100644 --- a/examples/position-metrics.ts +++ b/examples/positions.metrics.ts @@ -10,8 +10,8 @@ const client = new DxtradeClient({ }); (async () => { - await client.connect(); - const metrics = await client.getPositionMetrics(); + await client.auth(); + const metrics = await client.positions.metrics(); console.log("Position metrics:", metrics); })().catch(console.error); diff --git a/examples/positions.stream.ts b/examples/positions.stream.ts new file mode 100644 index 0000000..1d6872a --- /dev/null +++ b/examples/positions.stream.ts @@ -0,0 +1,27 @@ +import "dotenv/config"; +import { DxtradeClient, BROKER } from "../src"; + +const client = new DxtradeClient({ + username: process.env.DXTRADE_USERNAME!, + password: process.env.DXTRADE_PASSWORD!, + broker: process.env.DXTRADE_BROKER! || BROKER.FTMO, + accountId: process.env.DXTRADE_ACCOUNT_ID, + debug: process.env.DXTRADE_DEBUG || false, +}); + +(async () => { + await client.connect(); + console.log("Connected — streaming positions...\n"); + + const unsubscribe = client.positions.stream((positions) => { + console.log("Positions: ", positions); + console.log(); + }); + + // Stream for 60 seconds then clean up + setTimeout(() => { + console.log("Unsubscribing and disconnecting..."); + unsubscribe(); + client.disconnect(); + }, 60_000); +})().catch(console.error); diff --git a/examples/symbol-info.ts b/examples/symbols.info.ts similarity index 79% rename from examples/symbol-info.ts rename to examples/symbols.info.ts index a0962ac..3a549b9 100644 --- a/examples/symbol-info.ts +++ b/examples/symbols.info.ts @@ -10,18 +10,18 @@ const client = new DxtradeClient({ }); (async () => { - await client.connect(); + await client.auth(); const query = process.argv[2] ?? "EURUSD"; - const suggestions = await client.getSymbolSuggestions(query); + const suggestions = await client.symbols.search(query); console.log("suggestions", suggestions); console.log("\n===================================\n"); const symbol = suggestions[0]; - const info = await client.getSymbolInfo(symbol.name); + const info = await client.symbols.info(symbol.name); console.log("info", info); console.log("\n===================================\n"); - const symbolLimits = await client.getSymbolLimits(); + const symbolLimits = await client.symbols.limits(); console.log("symbolLimits: ", "[\n", symbolLimits[0], `\n...and ${symbolLimits.length - 1} more`, "\n]"); })().catch(console.error); diff --git a/llms.txt b/llms.txt index b6bf313..98f5567 100644 --- a/llms.txt +++ b/llms.txt @@ -18,46 +18,54 @@ const client = new DxtradeClient({ accountId: "optional_account_id", }); -await client.connect(); +await client.connect(); // auth + persistent WebSocket (recommended) +// or: await client.auth(); // lightweight, no persistent WS ## Available Methods ### Session -- client.connect() — Login, fetch CSRF, WebSocket handshake, optional account switch. Call this first. -- client.login() — Authenticate with broker (called automatically by connect) -- client.fetchCsrf() — Fetch CSRF token (called automatically by connect) +- client.connect() — Auth + persistent WebSocket. Recommended for most use cases. Call this first. +- client.auth() — Lightweight: login, fetch CSRF, WebSocket handshake, optional account switch. No persistent WS. +- client.disconnect() — Close the persistent WebSocket connection +- client.login() — Authenticate with broker (called automatically by connect/auth) +- client.fetchCsrf() — Fetch CSRF token (called automatically by connect/auth) - client.switchAccount(accountId: string) — Switch to a specific trading account -### Market Data -- client.getSymbolSuggestions(text: string) — Search symbols by name, returns Symbol.Suggestion[] -- client.getSymbolInfo(symbol: string) — Get instrument info (volume limits, lot size), returns Symbol.Info -- client.getSymbolLimits() — Get order size limits for all symbols, returns Symbol.Limits[] -- client.getInstruments(params?: Partial) — Get all instruments, optionally filtered (e.g. { type: "FOREX" }) -- client.getOHLC(params: OHLC.Params) — Fetch OHLC price bars for a symbol, returns OHLC.Bar[] - Required params: symbol (string) - Optional params: resolution (seconds, default 300), range (seconds, default 345600), maxBars (default 3500), priceField ("bid" | "ask", default "bid") - -### Trading -- client.submitOrder(params: Order.SubmitParams) — Submit order and wait for WebSocket confirmation, returns Order.Update +### Positions +- client.positions.get() — Get all open positions, returns Position.Get[] +- client.positions.close(params: Position.Close) — Close a position (supports partial closes via the quantity field) +- client.positions.closeAll() — Close all open positions with market orders +- client.positions.metrics() — Get position-level P&L metrics, returns Position.Metrics[] +- client.positions.stream(callback: (positions: Position.Get[]) => void) — Stream real-time position updates, requires connect(). Returns unsubscribe function. + +### Orders +- client.orders.get() — Get all pending/open orders, returns Order.Get[] +- client.orders.submit(params: Order.SubmitParams) — Submit order and wait for WebSocket confirmation, returns Order.Update Required params: symbol, side (SIDE.BUY | SIDE.SELL), quantity, orderType (ORDER_TYPE.MARKET | LIMIT | STOP), instrumentId Optional params: limitPrice, stopPrice, stopLoss, takeProfit, timeInForce (TIF.GTC | DAY | GTD) -- client.getOrders() — Get all pending/open orders via WebSocket, returns Order.Get[] -- client.cancelOrder(orderChainId: number) — Cancel a single pending order by its order chain ID -- client.cancelAllOrders() — Cancel all pending orders (fetches orders then cancels each) - -### Positions -- client.getPositions() — Get all open positions via WebSocket, returns Position.Get[] -- client.closePosition(params: Position.Close) — Close a position (supports partial closes via the quantity field) -- client.closeAllPositions() — Close all open positions with market orders -- client.getPositionMetrics() — Get position-level P&L metrics via WebSocket, returns Position.Metrics[] +- client.orders.cancel(orderChainId: number) — Cancel a single pending order by its order chain ID +- client.orders.cancelAll() — Cancel all pending orders (fetches orders then cancels each) ### Account -- client.getAccountMetrics() — Get equity, balance, margin, open P&L, returns Account.Metrics -- client.getTradeJournal({ from: number, to: number }) — Fetch trade journal for date range (Unix timestamps) -- client.getTradeHistory({ from: number, to: number }) — Fetch trade history for date range (Unix timestamps), returns Account.TradeHistory[] +- client.account.metrics() — Get equity, balance, margin, open P&L, returns Account.Metrics +- client.account.tradeJournal({ from: number, to: number }) — Fetch trade journal for date range (Unix timestamps) +- client.account.tradeHistory({ from: number, to: number }) — Fetch trade history for date range (Unix timestamps), returns Account.TradeHistory[] + +### Symbols +- client.symbols.search(text: string) — Search symbols by name, returns Symbol.Suggestion[] +- client.symbols.info(symbol: string) — Get instrument info (volume limits, lot size), returns Symbol.Info +- client.symbols.limits() — Get order size limits for all symbols, returns Symbol.Limits[] + +### Instruments +- client.instruments.get(params?: Partial) — Get all instruments, optionally filtered (e.g. { type: "FOREX" }) + +### OHLC +- client.ohlc.get(params: OHLC.Params) — Fetch OHLC price bars for a symbol, returns OHLC.Bar[] + Required params: symbol (string) + Optional params: resolution (seconds, default 300), range (seconds, default 345600), maxBars (default 3500), priceField ("bid" | "ask", default "bid") -### Analytics -- client.getAssessments(params: Assessments.Params) — Fetch PnL assessments for a date range +### Assessments +- client.assessments.get(params: Assessments.Params) — Fetch PnL assessments for a date range ## Enums @@ -85,4 +93,4 @@ const client = new DxtradeClient({ ## Error Handling All errors are instances of DxtradeError with properties: code (string) and message (string). -Common error codes: NO_SESSION, ORDER_TIMEOUT, ORDER_ERROR, POSITION_CLOSE_ERROR. +Common error codes: NO_SESSION, ORDER_TIMEOUT, ORDER_ERROR, POSITION_CLOSE_ERROR, STREAM_REQUIRES_CONNECT. diff --git a/package.json b/package.json index 6da6fe3..f5853f1 100644 --- a/package.json +++ b/package.json @@ -24,23 +24,23 @@ "test": "vitest run", "test:watch": "vitest", "=============== Examples ===============": "", - "example:debug": "tsx examples/debug.ts", - "example:account": "tsx examples/account-metrics.ts", - "example:trade-journal": "tsx examples/trade-journal.ts", "example:connect": "tsx examples/connect.ts", - "example:positions": "tsx examples/positions.ts", - "example:close-position": "tsx examples/close-position.ts", - "example:close-all-positions": "tsx examples/close-all-positions.ts", - "example:position-metrics": "tsx examples/position-metrics.ts", - "example:orders": "tsx examples/orders.ts", - "example:assessments": "tsx examples/get-assessments.ts", - "example:assessments:btc": "tsx examples/get-assessments.ts BTCUSD", - "example:instruments": "tsx examples/instruments.ts", - "example:instruments:forex": "tsx examples/instruments.ts FOREX", - "example:symbol": "tsx examples/symbol-info.ts", - "example:ohlc": "tsx examples/ohlc.ts", - "example:trade-history": "tsx examples/trade-history.ts", - "example:symbol:btc": "tsx examples/symbol-info.ts BTCUSD", + "example:positions:get": "tsx examples/positions.get.ts", + "example:positions:close": "tsx examples/positions.close.ts", + "example:positions:close-all": "tsx examples/positions.close-all.ts", + "example:positions:metrics": "tsx examples/positions.metrics.ts", + "example:positions:stream": "tsx examples/positions.stream.ts", + "example:orders:submit": "tsx examples/orders.submit.ts", + "example:account:metrics": "tsx examples/account.metrics.ts", + "example:account:trade-journal": "tsx examples/account.trade-journal.ts", + "example:account:trade-history": "tsx examples/account.trade-history.ts", + "example:symbols:info": "tsx examples/symbols.info.ts", + "example:symbols:info:btc": "tsx examples/symbols.info.ts BTCUSD", + "example:instruments:get": "tsx examples/instruments.get.ts", + "example:instruments:get:forex": "tsx examples/instruments.get.ts FOREX", + "example:ohlc:get": "tsx examples/ohlc.get.ts", + "example:assessments:get": "tsx examples/assessments.get.ts", + "example:assessments:get:btc": "tsx examples/assessments.get.ts BTCUSD", "============= Git =============": "", "commit": "COMMITIZEN=1 cz", "prepare": "sh scripts/setup-hooks.sh", @@ -95,4 +95,4 @@ "czConfig": ".czrc.js" } } -} +} \ No newline at end of file diff --git a/src/client.ts b/src/client.ts index e24cbd7..2e9e0ef 100644 --- a/src/client.ts +++ b/src/client.ts @@ -5,13 +5,16 @@ import { login, fetchCsrf, switchAccount, + auth, connect, + disconnect, getAccountMetrics, getTradeHistory, getPositions, getPositionMetrics, closePosition, closeAllPositions, + streamPositions, getAssessments, getInstruments, getSymbolLimits, @@ -25,6 +28,142 @@ import { getTradeJournal, } from "@/domains"; +class PositionsDomain { + constructor(private _ctx: ClientContext) {} + + /** Get all open positions via WebSocket. */ + get(): Promise { + return getPositions(this._ctx); + } + + /** Close a position. Supports partial closes by specifying a quantity smaller than the full position size. */ + close(params: Position.Close): Promise { + return closePosition(this._ctx, params); + } + + /** Close all open positions with market orders. */ + closeAll(): Promise { + return closeAllPositions(this._ctx); + } + + /** Get position-level P&L metrics via WebSocket. */ + metrics(): Promise { + return getPositionMetrics(this._ctx); + } + + /** Stream real-time position updates. Requires connect(). Returns unsubscribe function. */ + stream(callback: (positions: Position.Get[]) => void): () => void { + return streamPositions(this._ctx, callback); + } +} + +class OrdersDomain { + constructor(private _ctx: ClientContext) {} + + /** Get all pending/open orders via WebSocket. */ + get(): Promise { + return getOrders(this._ctx); + } + + /** + * Submit a trading order and wait for WebSocket confirmation. + * Supports market, limit, and stop orders with optional stop loss and take profit. + */ + submit(params: Order.SubmitParams): Promise { + return submitOrder(this._ctx, params); + } + + /** Cancel a single pending order by its order chain ID. */ + cancel(orderChainId: number): Promise { + return cancelOrder(this._ctx, orderChainId); + } + + /** Cancel all pending orders. */ + cancelAll(): Promise { + return cancelAllOrders(this._ctx); + } +} + +class AccountDomain { + constructor(private _ctx: ClientContext) {} + + /** Get account metrics including equity, balance, margin, and open P&L. */ + metrics(): Promise { + return getAccountMetrics(this._ctx); + } + + /** + * Fetch trade journal entries for a date range. + * @param params.from - Start timestamp (Unix ms) + * @param params.to - End timestamp (Unix ms) + */ + tradeJournal(params: { from: number; to: number }): Promise { + return getTradeJournal(this._ctx, params); + } + + /** + * Fetch trade history for a date range. + * @param params.from - Start timestamp (Unix ms) + * @param params.to - End timestamp (Unix ms) + */ + tradeHistory(params: { from: number; to: number }): Promise { + return getTradeHistory(this._ctx, params); + } +} + +class SymbolsDomain { + constructor(private _ctx: ClientContext) {} + + /** Search for symbols matching the given text (e.g. "EURUSD", "BTC"). */ + search(text: string): Promise { + return getSymbolSuggestions(this._ctx, text); + } + + /** Get detailed instrument info for a symbol, including volume limits and lot size. */ + info(symbol: string): Promise { + return getSymbolInfo(this._ctx, symbol); + } + + /** Get order size limits and stop/limit distances for all symbols. */ + limits(): Promise { + return getSymbolLimits(this._ctx); + } +} + +class InstrumentsDomain { + constructor(private _ctx: ClientContext) {} + + /** Get all available instruments, optionally filtered by partial match (e.g. `{ type: "FOREX" }`). */ + get(params: Partial = {}): Promise { + return getInstruments(this._ctx, params); + } +} + +class OhlcDomain { + constructor(private _ctx: ClientContext) {} + + /** + * Fetch OHLC price bars for a symbol. + * @param params.symbol - Instrument symbol (e.g. "EURUSD") + * @param params.resolution - Bar period in seconds (default: 60 = 1 min) + * @param params.range - Lookback window in seconds (default: 432000 = 5 days) + * @param params.maxBars - Maximum bars to return (default: 3500) + * @param params.priceField - "bid" or "ask" (default: "bid") + */ + get(params: OHLC.Params): Promise { + return getOHLC(this._ctx, params); + } +} + +class AssessmentsDomain { + constructor(private _ctx: ClientContext) {} + + /** Fetch PnL assessments for an instrument within a date range. */ + get(params: Assessments.Params): Promise { + return getAssessments(this._ctx, params); + } +} + /** * Client for interacting with the DXtrade trading API. * @@ -44,6 +183,21 @@ import { export class DxtradeClient { private _ctx: ClientContext; + /** Position operations: get, close, metrics, streaming. */ + public readonly positions: PositionsDomain; + /** Order operations: get, submit, cancel. */ + public readonly orders: OrdersDomain; + /** Account operations: metrics, trade journal, trade history. */ + public readonly account: AccountDomain; + /** Symbol operations: search, info, limits. */ + public readonly symbols: SymbolsDomain; + /** Instrument operations: get (with optional filtering). */ + public readonly instruments: InstrumentsDomain; + /** OHLC price bar operations: get. */ + public readonly ohlc: OhlcDomain; + /** PnL assessment operations: get. */ + public readonly assessments: AssessmentsDomain; + constructor(config: DxtradeConfig) { const callbacks = config.callbacks ?? {}; @@ -54,6 +208,7 @@ export class DxtradeClient { csrf: null, accountId: config.accountId ?? null, atmosphereId: null, + wsManager: null, broker: config.broker, retries: config.retries ?? 3, debug: config.debug ?? false, @@ -61,7 +216,7 @@ export class DxtradeClient { if (!this.csrf) { throw new DxtradeError( ERROR.NO_SESSION, - "No active session. Call login() and fetchCsrf() or connect() first.", + "No active session. Call auth() or connect() first.", ); } }, @@ -71,6 +226,14 @@ export class DxtradeClient { throw error; }, }; + + this.positions = new PositionsDomain(this._ctx); + this.orders = new OrdersDomain(this._ctx); + this.account = new AccountDomain(this._ctx); + this.symbols = new SymbolsDomain(this._ctx); + this.instruments = new InstrumentsDomain(this._ctx); + this.ohlc = new OhlcDomain(this._ctx); + this.assessments = new AssessmentsDomain(this._ctx); } /** Authenticate with the broker using username and password. */ @@ -88,113 +251,18 @@ export class DxtradeClient { return switchAccount(this._ctx, accountId); } - /** Connect to the broker: login, fetch CSRF, WebSocket handshake, and optional account switch. */ - public async connect(): Promise { - return connect(this._ctx); - } - - /** Search for symbols matching the given text (e.g. "EURUSD", "BTC"). */ - public async getSymbolSuggestions(text: string): Promise { - return getSymbolSuggestions(this._ctx, text); - } - - /** Get detailed instrument info for a symbol, including volume limits and lot size. */ - public async getSymbolInfo(symbol: string): Promise { - return getSymbolInfo(this._ctx, symbol); - } - - /** Get order size limits and stop/limit distances for all symbols. */ - public async getSymbolLimits(): Promise { - return getSymbolLimits(this._ctx); - } - - /** - * Submit a trading order and wait for WebSocket confirmation. - * Supports market, limit, and stop orders with optional stop loss and take profit. - */ - public async submitOrder(params: Order.SubmitParams): Promise { - return submitOrder(this._ctx, params); - } - - /** Get all pending/open orders via WebSocket. */ - public async getOrders(): Promise { - return getOrders(this._ctx); - } - - /** Cancel a single pending order by its order chain ID. */ - public async cancelOrder(orderChainId: number): Promise { - return cancelOrder(this._ctx, orderChainId); - } - - /** Cancel all pending orders. */ - public async cancelAllOrders(): Promise { - return cancelAllOrders(this._ctx); - } - - /** Get account metrics including equity, balance, margin, and open P&L. */ - public async getAccountMetrics(): Promise { - return getAccountMetrics(this._ctx); - } - - /** Get all open positions via WebSocket. */ - public async getPositions(): Promise { - return getPositions(this._ctx); - } - - /** - * Close a position. Supports partial closes by specifying a quantity smaller than the full position size. - */ - public async closePosition(position: Position.Close): Promise { - return closePosition(this._ctx, position); - } - - /** Close all open positions with market orders. */ - public async closeAllPositions(): Promise { - return closeAllPositions(this._ctx); - } - - /** Get position-level P&L metrics via WebSocket. */ - public async getPositionMetrics(): Promise { - return getPositionMetrics(this._ctx); - } - - /** - * Fetch trade journal entries for a date range. - * @param params.from - Start timestamp (Unix ms) - * @param params.to - End timestamp (Unix ms) - */ - public async getTradeJournal(params: { from: number; to: number }): Promise { - return getTradeJournal(this._ctx, params); + /** Authenticate and establish a session: login, fetch CSRF, WebSocket handshake, and optional account switch. */ + public async auth(): Promise { + return auth(this._ctx); } - /** - * Fetch trade history for a date range. - * @param params.from - Start timestamp (Unix ms) - * @param params.to - End timestamp (Unix ms) - */ - public async getTradeHistory(params: { from: number; to: number }): Promise { - return getTradeHistory(this._ctx, params); - } - - /** Get all available instruments, optionally filtered by partial match (e.g. `{ type: "FOREX" }`). */ - public async getInstruments(params: Partial = {}): Promise { - return getInstruments(this._ctx, params); - } - - /** Fetch PnL assessments for an instrument within a date range. */ - public async getAssessments(params: Assessments.Params): Promise { - return getAssessments(this._ctx, params); + /** Connect to the broker with a persistent WebSocket: auth + persistent WS for data reuse and streaming. */ + public async connect(): Promise { + return connect(this._ctx); } - /** - * Fetch OHLC price bars for a symbol. - * @param params.symbol - Instrument symbol (e.g. "EURUSD") - * @param params.resolution - Bar period in seconds (default: 60 = 1 min) - * @param params.range - Lookback window in seconds (default: 432000 = 5 days) - * @param params.maxBars - Maximum bars to return (default: 3500) - * @param params.priceField - "bid" or "ask" (default: "bid") - */ - public async getOHLC(params: OHLC.Params): Promise { - return getOHLC(this._ctx, params); + /** Close the persistent WebSocket connection. */ + public disconnect(): void { + return disconnect(this._ctx); } } diff --git a/src/client.types.ts b/src/client.types.ts index 32f63f3..6b7a9bd 100644 --- a/src/client.types.ts +++ b/src/client.types.ts @@ -1,5 +1,6 @@ import type { DxtradeError, BROKER } from "@/constants"; import type { Order } from "@/domains/order"; +import type { WsManager } from "@/utils/ws-manager"; export interface DxtradeConfig { username: string; @@ -27,6 +28,7 @@ export interface ClientContext { csrf: string | null; accountId: string | null; atmosphereId: string | null; + wsManager: WsManager | null; broker: keyof typeof BROKER; retries: number; debug: boolean | string; diff --git a/src/constants/endpoints.ts b/src/constants/endpoints.ts index 29c6299..e1a8c0a 100644 --- a/src/constants/endpoints.ts +++ b/src/constants/endpoints.ts @@ -34,7 +34,7 @@ export const endpoints = { `${base}/api/tradejournal?from=${params.from}&to=${params.to}`, tradeHistory: (base: string, params: { from: number; to: number }) => - `${base}/api/history?from=${params.from}&to=${params.to}&orderId=`, + `${base}/api/history?from=${params.from}&to=${params.to}`, subscribeInstruments: (base: string) => `${base}/api/instruments/subscribeInstrumentSymbols`, diff --git a/src/constants/enums.ts b/src/constants/enums.ts index e780e67..dff4cf0 100644 --- a/src/constants/enums.ts +++ b/src/constants/enums.ts @@ -61,6 +61,13 @@ export enum ERROR { // Analytics ASSESSMENTS_ERROR = "ASSESSMENTS_ERROR", + + // Rate limiting + RATE_LIMITED = "RATE_LIMITED", + + // WebSocket manager + WS_MANAGER_ERROR = "WS_MANAGER_ERROR", + STREAM_REQUIRES_CONNECT = "STREAM_REQUIRES_CONNECT", } export enum WS_MESSAGE { diff --git a/src/domains/account/account.ts b/src/domains/account/account.ts index 95413e3..62d5216 100644 --- a/src/domains/account/account.ts +++ b/src/domains/account/account.ts @@ -1,12 +1,17 @@ import WebSocket from "ws"; import { WS_MESSAGE, ERROR, endpoints, DxtradeError } from "@/constants"; -import { Cookies, parseWsData, shouldLog, debugLog, retryRequest, baseHeaders } from "@/utils"; +import { Cookies, parseWsData, shouldLog, debugLog, retryRequest, baseHeaders, authHeaders } from "@/utils"; import type { ClientContext } from "@/client.types"; import type { Account } from "."; export async function getAccountMetrics(ctx: ClientContext, timeout = 30_000): Promise { ctx.ensureSession(); + if (ctx.wsManager) { + const body = await ctx.wsManager.waitFor<{ allMetrics: Account.Metrics }>(WS_MESSAGE.ACCOUNT_METRICS, timeout); + return body.allMetrics; + } + const wsUrl = endpoints.websocket(ctx.broker, ctx.atmosphereId); const cookieStr = Cookies.serialize(ctx.cookies); @@ -46,13 +51,11 @@ export async function getTradeHistory( ctx.ensureSession(); try { - const cookieStr = Cookies.serialize(ctx.cookies); - const response = await retryRequest( { - method: "GET", + method: "POST", url: endpoints.tradeHistory(ctx.broker, params), - headers: { ...baseHeaders(), Cookie: cookieStr }, + headers: authHeaders(ctx.csrf!, Cookies.serialize(ctx.cookies)), }, ctx.retries, ); diff --git a/src/domains/instrument/instrument.ts b/src/domains/instrument/instrument.ts index 57354a0..28d3e22 100644 --- a/src/domains/instrument/instrument.ts +++ b/src/domains/instrument/instrument.ts @@ -48,7 +48,7 @@ export async function getInstruments( return true; }), ); - }, 0); + }, 200); } }); diff --git a/src/domains/order/order.ts b/src/domains/order/order.ts index 99a2939..1ca8016 100644 --- a/src/domains/order/order.ts +++ b/src/domains/order/order.ts @@ -94,6 +94,10 @@ function createOrderListener( export async function getOrders(ctx: ClientContext, timeout = 30_000): Promise { ctx.ensureSession(); + if (ctx.wsManager) { + return ctx.wsManager.waitFor(WS_MESSAGE.ORDERS, timeout); + } + const wsUrl = endpoints.websocket(ctx.broker, ctx.atmosphereId); const cookieStr = Cookies.serialize(ctx.cookies); diff --git a/src/domains/position/position.ts b/src/domains/position/position.ts index 5e52894..460d7fa 100644 --- a/src/domains/position/position.ts +++ b/src/domains/position/position.ts @@ -4,9 +4,37 @@ import { Cookies, parseWsData, shouldLog, debugLog, retryRequest, authHeaders } import type { ClientContext } from "@/client.types"; import type { Position } from "."; +export function streamPositions( + ctx: ClientContext, + callback: (positions: Position.Get[]) => void, +): () => void { + if (!ctx.wsManager) { + ctx.throwError( + ERROR.STREAM_REQUIRES_CONNECT, + "Streaming requires a persistent WebSocket. Use connect() instead of auth().", + ); + } + + const listener = (body: Position.Get[]) => callback(body); + ctx.wsManager.on(WS_MESSAGE.POSITIONS, listener); + + const cached = ctx.wsManager.getCached(WS_MESSAGE.POSITIONS); + if (cached !== undefined) { + callback(cached); + } + + return () => { + ctx.wsManager?.removeListener(WS_MESSAGE.POSITIONS, listener); + }; +} + export async function getPositions(ctx: ClientContext): Promise { ctx.ensureSession(); + if (ctx.wsManager) { + return ctx.wsManager.waitFor(WS_MESSAGE.POSITIONS); + } + const wsUrl = endpoints.websocket(ctx.broker, ctx.atmosphereId); const cookieStr = Cookies.serialize(ctx.cookies); @@ -41,6 +69,10 @@ export async function getPositions(ctx: ClientContext): Promise export async function getPositionMetrics(ctx: ClientContext, timeout = 30_000): Promise { ctx.ensureSession(); + if (ctx.wsManager) { + return ctx.wsManager.waitFor(WS_MESSAGE.POSITION_METRICS, timeout); + } + const wsUrl = endpoints.websocket(ctx.broker, ctx.atmosphereId); const cookieStr = Cookies.serialize(ctx.cookies); diff --git a/src/domains/session/session.ts b/src/domains/session/session.ts index 4ca7499..a7de1dc 100644 --- a/src/domains/session/session.ts +++ b/src/domains/session/session.ts @@ -2,6 +2,7 @@ import WebSocket from "ws"; import { endpoints, DxtradeError, ERROR } from "@/constants"; import { Cookies, + WsManager, authHeaders, cookieOnlyHeaders, retryRequest, @@ -140,7 +141,7 @@ export async function switchAccount(ctx: ClientContext, accountId: string): Prom } } -export async function connect(ctx: ClientContext): Promise { +export async function auth(ctx: ClientContext): Promise { await login(ctx); await fetchCsrf(ctx); if (ctx.debug) clearDebugLog(); @@ -162,3 +163,20 @@ export async function connect(ctx: ClientContext): Promise { ctx.accountId = reconnect.accountId; } } + +export async function connect(ctx: ClientContext): Promise { + await auth(ctx); + + const wsManager = new WsManager(); + const wsUrl = endpoints.websocket(ctx.broker, ctx.atmosphereId); + const cookieStr = Cookies.serialize(ctx.cookies); + await wsManager.connect(wsUrl, cookieStr, ctx.debug); + ctx.wsManager = wsManager; +} + +export function disconnect(ctx: ClientContext): void { + if (ctx.wsManager) { + ctx.wsManager.close(); + ctx.wsManager = null; + } +} diff --git a/src/domains/symbol/symbol.ts b/src/domains/symbol/symbol.ts index 3467014..2108c0b 100644 --- a/src/domains/symbol/symbol.ts +++ b/src/domains/symbol/symbol.ts @@ -89,7 +89,7 @@ export async function getSymbolLimits(ctx: ClientContext, timeout = 30_000): Pro clearTimeout(timer); ws.close(); resolve(limits); - }, 0); + }, 200); } }); diff --git a/src/utils/index.ts b/src/utils/index.ts index 43ca8d1..5a199ed 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -2,3 +2,4 @@ export * from "./cookies"; export * from "./headers"; export * from "./retry"; export * from "./websocket"; +export * from "./ws-manager"; diff --git a/src/utils/retry.ts b/src/utils/retry.ts index 88dd941..514a1c2 100644 --- a/src/utils/retry.ts +++ b/src/utils/retry.ts @@ -1,4 +1,5 @@ import axios, { type AxiosRequestConfig, type AxiosResponse, isAxiosError } from "axios"; +import { DxtradeError, ERROR } from "@/constants"; export async function retryRequest(config: AxiosRequestConfig, retries = 3): Promise { for (let attempt = 1; attempt <= retries; attempt++) { @@ -7,7 +8,9 @@ export async function retryRequest(config: AxiosRequestConfig, retries = 3): Pro } catch (error: unknown) { const message = error instanceof Error ? error.message : "Unknown error"; console.warn(`[dxtrade-api] Attempt ${attempt} failed: ${message}`, config.url); - if (isAxiosError(error) && error.response?.status === 429) throw error; + if (isAxiosError(error) && error.response?.status === 429) { + throw new DxtradeError(ERROR.RATE_LIMITED, "Rate limited (429). Too many requests — try again later."); + } if (attempt === retries) throw error; await new Promise((res) => setTimeout(res, 1000 * attempt)); } diff --git a/src/utils/ws-manager.ts b/src/utils/ws-manager.ts new file mode 100644 index 0000000..a5afbfd --- /dev/null +++ b/src/utils/ws-manager.ts @@ -0,0 +1,80 @@ +import { EventEmitter } from "events"; +import WebSocket from "ws"; +import { parseWsData, shouldLog, debugLog } from "./websocket"; +import type { WsPayload } from "./websocket.types"; + +export class WsManager extends EventEmitter { + private _ws: WebSocket | null = null; + private _cache: Map = new Map(); + + connect(wsUrl: string, cookieStr: string, debug: boolean | string = false): Promise { + return new Promise((resolve, reject) => { + const ws = new WebSocket(wsUrl, { headers: { Cookie: cookieStr } }); + + ws.on("open", () => { + this._ws = ws; + resolve(); + }); + + ws.on("message", (data) => { + const msg = parseWsData(data); + if (shouldLog(msg, debug)) debugLog(msg); + if (typeof msg === "string") return; + + const payload = msg as WsPayload; + this._cache.set(payload.type, payload.body); + this.emit(payload.type, payload.body); + }); + + ws.on("error", (error) => { + if (!this._ws) { + return reject(error); + } + this.emit("error", error); + }); + + ws.on("close", () => { + this._ws = null; + this.emit("close"); + }); + }); + } + + waitFor(type: string, timeout = 30_000): Promise { + const cached = this._cache.get(type); + if (cached !== undefined) { + return Promise.resolve(cached as T); + } + + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.removeListener(type, onMessage); + reject(new Error(`WsManager: timed out waiting for ${type}`)); + }, timeout); + + const onMessage = (body: T) => { + clearTimeout(timer); + resolve(body); + }; + + this.once(type, onMessage); + }); + } + + getCached(type: string): T | undefined { + return this._cache.get(type) as T | undefined; + } + + close(): void { + if (this._ws) { + this._ws.close(); + this._ws = null; + } + this._cache.clear(); + this.removeAllListeners(); + } + + get isConnected(): boolean { + return this._ws !== null && this._ws.readyState === WebSocket.OPEN; + } +} diff --git a/tests/account.test.ts b/tests/account.test.ts index d3231ce..24c2853 100644 --- a/tests/account.test.ts +++ b/tests/account.test.ts @@ -36,7 +36,7 @@ describe("getTradeHistory", () => { expect(result).toEqual(mockHistory); expect(mockRetryRequest).toHaveBeenCalledWith( expect.objectContaining({ - method: "GET", + method: "POST", url: expect.stringContaining("/api/history?from=1704067200000&to=1704153600000"), }), ctx.retries, diff --git a/tests/helpers.ts b/tests/helpers.ts index 61736be..8881c21 100644 --- a/tests/helpers.ts +++ b/tests/helpers.ts @@ -14,6 +14,7 @@ export function createMockContext(overrides: Partial = {}): Clien csrf: "csrf-token", accountId: "ACC-123", atmosphereId: "atm-id-123", + wsManager: null, broker: "https://dxtrade.ftmo.com", retries: 1, debug: false, diff --git a/tests/orders.test.ts b/tests/orders.test.ts index 33e04e9..e1e5958 100644 --- a/tests/orders.test.ts +++ b/tests/orders.test.ts @@ -38,8 +38,34 @@ describe("getOrders", () => { it("should return orders from WebSocket ORDERS message", async () => { const ctx = createMockContext(); const mockOrders = [ - { account: "ACC-123", orderId: 1, orderCode: "OC1", type: "LIMIT", instrument: "EURUSD", status: "WORKING", finalStatus: false, side: "BUY", tif: "GTC", legs: [], issueTime: "2024-01-01", transactionTime: "2024-01-01" }, - { account: "ACC-123", orderId: 2, orderCode: "OC2", type: "STOP", instrument: "BTCUSD", status: "WORKING", finalStatus: false, side: "SELL", tif: "GTC", legs: [], issueTime: "2024-01-01", transactionTime: "2024-01-01" }, + { + account: "ACC-123", + orderId: 1, + orderCode: "OC1", + type: "LIMIT", + instrument: "EURUSD", + status: "WORKING", + finalStatus: false, + side: "BUY", + tif: "GTC", + legs: [], + issueTime: "2024-01-01", + transactionTime: "2024-01-01", + }, + { + account: "ACC-123", + orderId: 2, + orderCode: "OC2", + type: "STOP", + instrument: "BTCUSD", + status: "WORKING", + finalStatus: false, + side: "SELL", + tif: "GTC", + legs: [], + issueTime: "2024-01-01", + transactionTime: "2024-01-01", + }, ]; const promise = getOrders(ctx); @@ -157,7 +183,7 @@ describe("cancelAllOrders", () => { setTimeout(() => { const payload = JSON.stringify({ accountId: null, type: WS_MESSAGE.ORDERS, body: mockOrders }); wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); - }, 0); + }, 200); mockRetryRequest.mockResolvedValue({ status: 200 }); @@ -177,7 +203,7 @@ describe("cancelAllOrders", () => { setTimeout(() => { const payload = JSON.stringify({ accountId: null, type: WS_MESSAGE.ORDERS, body: mockOrders }); wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); - }, 0); + }, 200); await cancelAllOrders(ctx); @@ -190,7 +216,7 @@ describe("cancelAllOrders", () => { setTimeout(() => { const payload = JSON.stringify({ accountId: null, type: WS_MESSAGE.ORDERS, body: [] }); wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); - }, 0); + }, 200); await cancelAllOrders(ctx); diff --git a/tests/positions.test.ts b/tests/positions.test.ts index 1f61df8..9204794 100644 --- a/tests/positions.test.ts +++ b/tests/positions.test.ts @@ -54,7 +54,9 @@ describe("getPositionMetrics", () => { it("should ignore non-matching WS message types", async () => { const ctx = createMockContext(); - const mockMetrics = [{ positionCode: "POS-1", openPl: 100, openPlPerLot: 10, currentPrice: 1.1, convertedOpenPl: 100 }]; + const mockMetrics = [ + { positionCode: "POS-1", openPl: 100, openPlPerLot: 10, currentPrice: 1.1, convertedOpenPl: 100 }, + ]; const promise = getPositionMetrics(ctx); @@ -141,7 +143,7 @@ describe("closeAllPositions", () => { setTimeout(() => { const payload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITIONS, body: mockPositions }); wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); - }, 0); + }, 200); mockRetryRequest.mockResolvedValue({ status: 200 }); @@ -171,7 +173,7 @@ describe("closeAllPositions", () => { setTimeout(() => { const payload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITIONS, body: [] }); wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); - }, 0); + }, 200); await closeAllPositions(ctx); diff --git a/tests/stream-positions.test.ts b/tests/stream-positions.test.ts new file mode 100644 index 0000000..db20b7e --- /dev/null +++ b/tests/stream-positions.test.ts @@ -0,0 +1,132 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { EventEmitter } from "events"; +import { WS_MESSAGE, ERROR } from "@/constants/enums"; +import { DxtradeError } from "@/constants/errors"; +import { streamPositions, getPositions } from "@/domains/position"; +import { createMockContext } from "./helpers"; +import type { WsManager } from "@/utils/ws-manager"; + +// --- Mocks --- + +let wsInstance: EventEmitter & { close: ReturnType }; + +vi.mock("ws", () => { + return { + default: class MockWebSocket extends EventEmitter { + close = vi.fn(); + constructor() { + super(); + // eslint-disable-next-line @typescript-eslint/no-this-alias + wsInstance = this as any; + } + }, + }; +}); + +beforeEach(() => { + vi.clearAllMocks(); +}); + +// --- Helpers --- + +function createMockWsManager(initialCache?: Record): WsManager { + const emitter = new EventEmitter(); + const cache = new Map(initialCache ? Object.entries(initialCache) : []); + + return Object.assign(emitter, { + connect: vi.fn(), + close: vi.fn(), + get isConnected() { + return true; + }, + waitFor: vi.fn((type: string) => { + const cached = cache.get(type); + if (cached !== undefined) return Promise.resolve(cached); + return new Promise((resolve) => { + emitter.once(type, resolve); + }); + }), + getCached: vi.fn((type: string) => cache.get(type)), + _cache: cache, + }) as unknown as WsManager; +} + +// --- Tests --- + +describe("streamPositions", () => { + it("should invoke callback on position updates", () => { + const wsManager = createMockWsManager(); + const ctx = createMockContext({ wsManager }); + + const callback = vi.fn(); + streamPositions(ctx, callback); + + const positions = [{ positionCode: "POS-1", quantity: 100 }]; + (wsManager as unknown as EventEmitter).emit(WS_MESSAGE.POSITIONS, positions); + + expect(callback).toHaveBeenCalledWith(positions); + }); + + it("should stop receiving updates after unsubscribe", () => { + const wsManager = createMockWsManager(); + const ctx = createMockContext({ wsManager }); + + const callback = vi.fn(); + const unsubscribe = streamPositions(ctx, callback); + + (wsManager as unknown as EventEmitter).emit(WS_MESSAGE.POSITIONS, []); + expect(callback).toHaveBeenCalledTimes(1); + + unsubscribe(); + + (wsManager as unknown as EventEmitter).emit(WS_MESSAGE.POSITIONS, []); + expect(callback).toHaveBeenCalledTimes(1); + }); + + it("should immediately emit cached positions on subscribe", () => { + const cachedPositions = [{ positionCode: "POS-1", quantity: 100 }]; + const wsManager = createMockWsManager({ [WS_MESSAGE.POSITIONS]: cachedPositions }); + const ctx = createMockContext({ wsManager }); + + const callback = vi.fn(); + streamPositions(ctx, callback); + + expect(callback).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledWith(cachedPositions); + }); + + it("should throw STREAM_REQUIRES_CONNECT when wsManager is null", () => { + const ctx = createMockContext({ wsManager: null }); + + expect(() => streamPositions(ctx, vi.fn())).toThrow(DxtradeError); + expect(() => streamPositions(ctx, vi.fn())).toThrow("connect()"); + }); +}); + +describe("getPositions with wsManager", () => { + it("should use wsManager.waitFor when available", async () => { + const wsManager = createMockWsManager(); + const mockPositions = [{ positionCode: "POS-1", quantity: 100 }]; + (wsManager.waitFor as ReturnType).mockResolvedValue(mockPositions); + + const ctx = createMockContext({ wsManager }); + const result = await getPositions(ctx); + + expect(wsManager.waitFor).toHaveBeenCalledWith(WS_MESSAGE.POSITIONS); + expect(result).toEqual(mockPositions); + }); + + it("should fall back to WebSocket when wsManager is null", async () => { + const ctx = createMockContext({ wsManager: null }); + const mockPositions = [{ positionCode: "POS-1", quantity: 100 }]; + + const promise = getPositions(ctx); + + const payload = JSON.stringify({ accountId: null, type: WS_MESSAGE.POSITIONS, body: mockPositions }); + wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); + + const result = await promise; + expect(result).toEqual(mockPositions); + expect(wsInstance.close).toHaveBeenCalled(); + }); +}); diff --git a/tests/ws-manager.test.ts b/tests/ws-manager.test.ts new file mode 100644 index 0000000..fca796b --- /dev/null +++ b/tests/ws-manager.test.ts @@ -0,0 +1,143 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { EventEmitter } from "events"; +import { WsManager } from "@/utils/ws-manager"; + +// --- Mocks --- + +let wsInstance: EventEmitter & { close: ReturnType; readyState: number }; +let autoOpen = true; + +vi.mock("ws", () => { + return { + default: class MockWebSocket extends EventEmitter { + static OPEN = 1; + close = vi.fn(); + readyState = 1; + constructor() { + super(); + // eslint-disable-next-line @typescript-eslint/no-this-alias + wsInstance = this as any; + if (autoOpen) { + setTimeout(() => this.emit("open"), 0); + } + } + }, + }; +}); + +beforeEach(() => { + vi.clearAllMocks(); + autoOpen = true; +}); + +// --- Tests --- + +describe("WsManager", () => { + describe("connect", () => { + it("should resolve when WebSocket opens", async () => { + const manager = new WsManager(); + await manager.connect("wss://example.com", "cookie=abc"); + expect(manager.isConnected).toBe(true); + }); + + it("should reject when WebSocket errors before opening", async () => { + autoOpen = false; + const manager = new WsManager(); + const promise = manager.connect("wss://example.com", "cookie=abc"); + + wsInstance.readyState = 3; + wsInstance.emit("error", new Error("connection refused")); + + await expect(promise).rejects.toThrow("connection refused"); + }); + }); + + describe("message handling and cache", () => { + it("should parse messages, cache them, and emit events", async () => { + const manager = new WsManager(); + await manager.connect("wss://example.com", "cookie=abc"); + + const listener = vi.fn(); + manager.on("POSITIONS", listener); + + const body = [{ positionCode: "POS-1", quantity: 100 }]; + const payload = JSON.stringify({ accountId: "ACC-1", type: "POSITIONS", body }); + wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); + + expect(listener).toHaveBeenCalledWith(body); + }); + + it("should ignore string-only messages", async () => { + const manager = new WsManager(); + await manager.connect("wss://example.com", "cookie=abc"); + + const listener = vi.fn(); + manager.on("POSITIONS", listener); + + wsInstance.emit("message", Buffer.from("heartbeat")); + + expect(listener).not.toHaveBeenCalled(); + }); + }); + + describe("waitFor", () => { + it("should resolve from cache if data already exists", async () => { + const manager = new WsManager(); + await manager.connect("wss://example.com", "cookie=abc"); + + const body = [{ positionCode: "POS-1" }]; + const payload = JSON.stringify({ accountId: null, type: "POSITIONS", body }); + wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); + + const result = await manager.waitFor("POSITIONS"); + expect(result).toEqual(body); + }); + + it("should wait for next message if not cached", async () => { + const manager = new WsManager(); + await manager.connect("wss://example.com", "cookie=abc"); + + const promise = manager.waitFor("ORDERS"); + + const body = [{ orderId: 1 }]; + const payload = JSON.stringify({ accountId: null, type: "ORDERS", body }); + wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); + + const result = await promise; + expect(result).toEqual(body); + }); + + it("should reject on timeout", async () => { + vi.useFakeTimers(); + const manager = new WsManager(); + + // Manually trigger open since fake timers block setTimeout + const connectPromise = manager.connect("wss://example.com", "cookie=abc"); + vi.advanceTimersByTime(1); + await connectPromise; + + const promise = manager.waitFor("ORDERS", 2000); + vi.advanceTimersByTime(2001); + + await expect(promise).rejects.toThrow("timed out waiting for ORDERS"); + vi.useRealTimers(); + }); + }); + + describe("close", () => { + it("should close the WebSocket and clear cache/listeners", async () => { + const manager = new WsManager(); + await manager.connect("wss://example.com", "cookie=abc"); + + const body = [{ positionCode: "POS-1" }]; + const payload = JSON.stringify({ accountId: null, type: "POSITIONS", body }); + wsInstance.emit("message", Buffer.from(`${payload.length}|${payload}`)); + + manager.close(); + + expect(wsInstance.close).toHaveBeenCalled(); + expect(manager.isConnected).toBe(false); + expect(manager.listenerCount("POSITIONS")).toBe(0); + }); + }); +}); From aa2a540b5ed6a4b290bbb9ff504aa4648c5f6049 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 12 Feb 2026 23:18:12 +0100 Subject: [PATCH 2/2] ci: Fixed broken linter pipeline --- src/client.ts | 5 +---- src/domains/position/position.ts | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/src/client.ts b/src/client.ts index 2e9e0ef..fa4a674 100644 --- a/src/client.ts +++ b/src/client.ts @@ -214,10 +214,7 @@ export class DxtradeClient { debug: config.debug ?? false, ensureSession() { if (!this.csrf) { - throw new DxtradeError( - ERROR.NO_SESSION, - "No active session. Call auth() or connect() first.", - ); + throw new DxtradeError(ERROR.NO_SESSION, "No active session. Call auth() or connect() first."); } }, throwError(code: string, message: string): never { diff --git a/src/domains/position/position.ts b/src/domains/position/position.ts index 460d7fa..cd4f2eb 100644 --- a/src/domains/position/position.ts +++ b/src/domains/position/position.ts @@ -4,10 +4,7 @@ import { Cookies, parseWsData, shouldLog, debugLog, retryRequest, authHeaders } import type { ClientContext } from "@/client.types"; import type { Position } from "."; -export function streamPositions( - ctx: ClientContext, - callback: (positions: Position.Get[]) => void, -): () => void { +export function streamPositions(ctx: ClientContext, callback: (positions: Position.Get[]) => void): () => void { if (!ctx.wsManager) { ctx.throwError( ERROR.STREAM_REQUIRES_CONNECT,