Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ npm install dxtrade-api
- [x] Position metrics (per-position P&L)
- [x] Account metrics, trade journal & trade history
- [x] Symbol search & instrument info
- [x] OHLC / price bar data
- [x] OHLC / price bar data (one-shot & streaming)
- [x] PnL assessments
- [x] Multi-broker support (FTMO, Eightcap, Lark Funding)
- [x] Persistent WebSocket with `connect()`
- [x] Real-time position streaming
- [x] Full TypeScript support
- [ ] Batch orders
- [ ] Modify existing orders
- [ ] Real-time price streaming
- [x] Real-time OHLC streaming

## Quick Start

Expand All @@ -48,8 +48,8 @@ const client = new DxtradeClient({
accountId: "optional_account_id",
});

// connect() = auth + persistent WebSocket (recommended)
await client.connect();
// await client.auth(); // Auth only
await client.connect(); // Auth + persistent WebSocket (recommended)

const suggestions = await client.symbols.search("EURUSD");
const symbol = suggestions[0];
Expand All @@ -63,17 +63,17 @@ const order = await client.orders.submit({
});

console.log(`Order ${order.orderId}: ${order.status}`);
client.disconnect();
client.disconnect(); // disconnect stream -- only needed if client.connect()
```

## Connection Modes

```ts
// Persistent WebSocket (recommended) — reuses one WS for all data, enables streaming
// 1. Persistent WebSocket (recommended) — reuses one WS for all data, enables streaming
await client.connect();
client.disconnect(); // when done

// Lightweight — auth only, each data call opens a temporary WebSocket
// 2. Lightweight — auth only, each data call opens a temporary WebSocket
await client.auth();
```

Expand Down Expand Up @@ -113,11 +113,10 @@ BROKER.FTMO // "https://dxtrade.ftmo.com"

### Positions

- `client.positions.get()` — Get all open positions
- `client.positions.get()` — Get all open positions with P&L metrics merged (margin, plOpen, marketValue, etc.)
- `client.positions.close(params)` — Close a position (supports partial closes via the quantity field)
- `client.positions.closeAll()` — Close all open positions with market orders
- `client.positions.metrics()` — Get position-level P&L metrics
- `client.positions.stream(callback)` — Stream real-time position updates (requires `connect()`). Returns an unsubscribe function.
- `client.positions.stream(callback)` — Stream real-time position updates with live P&L (requires `connect()`). Returns an unsubscribe function.

### Orders

Expand Down Expand Up @@ -145,6 +144,7 @@ BROKER.FTMO // "https://dxtrade.ftmo.com"
### OHLC

- `client.ohlc.get(params)` — Fetch OHLC price bars for a symbol (resolution, range, maxBars, priceField)
- `client.ohlc.stream(params, callback)` — Stream real-time OHLC bar updates (requires `connect()`). Returns a promise that resolves with an unsubscribe function after the snapshot is received.

### Assessments

Expand Down Expand Up @@ -196,6 +196,8 @@ npm run example:symbols:info:btc
npm run example:instruments:get
npm run example:instruments:get:forex
npm run example:ohlc:get
npm run example:ohlc:stream
npm run example:ohlc:stream:btc
npm run example:assessments:get
npm run example:assessments:get:btc
```
Expand Down
28 changes: 28 additions & 0 deletions examples/ohlc.stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import "dotenv/config";
import { DxtradeClient, BROKER } from "../src";

const client = new DxtradeClient({
username: process.env.DXTRADE_USERNAME!,
password: process.env.DXTRADE_PASSWORD!,
broker: process.env.DXTRADE_BROKER! || BROKER.FTMO,
accountId: process.env.DXTRADE_ACCOUNT_ID,
debug: process.env.DXTRADE_DEBUG || false,
});

const symbol = process.argv[2] ?? "EURUSD";

(async () => {
await client.connect();
console.log(`Streaming OHLC for ${symbol}...\n`);

const unsubscribe = await client.ohlc.stream({ symbol }, (bars) => {
console.log(`${bars.length} bars:`, bars.slice(-3));
});

// Stream for 60 seconds then clean up
setTimeout(() => {
console.log("Unsubscribing and disconnecting...");
unsubscribe();
client.disconnect();
}, 60_000);
})().catch(console.error);
17 changes: 0 additions & 17 deletions examples/positions.metrics.ts

This file was deleted.

6 changes: 3 additions & 3 deletions llms.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ await client.connect(); // auth + persistent WebSocket (recommended)
- client.switchAccount(accountId: string) — Switch to a specific trading account

### Positions
- client.positions.get() — Get all open positions, returns Position.Get[]
- client.positions.get() — Get all open positions with P&L metrics merged (margin, plOpen, marketValue, etc.), returns Position.Full[]
- client.positions.close(params: Position.Close) — Close a position (supports partial closes via the quantity field)
- client.positions.closeAll() — Close all open positions with market orders
- client.positions.metrics() — Get position-level P&L metrics, returns Position.Metrics[]
- client.positions.stream(callback: (positions: Position.Get[]) => void) — Stream real-time position updates, requires connect(). Returns unsubscribe function.
- client.positions.stream(callback: (positions: Position.Full[]) => void) — Stream real-time position updates with live P&L (requires connect()). Returns unsubscribe function.

### Orders
- client.orders.get() — Get all pending/open orders, returns Order.Get[]
Expand All @@ -63,6 +62,7 @@ await client.connect(); // auth + persistent WebSocket (recommended)
- client.ohlc.get(params: OHLC.Params) — Fetch OHLC price bars for a symbol, returns OHLC.Bar[]
Required params: symbol (string)
Optional params: resolution (seconds, default 300), range (seconds, default 345600), maxBars (default 3500), priceField ("bid" | "ask", default "bid")
- client.ohlc.stream(params: OHLC.Params, callback: (bars: OHLC.Bar[]) => void) — Stream real-time OHLC bar updates (requires connect()). Returns Promise<() => void>. Callback receives snapshot bars first, then live updates.

### Assessments
- client.assessments.get(params: Assessments.Params) — Fetch PnL assessments for a date range
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"example:positions:get": "tsx examples/positions.get.ts",
"example:positions:close": "tsx examples/positions.close.ts",
"example:positions:close-all": "tsx examples/positions.close-all.ts",
"example:positions:metrics": "tsx examples/positions.metrics.ts",
"example:positions:stream": "tsx examples/positions.stream.ts",
"example:orders:submit": "tsx examples/orders.submit.ts",
"example:account:metrics": "tsx examples/account.metrics.ts",
Expand All @@ -39,6 +38,8 @@
"example:instruments:get": "tsx examples/instruments.get.ts",
"example:instruments:get:forex": "tsx examples/instruments.get.ts FOREX",
"example:ohlc:get": "tsx examples/ohlc.get.ts",
"example:ohlc:stream": "tsx examples/ohlc.stream.ts",
"example:ohlc:stream:btc": "tsx examples/ohlc.stream.ts BTCUSD",
"example:assessments:get": "tsx examples/assessments.get.ts",
"example:assessments:get:btc": "tsx examples/assessments.get.ts BTCUSD",
"============= Git =============": "",
Expand Down
22 changes: 11 additions & 11 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
getAccountMetrics,
getTradeHistory,
getPositions,
getPositionMetrics,
closePosition,
closeAllPositions,
streamPositions,
Expand All @@ -20,6 +19,7 @@
getSymbolLimits,
getSymbolSuggestions,
getOHLC,
streamOHLC,
getSymbolInfo,
getOrders,
cancelOrder,
Expand All @@ -31,8 +31,8 @@
class PositionsDomain {
constructor(private _ctx: ClientContext) {}

/** Get all open positions via WebSocket. */
get(): Promise<Position.Get[]> {
/** Get all open positions with P&L metrics merged. */
get(): Promise<Position.Full[]> {
return getPositions(this._ctx);
}

Expand All @@ -46,13 +46,8 @@
return closeAllPositions(this._ctx);
}

/** Get position-level P&L metrics via WebSocket. */
metrics(): Promise<Position.Metrics[]> {
return getPositionMetrics(this._ctx);
}

/** Stream real-time position updates. Requires connect(). Returns unsubscribe function. */
stream(callback: (positions: Position.Get[]) => void): () => void {
/** Stream real-time position updates with P&L metrics. Requires connect(). Returns unsubscribe function. */
stream(callback: (positions: Position.Full[]) => void): () => void {
return streamPositions(this._ctx, callback);
}
}
Expand Down Expand Up @@ -97,7 +92,7 @@
* @param params.from - Start timestamp (Unix ms)
* @param params.to - End timestamp (Unix ms)
*/
tradeJournal(params: { from: number; to: number }): Promise<any> {

Check warning on line 95 in src/client.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
return getTradeJournal(this._ctx, params);
}

Expand Down Expand Up @@ -153,6 +148,11 @@
get(params: OHLC.Params): Promise<OHLC.Bar[]> {
return getOHLC(this._ctx, params);
}

/** Stream real-time OHLC bar updates. Requires connect(). Returns unsubscribe function. */
stream(params: OHLC.Params, callback: (bars: OHLC.Bar[]) => void): Promise<() => void> {
return streamOHLC(this._ctx, params, callback);
}
}

class AssessmentsDomain {
Expand Down Expand Up @@ -193,7 +193,7 @@
public readonly symbols: SymbolsDomain;
/** Instrument operations: get (with optional filtering). */
public readonly instruments: InstrumentsDomain;
/** OHLC price bar operations: get. */
/** OHLC price bar operations: get, stream. */
public readonly ohlc: OhlcDomain;
/** PnL assessment operations: get. */
public readonly assessments: AssessmentsDomain;
Expand Down
1 change: 1 addition & 0 deletions src/constants/enums.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,6 @@ export enum WS_MESSAGE {
export namespace WS_MESSAGE {
export enum SUBTOPIC {
BIG_CHART_COMPONENT = "BigChartComponentPresenter-4",
OHLC_STREAM = "OHLCStreamPresenter-0",
}
}
103 changes: 103 additions & 0 deletions src/domains/ohlc/ohlc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,109 @@ import { Cookies, authHeaders, retryRequest, parseWsData, shouldLog, debugLog }
import type { ClientContext } from "@/client.types";
import type { OHLC } from ".";

export async function streamOHLC(
ctx: ClientContext,
params: OHLC.Params,
callback: (bars: OHLC.Bar[]) => void,
): Promise<() => void> {
if (!ctx.wsManager) {
ctx.throwError(
ERROR.STREAM_REQUIRES_CONNECT,
"Streaming requires a persistent WebSocket. Use connect() instead of auth().",
);
}

const { symbol, resolution = 60, range = 432_000, maxBars = 3500, priceField = "bid" } = params;
const subtopic = WS_MESSAGE.SUBTOPIC.OHLC_STREAM;
const headers = authHeaders(ctx.csrf!, Cookies.serialize(ctx.cookies));
const snapshotBars: OHLC.Bar[] = [];
let snapshotDone = false;
let resolveSnapshot: (() => void) | null = null;

const onChartFeed = (body: Record<string, unknown>) => {
if (body?.subtopic !== subtopic) return;
const data = body.data as OHLC.Bar[] | undefined;
if (!Array.isArray(data)) return;

if (!snapshotDone) {
snapshotBars.push(...data);
if (body.snapshotEnd) {
snapshotDone = true;
callback([...snapshotBars]);
resolveSnapshot?.();
}
} else {
callback(data);
}
};

ctx.wsManager.on(WS_MESSAGE.CHART_FEED_SUBTOPIC, onChartFeed);

try {
await retryRequest(
{
method: "PUT",
url: endpoints.subscribeInstruments(ctx.broker),
data: { instruments: [symbol] },
headers,
},
ctx.retries,
);
await retryRequest(
{
method: "PUT",
url: endpoints.charts(ctx.broker),
data: {
chartIds: [],
requests: [
{
aggregationPeriodSeconds: resolution,
extendedSession: true,
forexPriceField: priceField,
id: 0,
maxBarsCount: maxBars,
range,
studySubscription: [],
subtopic,
symbol,
},
],
},
headers,
},
ctx.retries,
);
} catch (error: unknown) {
ctx.wsManager.removeListener(WS_MESSAGE.CHART_FEED_SUBTOPIC, onChartFeed);
const message = error instanceof Error ? error.message : "Unknown error";
ctx.throwError(ERROR.OHLC_ERROR, `OHLC stream subscription error: ${message}`);
}

await new Promise<void>((resolve, reject) => {
if (snapshotDone) return resolve();

const timer = setTimeout(() => {
if (snapshotBars.length > 0) {
snapshotDone = true;
callback([...snapshotBars]);
resolve();
} else {
ctx.wsManager?.removeListener(WS_MESSAGE.CHART_FEED_SUBTOPIC, onChartFeed);
reject(new DxtradeError(ERROR.OHLC_TIMEOUT, "OHLC stream snapshot timed out"));
}
}, 30_000);

resolveSnapshot = () => {
clearTimeout(timer);
resolve();
};
});

return () => {
ctx.wsManager?.removeListener(WS_MESSAGE.CHART_FEED_SUBTOPIC, onChartFeed);
};
}

export async function getOHLC(ctx: ClientContext, params: OHLC.Params, timeout = 30_000): Promise<OHLC.Bar[]> {
ctx.ensureSession();

Expand Down
Loading
Loading