Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { createRealTimeClient } from "./realtime/client";
import { createTokensClient } from "./tokens/client";
import { readEnv } from "./utils/env";
import { createInvalidApiKeyError, createInvalidBaseUrlError } from "./utils/errors";
import { type Logger, noopLogger } from "./utils/logger";

export type { ProcessClient } from "./process/client";
export type { FileInput, ProcessOptions, ReactNativeFile } from "./process/types";
Expand All @@ -23,13 +24,28 @@ export type {
RealTimeClientConnectOptions,
RealTimeClientInitialState,
} from "./realtime/client";
export type {
ConnectionPhase,
DiagnosticEvent,
DiagnosticEventName,
DiagnosticEvents,
IceCandidateEvent,
IceStateEvent,
PeerConnectionStateEvent,
PhaseTimingEvent,
ReconnectEvent,
SelectedCandidatePairEvent,
SignalingStateEvent,
VideoStallEvent,
} from "./realtime/diagnostics";
export type { SetInput } from "./realtime/methods";
export type {
RealTimeSubscribeClient,
SubscribeEvents,
SubscribeOptions,
} from "./realtime/subscribe-client";
export type { ConnectionState } from "./realtime/types";
export type { WebRTCStats } from "./realtime/webrtc-stats";
export {
type ImageModelDefinition,
type ImageModels,
Expand All @@ -46,6 +62,7 @@ export {
export type { ModelState } from "./shared/types";
export type { CreateTokenResponse, TokensClient } from "./tokens/client";
export { type DecartSDKError, ERROR_CODES } from "./utils/errors";
export { createConsoleLogger, type Logger, type LogLevel, noopLogger } from "./utils/logger";

// Schema with validation to ensure proxy and apiKey are mutually exclusive
// Proxy can be a full URL or a relative path (starts with /)
Expand Down Expand Up @@ -79,12 +96,16 @@ export type DecartClientOptions =
apiKey?: never;
baseUrl?: string;
integration?: string;
logger?: Logger;
telemetry?: boolean;
}
| {
proxy?: never;
apiKey?: string;
baseUrl?: string;
integration?: string;
logger?: Logger;
telemetry?: boolean;
};

/**
Expand Down Expand Up @@ -153,6 +174,8 @@ export const createDecartClient = (options: DecartClientOptions = {}) => {
baseUrl = parsedOptions.data.baseUrl || "https://api.decart.ai";
}
const { integration } = parsedOptions.data;
const logger = "logger" in options && options.logger ? options.logger : noopLogger;
const telemetryEnabled = "telemetry" in options && options.telemetry === false ? false : true;

// Realtime (WebRTC) always requires direct API access with API key
// Proxy mode is only for HTTP endpoints (process, queue, tokens)
Expand All @@ -162,6 +185,8 @@ export const createDecartClient = (options: DecartClientOptions = {}) => {
baseUrl: wsBaseUrl,
apiKey: apiKey || "",
integration,
logger,
telemetryEnabled,
});

const process = createProcessClient({
Expand Down
146 changes: 140 additions & 6 deletions packages/sdk/src/realtime/client.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { z } from "zod";
import { modelDefinitionSchema, type RealTimeModels } from "../shared/model";
import { modelStateSchema } from "../shared/types";
import { createWebrtcError, type DecartSDKError } from "../utils/errors";
import { classifyWebrtcError, type DecartSDKError } from "../utils/errors";
import type { Logger } from "../utils/logger";
import { AudioStreamManager } from "./audio-stream-manager";
import type { DiagnosticEvent } from "./diagnostics";
import { createEventBuffer } from "./event-buffer";
import { realtimeMethods, type SetInput } from "./methods";
import {
Expand All @@ -12,8 +14,10 @@ import {
type SubscribeEvents,
type SubscribeOptions,
} from "./subscribe-client";
import { type ITelemetryReporter, NullTelemetryReporter, TelemetryReporter } from "./telemetry-reporter";
import type { ConnectionState, GenerationTickMessage, SessionIdMessage } from "./types";
import { WebRTCManager } from "./webrtc-manager";
import { type WebRTCStats, WebRTCStatsCollector } from "./webrtc-stats";

async function blobToBase64(blob: Blob): Promise<string> {
return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -69,6 +73,8 @@ export type RealTimeClientOptions = {
baseUrl: string;
apiKey: string;
integration?: string;
logger: Logger;
telemetryEnabled: boolean;
};

const realTimeClientInitialStateSchema = modelStateSchema;
Expand All @@ -94,6 +100,8 @@ export type Events = {
connectionChange: ConnectionState;
error: DecartSDKError;
generationTick: { seconds: number };
diagnostic: DiagnosticEvent;
stats: WebRTCStats;
};

export type RealTimeClient = {
Expand All @@ -114,7 +122,7 @@ export type RealTimeClient = {
};

export const createRealTimeClient = (opts: RealTimeClientOptions) => {
const { baseUrl, apiKey, integration } = opts;
const { baseUrl, apiKey, integration, logger } = opts;

const connect = async (
stream: MediaStream | null,
Expand Down Expand Up @@ -142,6 +150,8 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
}

let webrtcManager: WebRTCManager | undefined;
let telemetryReporter: ITelemetryReporter = new NullTelemetryReporter();
let handleConnectionStateChange: ((state: ConnectionState) => void) | null = null;

try {
// Prepare initial image base64 before connection
Expand All @@ -162,13 +172,19 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
webrtcManager = new WebRTCManager({
webrtcUrl: `${url}?api_key=${encodeURIComponent(apiKey)}&model=${encodeURIComponent(options.model.name)}`,
integration,
logger,
onDiagnostic: (name, data) => {
emitOrBuffer("diagnostic", { name, data } as Events["diagnostic"]);
addTelemetryDiagnostic(name, data);
},
onRemoteStream,
onConnectionStateChange: (state) => {
emitOrBuffer("connectionChange", state);
handleConnectionStateChange?.(state);
},
onError: (error) => {
console.error("WebRTC error:", error);
emitOrBuffer("error", createWebrtcError(error));
logger.error("WebRTC error", { error: error.message });
emitOrBuffer("error", classifyWebrtcError(error));
},
customizeOffer: options.customizeOffer as ((offer: RTCSessionDescriptionInit) => Promise<void>) | undefined,
vp8MinBitrate: 300,
Expand All @@ -182,9 +198,56 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {

let sessionId: string | null = null;
let subscribeToken: string | null = null;
const pendingTelemetryDiagnostics: Array<{
name: DiagnosticEvent["name"];
data: DiagnosticEvent["data"];
timestamp: number;
}> = [];
let telemetryReporterReady = false;

const addTelemetryDiagnostic = (
name: DiagnosticEvent["name"],
data: DiagnosticEvent["data"],
timestamp: number = Date.now(),
): void => {
if (!opts.telemetryEnabled) {
return;
}

if (!telemetryReporterReady) {
pendingTelemetryDiagnostics.push({ name, data, timestamp });
return;
}

telemetryReporter.addDiagnostic({ name, data, timestamp });
};

const sessionIdListener = (msg: SessionIdMessage) => {
subscribeToken = encodeSubscribeToken(msg.session_id, msg.server_ip, msg.server_port);
sessionId = msg.session_id;

// Start telemetry reporter now that we have a session ID
if (opts.telemetryEnabled) {
if (telemetryReporterReady) {
telemetryReporter.stop();
}

const reporter = new TelemetryReporter({
apiKey,
sessionId: msg.session_id,
model: options.model.name,
integration,
logger,
});
reporter.start();
telemetryReporter = reporter;
telemetryReporterReady = true;

for (const diagnostic of pendingTelemetryDiagnostics) {
telemetryReporter.addDiagnostic(diagnostic);
}
pendingTelemetryDiagnostics.length = 0;
}
};
manager.getWebsocketMessageEmitter().on("sessionId", sessionIdListener);

Expand All @@ -197,12 +260,78 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {

const methods = realtimeMethods(manager, imageToBase64);

let statsCollector: WebRTCStatsCollector | null = null;
let statsCollectorPeerConnection: RTCPeerConnection | null = null;

// Video stall detection state (Twilio pattern: fps < 0.5 = stalled)
const STALL_FPS_THRESHOLD = 0.5;
let videoStalled = false;
let stallStartMs = 0;

const startStatsCollection = (): (() => void) => {
statsCollector?.stop();
videoStalled = false;
stallStartMs = 0;
statsCollector = new WebRTCStatsCollector();
const pc = manager.getPeerConnection();
statsCollectorPeerConnection = pc;
if (pc) {
statsCollector.start(pc, (stats) => {
emitOrBuffer("stats", stats);
telemetryReporter.addStats(stats);

// Stall detection: check if video fps dropped below threshold
const fps = stats.video?.framesPerSecond ?? 0;
if (!videoStalled && stats.video && fps < STALL_FPS_THRESHOLD) {
videoStalled = true;
stallStartMs = Date.now();
emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: true, durationMs: 0 } });
addTelemetryDiagnostic("videoStall", { stalled: true, durationMs: 0 }, stallStartMs);
} else if (videoStalled && fps >= STALL_FPS_THRESHOLD) {
const durationMs = Date.now() - stallStartMs;
videoStalled = false;
emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: false, durationMs } });
addTelemetryDiagnostic("videoStall", { stalled: false, durationMs });
}
});
}
return () => {
statsCollector?.stop();
statsCollector = null;
statsCollectorPeerConnection = null;
};
};

handleConnectionStateChange = (state) => {
if (!opts.telemetryEnabled) {
return;
}

if (state !== "connected" && state !== "generating") {
return;
}

const peerConnection = manager.getPeerConnection();
if (!peerConnection || peerConnection === statsCollectorPeerConnection) {
return;
}

startStatsCollection();
};

// Auto-start stats when telemetry is enabled
if (opts.telemetryEnabled) {
startStatsCollection();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stats events never fire when telemetry is disabled

Medium Severity

The stats event is part of the public Events type, but startStatsCollection is only called when opts.telemetryEnabled is true. Both call sites — the auto-start at connection and the handleConnectionStateChange handler for reconnects — are gated behind telemetry checks. This means client.on("stats", ...) listeners never fire when telemetry is disabled, coupling local stats observation with remote telemetry reporting. A user who wants local WebRTC stats without sending data to Decart's servers has no way to get them.

Additional Locations (1)

Fix in Cursor Fix in Web


const client: RealTimeClient = {
set: methods.set,
setPrompt: methods.setPrompt,
isConnected: () => manager.isConnected(),
getConnectionState: () => manager.getConnectionState(),
disconnect: () => {
statsCollector?.stop();
telemetryReporter.stop();
stop();
manager.cleanup();
audioStreamManager?.cleanup();
Expand Down Expand Up @@ -236,6 +365,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
flush();
return client;
} catch (error) {
telemetryReporter.stop();
webrtcManager?.cleanup();
audioStreamManager?.cleanup();
throw error;
Expand All @@ -254,13 +384,17 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
webrtcManager = new WebRTCManager({
webrtcUrl: subscribeUrl,
integration,
logger,
onDiagnostic: (name, data) => {
emitOrBuffer("diagnostic", { name, data } as SubscribeEvents["diagnostic"]);
},
onRemoteStream: options.onRemoteStream,
onConnectionStateChange: (state) => {
emitOrBuffer("connectionChange", state);
},
onError: (error) => {
console.error("WebRTC subscribe error:", error);
emitOrBuffer("error", createWebrtcError(error));
logger.error("WebRTC subscribe error", { error: error.message });
emitOrBuffer("error", classifyWebrtcError(error));
},
});

Expand Down
Loading
Loading