From 82d4e86b5ba526338183ef2b2205aa7e7bc7dc93 Mon Sep 17 00:00:00 2001 From: Adir Amsalem Date: Wed, 18 Feb 2026 14:13:58 +0200 Subject: [PATCH 01/10] feat(realtime): add WebRTC observability - diagnostics, stats, telemetry Add comprehensive WebRTC observability to the realtime client: - Structured logger with configurable log levels (debug/info/warn/error) - Diagnostic event system for connection lifecycle (ICE, signaling, phase timing, reconnects, video stalls) - WebRTC stats collector polling at 1s intervals with delta computation for cumulative counters - Telemetry reporter that batches stats + diagnostics and sends to backend every 10s - NullReporter pattern to eliminate conditional checks when telemetry is disabled - Granular error codes (WEBRTC_NEGOTIATION_FAILED, ICE_CONNECTION_FAILED, etc.) - Explicit tags (session_id, sdk_version, integration) in telemetry reports for Datadog tagging - Quality limitation tracking from outbound-rtp stats - Video stall detection (fps < 0.5 threshold, Twilio pattern) - 121 unit tests passing Co-Authored-By: Claude Opus 4.6 --- package-lock.json | 6 + packages/sdk/src/index.ts | 28 + packages/sdk/src/realtime/client.ts | 94 +- packages/sdk/src/realtime/diagnostics.ts | 87 ++ packages/sdk/src/realtime/subscribe-client.ts | 2 + .../sdk/src/realtime/telemetry-reporter.ts | 137 +++ .../sdk/src/realtime/webrtc-connection.ts | 172 +++- packages/sdk/src/realtime/webrtc-manager.ts | 34 +- packages/sdk/src/realtime/webrtc-stats.ts | 233 +++++ packages/sdk/src/utils/errors.ts | 50 +- packages/sdk/src/utils/logger.ts | 44 + packages/sdk/tests/unit.test.ts | 926 ++++++++++++++++++ 12 files changed, 1789 insertions(+), 24 deletions(-) create mode 100644 package-lock.json create mode 100644 packages/sdk/src/realtime/diagnostics.ts create mode 100644 packages/sdk/src/realtime/telemetry-reporter.ts create mode 100644 packages/sdk/src/realtime/webrtc-stats.ts create mode 100644 packages/sdk/src/utils/logger.ts diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..4a6eaa6 --- /dev/null +++ b/package-lock.json @@ -0,0 +1,6 @@ +{ + "name": "sun-valley", + "lockfileVersion": 3, + "requires": true, + "packages": {} +} diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 0a86932..23369ed 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -5,6 +5,7 @@ import { createRealTimeClient } from "./realtime/client"; import { createTokensClient } from "./tokens/client"; import { readEnv } from "./utils/env"; import { createInvalidApiKeyError, createInvalidBaseUrlError } from "./utils/errors"; +import { type Logger, noopLogger } from "./utils/logger"; export type { ProcessClient } from "./process/client"; export type { FileInput, ProcessOptions, ReactNativeFile } from "./process/types"; @@ -24,6 +25,20 @@ export type { RealTimeClientConnectOptions, RealTimeClientInitialState, } from "./realtime/client"; +export type { + ConnectionPhase, + DiagnosticEvent, + DiagnosticEventName, + DiagnosticEvents, + IceCandidateEvent, + IceStateEvent, + PeerConnectionStateEvent, + PhaseTimingEvent, + ReconnectEvent, + SelectedCandidatePairEvent, + SignalingStateEvent, + VideoStallEvent, +} from "./realtime/diagnostics"; export type { SetInput } from "./realtime/methods"; export type { RealTimeSubscribeClient, @@ -31,6 +46,7 @@ export type { SubscribeOptions, } from "./realtime/subscribe-client"; export type { ConnectionState } from "./realtime/types"; +export type { StatsOptions, WebRTCStats } from "./realtime/webrtc-stats"; export { type ImageModelDefinition, type ImageModels, @@ -47,6 +63,7 @@ export { export type { ModelState } from "./shared/types"; export type { CreateTokenResponse, TokensClient } from "./tokens/client"; export { type DecartSDKError, ERROR_CODES } from "./utils/errors"; +export { createConsoleLogger, type Logger, type LogLevel, noopLogger } from "./utils/logger"; // Schema with validation to ensure proxy and apiKey are mutually exclusive // Proxy can be a full URL or a relative path (starts with /) @@ -80,12 +97,16 @@ export type DecartClientOptions = apiKey?: never; baseUrl?: string; integration?: string; + logger?: Logger; + telemetry?: boolean; } | { proxy?: never; apiKey?: string; baseUrl?: string; integration?: string; + logger?: Logger; + telemetry?: boolean; }; /** @@ -154,15 +175,22 @@ export const createDecartClient = (options: DecartClientOptions = {}) => { baseUrl = parsedOptions.data.baseUrl || "https://api.decart.ai"; } const { integration } = parsedOptions.data; + const logger = "logger" in options && options.logger ? options.logger : noopLogger; + const telemetryEnabled = "telemetry" in options && options.telemetry === false ? false : true; // Realtime (WebRTC) always requires direct API access with API key // Proxy mode is only for HTTP endpoints (process, queue, tokens) // Note: Realtime will fail at connection time if no API key is provided const wsBaseUrl = "wss://api3.decart.ai"; + const httpBaseUrl = parsedOptions.data.baseUrl || "https://api.decart.ai"; const realtime = createRealTimeClient({ baseUrl: wsBaseUrl, + telemetryUrl: "http://localhost:3003/api", + // telemetryUrl: httpBaseUrl, apiKey: apiKey || "", integration, + logger, + telemetryEnabled, }); const process = createProcessClient({ diff --git a/packages/sdk/src/realtime/client.ts b/packages/sdk/src/realtime/client.ts index 017239c..fcf94f0 100644 --- a/packages/sdk/src/realtime/client.ts +++ b/packages/sdk/src/realtime/client.ts @@ -1,7 +1,11 @@ import { z } from "zod"; import { modelDefinitionSchema } from "../shared/model"; import { modelStateSchema } from "../shared/types"; -import { createWebrtcError, type DecartSDKError } from "../utils/errors"; +import { classifyWebrtcError, type DecartSDKError } from "../utils/errors"; +import type { Logger } from "../utils/logger"; +import type { DiagnosticEvent } from "./diagnostics"; +import { WebRTCStatsCollector, type StatsOptions, type WebRTCStats } from "./webrtc-stats"; +import { TelemetryReporter, NullTelemetryReporter, type ITelemetryReporter } from "./telemetry-reporter"; import { AudioStreamManager } from "./audio-stream-manager"; import { createEventBuffer } from "./event-buffer"; import { realtimeMethods, type SetInput } from "./methods"; @@ -67,8 +71,11 @@ async function imageToBase64(image: Blob | File | string): Promise { export type RealTimeClientOptions = { baseUrl: string; + telemetryUrl: string; apiKey: string; integration?: string; + logger: Logger; + telemetryEnabled: boolean; }; const realTimeClientInitialStateSchema = modelStateSchema; @@ -100,6 +107,8 @@ export type Events = { connectionChange: ConnectionState; error: DecartSDKError; generationTick: { seconds: number }; + diagnostic: DiagnosticEvent; + stats: WebRTCStats; }; export type RealTimeClient = { @@ -117,10 +126,12 @@ export type RealTimeClient = { options?: { prompt?: string; enhance?: boolean; timeout?: number }, ) => Promise; playAudio?: (audio: Blob | File | ArrayBuffer) => Promise; + /** Start collecting WebRTC stats. Stats are emitted via the 'stats' event. Returns a stop function. */ + startStats: (options?: StatsOptions) => () => void; }; export const createRealTimeClient = (opts: RealTimeClientOptions) => { - const { baseUrl, apiKey, integration } = opts; + const { baseUrl, apiKey, integration, logger } = opts; const connect = async ( stream: MediaStream | null, @@ -178,13 +189,18 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { webrtcManager = new WebRTCManager({ webrtcUrl: `${url}?api_key=${encodeURIComponent(apiKey)}&model=${encodeURIComponent(options.model.name)}`, integration, + logger, + onDiagnostic: (name, data) => { + emitOrBuffer("diagnostic", { name, data } as Events["diagnostic"]); + telemetryReporter.addDiagnostic({ name, data, timestamp: Date.now() }); + }, onRemoteStream, onConnectionStateChange: (state) => { emitOrBuffer("connectionChange", state); }, onError: (error) => { - console.error("WebRTC error:", error); - emitOrBuffer("error", createWebrtcError(error)); + logger.error("WebRTC error", { error: error.message }); + emitOrBuffer("error", classifyWebrtcError(error)); }, customizeOffer: options.customizeOffer as ((offer: RTCSessionDescriptionInit) => Promise) | undefined, vp8MinBitrate: 300, @@ -198,9 +214,24 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { let sessionId: string | null = null; let subscribeToken: string | null = null; + let telemetryReporter: ITelemetryReporter = new NullTelemetryReporter(); + const sessionIdListener = (msg: SessionIdMessage) => { subscribeToken = encodeSubscribeToken(msg.session_id, msg.server_ip, msg.server_port); sessionId = msg.session_id; + + // Start telemetry reporter now that we have a session ID + if (opts.telemetryEnabled) { + const reporter = new TelemetryReporter({ + telemetryUrl: opts.telemetryUrl, + apiKey, + sessionId: msg.session_id, + integration, + logger, + }); + reporter.start(); + telemetryReporter = reporter; + } }; manager.getWebsocketMessageEmitter().on("sessionId", sessionIdListener); @@ -219,12 +250,58 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { await methods.setPrompt(text, { enhance }); } + let statsCollector: WebRTCStatsCollector | null = null; + + // Video stall detection state (Twilio pattern: fps < 0.5 = stalled) + const STALL_FPS_THRESHOLD = 0.5; + let videoStalled = false; + let stallStartMs = 0; + + const startStatsCollection = (statsOptions?: StatsOptions): (() => void) => { + statsCollector?.stop(); + videoStalled = false; + stallStartMs = 0; + statsCollector = new WebRTCStatsCollector(statsOptions); + const pc = manager.getPeerConnection(); + if (pc) { + statsCollector.start(pc, (stats) => { + emitOrBuffer("stats", stats); + telemetryReporter.addStats(stats); + + // Stall detection: check if video fps dropped below threshold + const fps = stats.video?.framesPerSecond ?? 0; + if (!videoStalled && stats.video && fps < STALL_FPS_THRESHOLD) { + videoStalled = true; + stallStartMs = Date.now(); + emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: true, durationMs: 0 } }); + telemetryReporter.addDiagnostic({ name: "videoStall", data: { stalled: true, durationMs: 0 }, timestamp: stallStartMs }); + } else if (videoStalled && fps >= STALL_FPS_THRESHOLD) { + const durationMs = Date.now() - stallStartMs; + videoStalled = false; + emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: false, durationMs } }); + telemetryReporter.addDiagnostic({ name: "videoStall", data: { stalled: false, durationMs }, timestamp: Date.now() }); + } + }); + } + return () => { + statsCollector?.stop(); + statsCollector = null; + }; + }; + + // Auto-start stats when telemetry is enabled + if (opts.telemetryEnabled) { + startStatsCollection(); + } + const client: RealTimeClient = { set: methods.set, setPrompt: methods.setPrompt, isConnected: () => manager.isConnected(), getConnectionState: () => manager.getConnectionState(), disconnect: () => { + statsCollector?.stop(); + telemetryReporter.stop(); stop(); manager.cleanup(); audioStreamManager?.cleanup(); @@ -247,6 +324,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { const base64 = await imageToBase64(image); return manager.setImage(base64, options); }, + startStats: startStatsCollection, }; // Add live_avatar specific audio method (only when using internal AudioStreamManager) @@ -276,13 +354,17 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { webrtcManager = new WebRTCManager({ webrtcUrl: subscribeUrl, integration, + logger, + onDiagnostic: (name, data) => { + emitOrBuffer("diagnostic", { name, data } as SubscribeEvents["diagnostic"]); + }, onRemoteStream: options.onRemoteStream, onConnectionStateChange: (state) => { emitOrBuffer("connectionChange", state); }, onError: (error) => { - console.error("WebRTC subscribe error:", error); - emitOrBuffer("error", createWebrtcError(error)); + logger.error("WebRTC subscribe error", { error: error.message }); + emitOrBuffer("error", classifyWebrtcError(error)); }, }); diff --git a/packages/sdk/src/realtime/diagnostics.ts b/packages/sdk/src/realtime/diagnostics.ts new file mode 100644 index 0000000..69059d9 --- /dev/null +++ b/packages/sdk/src/realtime/diagnostics.ts @@ -0,0 +1,87 @@ +/** Connection phase names for timing events. */ +export type ConnectionPhase = "websocket" | "avatar-image" | "initial-prompt" | "webrtc-handshake" | "total"; + +export type PhaseTimingEvent = { + phase: ConnectionPhase; + durationMs: number; + success: boolean; + error?: string; +}; + +export type IceCandidateEvent = { + source: "local" | "remote"; + candidateType: "host" | "srflx" | "prflx" | "relay" | "unknown"; + protocol: "udp" | "tcp" | "unknown"; + address?: string; + port?: number; +}; + +export type IceStateEvent = { + state: string; + previousState: string; + timestampMs: number; +}; + +export type PeerConnectionStateEvent = { + state: string; + previousState: string; + timestampMs: number; +}; + +export type SignalingStateEvent = { + state: string; + previousState: string; + timestampMs: number; +}; + +export type SelectedCandidatePairEvent = { + local: { + candidateType: string; + protocol: string; + address?: string; + port?: number; + }; + remote: { + candidateType: string; + protocol: string; + address?: string; + port?: number; + }; +}; + +export type ReconnectEvent = { + attempt: number; + maxAttempts: number; + durationMs: number; + success: boolean; + error?: string; +}; + +export type VideoStallEvent = { + /** True when a stall is detected, false when recovered. */ + stalled: boolean; + /** Duration of the stall in ms (0 when stall first detected, actual duration on recovery). */ + durationMs: number; +}; + +/** All diagnostic event types keyed by name. */ +export type DiagnosticEvents = { + phaseTiming: PhaseTimingEvent; + iceCandidate: IceCandidateEvent; + iceStateChange: IceStateEvent; + peerConnectionStateChange: PeerConnectionStateEvent; + signalingStateChange: SignalingStateEvent; + selectedCandidatePair: SelectedCandidatePairEvent; + reconnect: ReconnectEvent; + videoStall: VideoStallEvent; +}; + +export type DiagnosticEventName = keyof DiagnosticEvents; + +/** A single diagnostic event with its name and typed data. */ +export type DiagnosticEvent = { + [K in DiagnosticEventName]: { name: K; data: DiagnosticEvents[K] }; +}[DiagnosticEventName]; + +/** Callback for emitting diagnostic events from the connection/manager layers. */ +export type DiagnosticEmitter = (name: K, data: DiagnosticEvents[K]) => void; diff --git a/packages/sdk/src/realtime/subscribe-client.ts b/packages/sdk/src/realtime/subscribe-client.ts index 329871a..6b1370f 100644 --- a/packages/sdk/src/realtime/subscribe-client.ts +++ b/packages/sdk/src/realtime/subscribe-client.ts @@ -1,4 +1,5 @@ import type { DecartSDKError } from "../utils/errors"; +import type { DiagnosticEvent } from "./diagnostics"; import type { ConnectionState } from "./types"; type TokenPayload = { @@ -26,6 +27,7 @@ export function decodeSubscribeToken(token: string): TokenPayload { export type SubscribeEvents = { connectionChange: ConnectionState; error: DecartSDKError; + diagnostic: DiagnosticEvent; }; export type RealTimeSubscribeClient = { diff --git a/packages/sdk/src/realtime/telemetry-reporter.ts b/packages/sdk/src/realtime/telemetry-reporter.ts new file mode 100644 index 0000000..4545ff0 --- /dev/null +++ b/packages/sdk/src/realtime/telemetry-reporter.ts @@ -0,0 +1,137 @@ +import { buildAuthHeaders } from "../shared/request"; +import type { Logger } from "../utils/logger"; +import { VERSION } from "../version"; +import type { WebRTCStats } from "./webrtc-stats"; + +const DEFAULT_REPORT_INTERVAL_MS = 10_000; // 10 seconds + +type TelemetryDiagnostic = { + name: string; + data: unknown; + timestamp: number; +}; + +type TelemetryReport = { + sessionId: string; + timestamp: number; + sdkVersion: string; + /** Tags that the backend should attach to every Datadog metric/log from this report. */ + tags: Record; + stats: WebRTCStats[]; + diagnostics: TelemetryDiagnostic[]; +}; + +export interface TelemetryReporterOptions { + telemetryUrl: string; + apiKey: string; + sessionId: string; + integration?: string; + logger: Logger; + reportIntervalMs?: number; +} + +/** Interface for telemetry reporting. Allows substituting a no-op implementation. */ +export interface ITelemetryReporter { + start(): void; + addStats(stats: WebRTCStats): void; + addDiagnostic(event: TelemetryDiagnostic): void; + flush(): void; + stop(): void; +} + +/** No-op reporter that silently discards all data. Used when telemetry is disabled. */ +export class NullTelemetryReporter implements ITelemetryReporter { + start(): void {} + addStats(): void {} + addDiagnostic(): void {} + flush(): void {} + stop(): void {} +} + +export class TelemetryReporter implements ITelemetryReporter { + private telemetryUrl: string; + private apiKey: string; + private sessionId: string; + private integration?: string; + private logger: Logger; + private reportIntervalMs: number; + private intervalId: ReturnType | null = null; + private statsBuffer: WebRTCStats[] = []; + private diagnosticsBuffer: TelemetryDiagnostic[] = []; + + constructor(options: TelemetryReporterOptions) { + this.telemetryUrl = options.telemetryUrl; + this.apiKey = options.apiKey; + this.sessionId = options.sessionId; + this.integration = options.integration; + this.logger = options.logger; + this.reportIntervalMs = options.reportIntervalMs ?? DEFAULT_REPORT_INTERVAL_MS; + } + + /** Start the periodic reporting timer. */ + start(): void { + if (this.intervalId !== null) return; + this.intervalId = setInterval(() => this.flush(), this.reportIntervalMs); + } + + /** Add a stats snapshot to the buffer. */ + addStats(stats: WebRTCStats): void { + this.statsBuffer.push(stats); + } + + /** Add a diagnostic event to the buffer. */ + addDiagnostic(event: TelemetryDiagnostic): void { + this.diagnosticsBuffer.push(event); + } + + /** Flush buffered data immediately. */ + flush(): void { + this.sendReport(false); + } + + /** Stop the reporter and send a final report with keepalive. */ + stop(): void { + if (this.intervalId !== null) { + clearInterval(this.intervalId); + this.intervalId = null; + } + this.sendReport(true); + } + + private sendReport(keepalive: boolean): void { + if (this.statsBuffer.length === 0 && this.diagnosticsBuffer.length === 0) { + return; + } + + const report: TelemetryReport = { + sessionId: this.sessionId, + timestamp: Date.now(), + sdkVersion: VERSION, + tags: { + session_id: this.sessionId, + sdk_version: VERSION, + ...(this.integration ? { integration: this.integration } : {}), + }, + stats: this.statsBuffer.splice(0), + diagnostics: this.diagnosticsBuffer.splice(0), + }; + + try { + const headers = buildAuthHeaders({ apiKey: this.apiKey, integration: this.integration }); + + fetch(`${this.telemetryUrl}/v1/telemetry`, { + method: "POST", + headers: { + ...headers, + "Content-Type": "application/json", + }, + body: JSON.stringify(report), + keepalive, + }).catch((error) => { + this.logger.debug("Telemetry report failed", { error: String(error) }); + }); + } catch (error) { + this.logger.debug("Telemetry report failed", { error: String(error) }); + } + } +} diff --git a/packages/sdk/src/realtime/webrtc-connection.ts b/packages/sdk/src/realtime/webrtc-connection.ts index a412615..765882c 100644 --- a/packages/sdk/src/realtime/webrtc-connection.ts +++ b/packages/sdk/src/realtime/webrtc-connection.ts @@ -1,4 +1,6 @@ import mitt from "mitt"; +import type { DiagnosticEmitter, IceCandidateEvent } from "./diagnostics"; +import type { Logger } from "../utils/logger"; import { buildUserAgent } from "../utils/user-agent"; import type { ConnectionState, @@ -24,6 +26,8 @@ interface ConnectionCallbacks { isAvatarLive?: boolean; avatarImageBase64?: string; initialPrompt?: { text: string; enhance?: boolean }; + logger?: Logger; + onDiagnostic?: DiagnosticEmitter; } type WsMessageEvents = { @@ -33,14 +37,25 @@ type WsMessageEvents = { generationTick: GenerationTickMessage; }; +const noopDiagnostic: DiagnosticEmitter = () => {}; + export class WebRTCConnection { private pc: RTCPeerConnection | null = null; private ws: WebSocket | null = null; private localStream: MediaStream | null = null; private connectionReject: ((error: Error) => void) | null = null; + private logger: Logger; + private emitDiagnostic: DiagnosticEmitter; state: ConnectionState = "disconnected"; websocketMessagesEmitter = mitt(); - constructor(private callbacks: ConnectionCallbacks = {}) {} + constructor(private callbacks: ConnectionCallbacks = {}) { + this.logger = callbacks.logger ?? { debug() {}, info() {}, warn() {}, error() {} }; + this.emitDiagnostic = callbacks.onDiagnostic ?? noopDiagnostic; + } + + getPeerConnection(): RTCPeerConnection | null { + return this.pc; + } async connect(url: string, localStream: MediaStream | null, timeout: number, integration?: string): Promise { const deadline = Date.now() + timeout; @@ -62,8 +77,10 @@ export class WebRTCConnection { connectAbort.catch(() => {}); this.connectionReject = (error) => rejectConnect(error); + const totalStart = performance.now(); try { // Phase 1: WebSocket setup + const wsStart = performance.now(); await Promise.race([ new Promise((resolve, reject) => { const timer = setTimeout(() => reject(new Error("WebSocket timeout")), timeout); @@ -71,18 +88,29 @@ export class WebRTCConnection { this.ws.onopen = () => { clearTimeout(timer); + this.emitDiagnostic("phaseTiming", { + phase: "websocket", + durationMs: performance.now() - wsStart, + success: true, + }); resolve(); }; this.ws.onmessage = (e) => { try { this.handleSignalingMessage(JSON.parse(e.data)); } catch (err) { - console.error("[WebRTC] Parse error:", err); + this.logger.error("Signaling message parse error", { error: String(err) }); } }; this.ws.onerror = () => { clearTimeout(timer); const error = new Error("WebSocket error"); + this.emitDiagnostic("phaseTiming", { + phase: "websocket", + durationMs: performance.now() - wsStart, + success: false, + error: error.message, + }); reject(error); rejectConnect(error); }; @@ -99,25 +127,55 @@ export class WebRTCConnection { // Phase 2: Pre-handshake setup (avatar image + initial prompt) // connectionReject is already active, so ws.onclose or server errors abort these too if (this.callbacks.avatarImageBase64) { + const avatarStart = performance.now(); await Promise.race([this.sendAvatarImage(this.callbacks.avatarImageBase64), connectAbort]); + this.emitDiagnostic("phaseTiming", { + phase: "avatar-image", + durationMs: performance.now() - avatarStart, + success: true, + }); } if (this.callbacks.initialPrompt) { + const promptStart = performance.now(); await Promise.race([this.sendInitialPrompt(this.callbacks.initialPrompt), connectAbort]); + this.emitDiagnostic("phaseTiming", { + phase: "initial-prompt", + durationMs: performance.now() - promptStart, + success: true, + }); } // Phase 3: WebRTC handshake + const handshakeStart = performance.now(); await this.setupNewPeerConnection(); await Promise.race([ new Promise((resolve, reject) => { const checkConnection = setInterval(() => { if (this.state === "connected" || this.state === "generating") { clearInterval(checkConnection); + this.emitDiagnostic("phaseTiming", { + phase: "webrtc-handshake", + durationMs: performance.now() - handshakeStart, + success: true, + }); resolve(); } else if (this.state === "disconnected") { clearInterval(checkConnection); + this.emitDiagnostic("phaseTiming", { + phase: "webrtc-handshake", + durationMs: performance.now() - handshakeStart, + success: false, + error: "Connection lost during handshake", + }); reject(new Error("Connection lost during WebRTC handshake")); } else if (Date.now() >= deadline) { clearInterval(checkConnection); + this.emitDiagnostic("phaseTiming", { + phase: "webrtc-handshake", + durationMs: performance.now() - handshakeStart, + success: false, + error: "Timeout", + }); reject(new Error("Connection timeout")); } }, 100); @@ -126,6 +184,12 @@ export class WebRTCConnection { }), connectAbort, ]); + + this.emitDiagnostic("phaseTiming", { + phase: "total", + durationMs: performance.now() - totalStart, + success: true, + }); } finally { this.connectionReject = null; } @@ -202,7 +266,14 @@ export class WebRTCConnection { }); break; case "ice-candidate": - if (msg.candidate) await this.pc.addIceCandidate(msg.candidate); + if (msg.candidate) { + await this.pc.addIceCandidate(msg.candidate); + this.emitDiagnostic("iceCandidate", { + source: "remote", + candidateType: (msg.candidate.candidate?.match(/typ (\w+)/)?.[1] as IceCandidateEvent["candidateType"]) ?? "unknown", + protocol: (msg.candidate.candidate?.match(/udp|tcp/i)?.[0]?.toLowerCase() as IceCandidateEvent["protocol"]) ?? "unknown", + }); + } break; case "ice-restart": { const turnConfig = msg.turn_config; @@ -213,7 +284,7 @@ export class WebRTCConnection { } } } catch (error) { - console.error("[WebRTC] Error:", error); + this.logger.error("Signaling handler error", { error: String(error) }); this.callbacks.onError?.(error as Error); this.connectionReject?.(error as Error); } @@ -224,7 +295,7 @@ export class WebRTCConnection { this.ws.send(JSON.stringify(message)); return true; } - console.warn("[WebRTC] Message dropped: WebSocket is not open"); + this.logger.warn("Message dropped: WebSocket is not open"); return false; } @@ -369,11 +440,32 @@ export class WebRTCConnection { this.pc.onicecandidate = (e) => { this.send({ type: "ice-candidate", candidate: e.candidate }); + if (e.candidate) { + this.emitDiagnostic("iceCandidate", { + source: "local", + candidateType: (e.candidate.type as IceCandidateEvent["candidateType"]) ?? "unknown", + protocol: (e.candidate.protocol as IceCandidateEvent["protocol"]) ?? "unknown", + address: e.candidate.address ?? undefined, + port: e.candidate.port ?? undefined, + }); + } }; + let prevPcState: string = "new"; this.pc.onconnectionstatechange = () => { if (!this.pc) return; const s = this.pc.connectionState; + this.emitDiagnostic("peerConnectionStateChange", { + state: s, + previousState: prevPcState, + timestampMs: performance.now(), + }); + prevPcState = s; + + if (s === "connected") { + this.emitSelectedCandidatePair(); + } + const nextState = s === "connected" ? "connected" : ["connecting", "new"].includes(s) ? "connecting" : "disconnected"; // Keep "generating" sticky unless the connection is actually lost. @@ -381,16 +473,74 @@ export class WebRTCConnection { this.setState(nextState); }; + let prevIceState: string = "new"; this.pc.oniceconnectionstatechange = () => { - if (this.pc?.iceConnectionState === "failed") { + if (!this.pc) return; + const newIceState = this.pc.iceConnectionState; + this.emitDiagnostic("iceStateChange", { + state: newIceState, + previousState: prevIceState, + timestampMs: performance.now(), + }); + prevIceState = newIceState; + + if (newIceState === "failed") { this.setState("disconnected"); this.callbacks.onError?.(new Error("ICE connection failed")); } }; + let prevSignalingState: string = "stable"; + this.pc.onsignalingstatechange = () => { + if (!this.pc) return; + const newState = this.pc.signalingState; + this.emitDiagnostic("signalingStateChange", { + state: newState, + previousState: prevSignalingState, + timestampMs: performance.now(), + }); + prevSignalingState = newState; + }; + this.handleSignalingMessage({ type: "ready" }); } + private async emitSelectedCandidatePair(): Promise { + if (!this.pc) return; + try { + const stats = await this.pc.getStats(); + for (const report of stats.values()) { + if (report.type === "candidate-pair" && report.state === "succeeded") { + let localCandidate: Record | undefined; + let remoteCandidate: Record | undefined; + for (const r of stats.values()) { + if (r.id === report.localCandidateId) localCandidate = r as Record; + if (r.id === report.remoteCandidateId) remoteCandidate = r as Record; + } + if (localCandidate && remoteCandidate) { + this.emitDiagnostic("selectedCandidatePair", { + local: { + candidateType: String(localCandidate.candidateType ?? "unknown"), + protocol: String(localCandidate.protocol ?? "unknown"), + address: localCandidate.address as string | undefined, + port: localCandidate.port as number | undefined, + }, + remote: { + candidateType: String(remoteCandidate.candidateType ?? "unknown"), + protocol: String(remoteCandidate.protocol ?? "unknown"), + address: remoteCandidate.address as string | undefined, + port: remoteCandidate.port as number | undefined, + }, + }); + } + break; + } + } + } catch { + // getStats can fail if PC is already closed; silently ignore + } + } + cleanup(): void { // Note: We intentionally do NOT stop the tracks here. // The tracks belong to the user's source stream, not the SDK. @@ -407,7 +557,7 @@ export class WebRTCConnection { applyCodecPreference(preferredCodecName: "video/VP8" | "video/H264") { if (!this.pc) return; if (typeof RTCRtpSender === "undefined" || typeof RTCRtpSender.getCapabilities !== "function") { - console.warn("RTCRtpSender capabilities are not available in this environment."); + this.logger.debug("RTCRtpSender capabilities not available in this environment"); return; } @@ -415,13 +565,13 @@ export class WebRTCConnection { .getTransceivers() .find((r) => r.sender.track?.kind === "video" || r.receiver.track?.kind === "video"); if (!videoTransceiver) { - console.error("Could not find video transceiver. Ensure track is added to peer connection."); + this.logger.warn("Video transceiver not found for codec preference"); return; } const capabilities = RTCRtpSender.getCapabilities("video"); if (!capabilities) { - console.error("Could not get video sender capabilities."); + this.logger.warn("Video sender capabilities unavailable"); return; } @@ -437,13 +587,13 @@ export class WebRTCConnection { const orderedCodecs = [...preferredCodecs, ...otherCodecs]; if (orderedCodecs.length === 0) { - console.warn("No video codecs found to set preferences for."); + this.logger.debug("No video codecs found for preference setting"); return; } try { videoTransceiver.setCodecPreferences(orderedCodecs); } catch { - console.warn("[WebRTC] setCodecPreferences not supported, skipping codec preference."); + this.logger.debug("setCodecPreferences not supported, skipping"); } } diff --git a/packages/sdk/src/realtime/webrtc-manager.ts b/packages/sdk/src/realtime/webrtc-manager.ts index f2ae2c7..d02104c 100644 --- a/packages/sdk/src/realtime/webrtc-manager.ts +++ b/packages/sdk/src/realtime/webrtc-manager.ts @@ -1,10 +1,14 @@ import pRetry, { AbortError } from "p-retry"; +import type { DiagnosticEmitter } from "./diagnostics"; +import type { Logger } from "../utils/logger"; import type { ConnectionState, OutgoingMessage } from "./types"; import { WebRTCConnection } from "./webrtc-connection"; export interface WebRTCConfig { webrtcUrl: string; integration?: string; + logger?: Logger; + onDiagnostic?: DiagnosticEmitter; onRemoteStream: (stream: MediaStream) => void; onConnectionStateChange?: (state: ConnectionState) => void; onError?: (error: Error) => void; @@ -37,6 +41,7 @@ const RETRY_OPTIONS = { export class WebRTCManager { private connection: WebRTCConnection; private config: WebRTCConfig; + private logger: Logger; private localStream: MediaStream | null = null; private subscribeMode = false; private managerState: ConnectionState = "disconnected"; @@ -47,6 +52,7 @@ export class WebRTCManager { constructor(config: WebRTCConfig) { this.config = config; + this.logger = config.logger ?? { debug() {}, info() {}, warn() {}, error() {} }; this.connection = new WebRTCConnection({ onRemoteStream: config.onRemoteStream, onStateChange: (state) => this.handleConnectionStateChange(state), @@ -57,6 +63,8 @@ export class WebRTCManager { isAvatarLive: config.isAvatarLive, avatarImageBase64: config.avatarImageBase64, initialPrompt: config.initialPrompt, + logger: this.logger, + onDiagnostic: config.onDiagnostic, }); } @@ -100,10 +108,15 @@ export class WebRTCManager { const reconnectGeneration = ++this.reconnectGeneration; this.isReconnecting = true; this.emitState("reconnecting"); + const reconnectStart = performance.now(); try { + let attemptCount = 0; + await pRetry( async () => { + attemptCount++; + if (this.intentionalDisconnect || reconnectGeneration !== this.reconnectGeneration) { throw new AbortError("Reconnect cancelled"); } @@ -131,7 +144,14 @@ export class WebRTCManager { if (this.intentionalDisconnect || reconnectGeneration !== this.reconnectGeneration) { return; } - console.error(`[WebRTC] Reconnect attempt failed: ${error.message}`); + this.logger.warn("Reconnect attempt failed", { error: error.message, attempt: error.attemptNumber }); + this.config.onDiagnostic?.("reconnect", { + attempt: error.attemptNumber, + maxAttempts: RETRY_OPTIONS.retries, + durationMs: performance.now() - reconnectStart, + success: false, + error: error.message, + }); this.connection.cleanup(); }, shouldRetry: (error) => { @@ -143,6 +163,12 @@ export class WebRTCManager { }, }, ); + this.config.onDiagnostic?.("reconnect", { + attempt: attemptCount, + maxAttempts: RETRY_OPTIONS.retries, + durationMs: performance.now() - reconnectStart, + success: true, + }); // "connected" state is emitted by handleConnectionStateChange } catch (error) { this.isReconnecting = false; @@ -174,7 +200,7 @@ export class WebRTCManager { { ...RETRY_OPTIONS, onFailedAttempt: (error) => { - console.error(`[WebRTC] Failed to connect: ${error.message}`); + this.logger.warn("Connection attempt failed", { error: error.message, attempt: error.attemptNumber }); this.connection.cleanup(); }, shouldRetry: (error) => { @@ -209,6 +235,10 @@ export class WebRTCManager { return this.managerState; } + getPeerConnection(): RTCPeerConnection | null { + return this.connection.getPeerConnection(); + } + getWebsocketMessageEmitter() { return this.connection.websocketMessagesEmitter; } diff --git a/packages/sdk/src/realtime/webrtc-stats.ts b/packages/sdk/src/realtime/webrtc-stats.ts new file mode 100644 index 0000000..03cf670 --- /dev/null +++ b/packages/sdk/src/realtime/webrtc-stats.ts @@ -0,0 +1,233 @@ +export type WebRTCStats = { + timestamp: number; + video: { + framesDecoded: number; + framesDropped: number; + framesPerSecond: number; + frameWidth: number; + frameHeight: number; + bytesReceived: number; + packetsReceived: number; + packetsLost: number; + jitter: number; + /** Estimated inbound bitrate in bits/sec, computed from bytesReceived delta. */ + bitrate: number; + freezeCount: number; + totalFreezesDuration: number; + /** Delta: packets lost since previous sample. */ + packetsLostDelta: number; + /** Delta: frames dropped since previous sample. */ + framesDroppedDelta: number; + /** Delta: freeze count since previous sample. */ + freezeCountDelta: number; + /** Delta: freeze duration (seconds) since previous sample. */ + freezeDurationDelta: number; + } | null; + audio: { + bytesReceived: number; + packetsReceived: number; + packetsLost: number; + jitter: number; + /** Estimated inbound bitrate in bits/sec, computed from bytesReceived delta. */ + bitrate: number; + /** Delta: packets lost since previous sample. */ + packetsLostDelta: number; + } | null; + /** Outbound video track stats (from the local camera/screen share being sent). */ + outboundVideo: { + /** Why the encoder is limiting quality: "none", "bandwidth", "cpu", or "other". */ + qualityLimitationReason: string; + /** Cumulative time (seconds) spent in each quality limitation state. */ + qualityLimitationDurations: Record; + bytesSent: number; + packetsSent: number; + framesPerSecond: number; + frameWidth: number; + frameHeight: number; + /** Estimated outbound bitrate in bits/sec, computed from bytesSent delta. */ + bitrate: number; + } | null; + connection: { + /** Current round-trip time in seconds, or null if unavailable. */ + currentRoundTripTime: number | null; + /** Available outgoing bitrate estimate in bits/sec, or null if unavailable. */ + availableOutgoingBitrate: number | null; + }; +}; + +export type StatsOptions = { + /** Polling interval in milliseconds. Default: 1000. Minimum: 500. */ + intervalMs?: number; +}; + +const DEFAULT_INTERVAL_MS = 1000; +const MIN_INTERVAL_MS = 500; + +export class WebRTCStatsCollector { + private pc: RTCPeerConnection | null = null; + private intervalId: ReturnType | null = null; + private prevBytesVideo = 0; + private prevBytesAudio = 0; + private prevBytesSentVideo = 0; + private prevTimestamp = 0; + // Previous cumulative values for delta computation + private prevPacketsLostVideo = 0; + private prevFramesDropped = 0; + private prevFreezeCount = 0; + private prevFreezeDuration = 0; + private prevPacketsLostAudio = 0; + private onStats: ((stats: WebRTCStats) => void) | null = null; + private intervalMs: number; + + constructor(options: StatsOptions = {}) { + this.intervalMs = Math.max(options.intervalMs ?? DEFAULT_INTERVAL_MS, MIN_INTERVAL_MS); + } + + /** Attach to a peer connection and start polling. */ + start(pc: RTCPeerConnection, onStats: (stats: WebRTCStats) => void): void { + this.stop(); + this.pc = pc; + this.onStats = onStats; + this.prevBytesVideo = 0; + this.prevBytesAudio = 0; + this.prevBytesSentVideo = 0; + this.prevTimestamp = 0; + this.prevPacketsLostVideo = 0; + this.prevFramesDropped = 0; + this.prevFreezeCount = 0; + this.prevFreezeDuration = 0; + this.prevPacketsLostAudio = 0; + this.intervalId = setInterval(() => this.collect(), this.intervalMs); + } + + /** Stop polling and release resources. */ + stop(): void { + if (this.intervalId !== null) { + clearInterval(this.intervalId); + this.intervalId = null; + } + this.pc = null; + this.onStats = null; + } + + isRunning(): boolean { + return this.intervalId !== null; + } + + private async collect(): Promise { + if (!this.pc || !this.onStats) return; + + try { + const rawStats = await this.pc.getStats(); + const stats = this.parse(rawStats); + this.onStats(stats); + } catch { + // PC might be closed; stop silently + this.stop(); + } + } + + private parse(rawStats: RTCStatsReport): WebRTCStats { + const now = performance.now(); + const elapsed = this.prevTimestamp > 0 ? (now - this.prevTimestamp) / 1000 : 0; + + let video: WebRTCStats["video"] = null; + let audio: WebRTCStats["audio"] = null; + let outboundVideo: WebRTCStats["outboundVideo"] = null; + const connection: WebRTCStats["connection"] = { + currentRoundTripTime: null, + availableOutgoingBitrate: null, + }; + + for (const report of rawStats.values()) { + if (report.type === "inbound-rtp" && report.kind === "video") { + const bytesReceived = (report as Record).bytesReceived as number ?? 0; + const bitrate = elapsed > 0 ? ((bytesReceived - this.prevBytesVideo) * 8) / elapsed : 0; + this.prevBytesVideo = bytesReceived; + + const r = report as Record; + const packetsLost = (r.packetsLost as number) ?? 0; + const framesDropped = (r.framesDropped as number) ?? 0; + const freezeCount = (r.freezeCount as number) ?? 0; + const freezeDuration = (r.totalFreezesDuration as number) ?? 0; + + video = { + framesDecoded: (r.framesDecoded as number) ?? 0, + framesDropped, + framesPerSecond: (r.framesPerSecond as number) ?? 0, + frameWidth: (r.frameWidth as number) ?? 0, + frameHeight: (r.frameHeight as number) ?? 0, + bytesReceived, + packetsReceived: (r.packetsReceived as number) ?? 0, + packetsLost, + jitter: (r.jitter as number) ?? 0, + bitrate: Math.round(bitrate), + freezeCount, + totalFreezesDuration: freezeDuration, + packetsLostDelta: Math.max(0, packetsLost - this.prevPacketsLostVideo), + framesDroppedDelta: Math.max(0, framesDropped - this.prevFramesDropped), + freezeCountDelta: Math.max(0, freezeCount - this.prevFreezeCount), + freezeDurationDelta: Math.max(0, freezeDuration - this.prevFreezeDuration), + }; + this.prevPacketsLostVideo = packetsLost; + this.prevFramesDropped = framesDropped; + this.prevFreezeCount = freezeCount; + this.prevFreezeDuration = freezeDuration; + } + + if (report.type === "outbound-rtp" && report.kind === "video") { + const r = report as Record; + const bytesSent = (r.bytesSent as number) ?? 0; + const outBitrate = elapsed > 0 ? ((bytesSent - this.prevBytesSentVideo) * 8) / elapsed : 0; + this.prevBytesSentVideo = bytesSent; + + outboundVideo = { + qualityLimitationReason: (r.qualityLimitationReason as string) ?? "none", + qualityLimitationDurations: (r.qualityLimitationDurations as Record) ?? {}, + bytesSent, + packetsSent: (r.packetsSent as number) ?? 0, + framesPerSecond: (r.framesPerSecond as number) ?? 0, + frameWidth: (r.frameWidth as number) ?? 0, + frameHeight: (r.frameHeight as number) ?? 0, + bitrate: Math.round(outBitrate), + }; + } + + if (report.type === "inbound-rtp" && report.kind === "audio") { + const bytesReceived = (report as Record).bytesReceived as number ?? 0; + const bitrate = elapsed > 0 ? ((bytesReceived - this.prevBytesAudio) * 8) / elapsed : 0; + this.prevBytesAudio = bytesReceived; + + const r = report as Record; + const audioPacketsLost = (r.packetsLost as number) ?? 0; + audio = { + bytesReceived, + packetsReceived: (r.packetsReceived as number) ?? 0, + packetsLost: audioPacketsLost, + jitter: (r.jitter as number) ?? 0, + bitrate: Math.round(bitrate), + packetsLostDelta: Math.max(0, audioPacketsLost - this.prevPacketsLostAudio), + }; + this.prevPacketsLostAudio = audioPacketsLost; + } + + if (report.type === "candidate-pair") { + const r = report as Record; + if (r.state === "succeeded") { + connection.currentRoundTripTime = (r.currentRoundTripTime as number) ?? null; + connection.availableOutgoingBitrate = (r.availableOutgoingBitrate as number) ?? null; + } + } + } + + this.prevTimestamp = now; + + return { + timestamp: Date.now(), + video, + audio, + outboundVideo, + connection, + }; + } +} diff --git a/packages/sdk/src/utils/errors.ts b/packages/sdk/src/utils/errors.ts index 3171657..775722d 100644 --- a/packages/sdk/src/utils/errors.ts +++ b/packages/sdk/src/utils/errors.ts @@ -8,7 +8,6 @@ export type DecartSDKError = { export const ERROR_CODES = { INVALID_API_KEY: "INVALID_API_KEY", INVALID_BASE_URL: "INVALID_BASE_URL", - WEB_RTC_ERROR: "WEB_RTC_ERROR", PROCESSING_ERROR: "PROCESSING_ERROR", INVALID_INPUT: "INVALID_INPUT", INVALID_OPTIONS: "INVALID_OPTIONS", @@ -18,6 +17,12 @@ export const ERROR_CODES = { QUEUE_RESULT_ERROR: "QUEUE_RESULT_ERROR", JOB_NOT_COMPLETED: "JOB_NOT_COMPLETED", TOKEN_CREATE_ERROR: "TOKEN_CREATE_ERROR", + // WebRTC-specific error codes + WEBRTC_WEBSOCKET_ERROR: "WEBRTC_WEBSOCKET_ERROR", + WEBRTC_ICE_ERROR: "WEBRTC_ICE_ERROR", + WEBRTC_TIMEOUT_ERROR: "WEBRTC_TIMEOUT_ERROR", + WEBRTC_SERVER_ERROR: "WEBRTC_SERVER_ERROR", + WEBRTC_SIGNALING_ERROR: "WEBRTC_SIGNALING_ERROR", } as const; export function createSDKError( @@ -40,10 +45,45 @@ export function createInvalidBaseUrlError(url?: string): DecartSDKError { return createSDKError(ERROR_CODES.INVALID_BASE_URL, `Invalid base URL${url ? `: ${url}` : ""}`); } -export function createWebrtcError(error: Error): DecartSDKError { - return createSDKError(ERROR_CODES.WEB_RTC_ERROR, "WebRTC error", { - cause: error, - }); +export function createWebrtcWebsocketError(error: Error): DecartSDKError { + return createSDKError(ERROR_CODES.WEBRTC_WEBSOCKET_ERROR, "WebSocket connection failed", undefined, error); +} + +export function createWebrtcIceError(error: Error): DecartSDKError { + return createSDKError(ERROR_CODES.WEBRTC_ICE_ERROR, "ICE connection failed", undefined, error); +} + +export function createWebrtcTimeoutError(phase: string, timeoutMs: number, cause?: Error): DecartSDKError { + return createSDKError(ERROR_CODES.WEBRTC_TIMEOUT_ERROR, `${phase} timed out after ${timeoutMs}ms`, { + phase, + timeoutMs, + }, cause); +} + +export function createWebrtcServerError(message: string): DecartSDKError { + return createSDKError(ERROR_CODES.WEBRTC_SERVER_ERROR, message); +} + +export function createWebrtcSignalingError(error: Error): DecartSDKError { + return createSDKError(ERROR_CODES.WEBRTC_SIGNALING_ERROR, "Signaling error", undefined, error); +} + +/** + * Classify a raw WebRTC error into a specific SDK error based on its message. + */ +export function classifyWebrtcError(error: Error): DecartSDKError { + const msg = error.message.toLowerCase(); + if (msg.includes("websocket")) { + return createWebrtcWebsocketError(error); + } + if (msg.includes("ice connection failed")) { + return createWebrtcIceError(error); + } + if (msg.includes("timeout") || msg.includes("timed out")) { + return createWebrtcTimeoutError("connection", 0, error); + } + // Default to signaling error for unclassified WebRTC errors + return createWebrtcSignalingError(error); } export function createInvalidInputError(message: string): DecartSDKError { diff --git a/packages/sdk/src/utils/logger.ts b/packages/sdk/src/utils/logger.ts new file mode 100644 index 0000000..e4c8c68 --- /dev/null +++ b/packages/sdk/src/utils/logger.ts @@ -0,0 +1,44 @@ +export type LogLevel = "debug" | "info" | "warn" | "error"; + +export type Logger = { + debug: (message: string, data?: Record) => void; + info: (message: string, data?: Record) => void; + warn: (message: string, data?: Record) => void; + error: (message: string, data?: Record) => void; +}; + +/** A logger that discards all messages. Zero overhead. */ +export const noopLogger: Logger = { + debug() {}, + info() {}, + warn() {}, + error() {}, +}; + +const LOG_LEVELS: Record = { debug: 0, info: 1, warn: 2, error: 3 }; + +/** + * Creates a console-based logger that only logs messages at or above the given level. + * + * @param minLevel - Minimum log level to output. Default: "warn". + */ +export function createConsoleLogger(minLevel: LogLevel = "warn"): Logger { + const threshold = LOG_LEVELS[minLevel]; + + const log = (level: LogLevel, message: string, data?: Record) => { + if (LOG_LEVELS[level] < threshold) return; + const prefix = "[DecartSDK]"; + if (data) { + console[level](prefix, message, data); + } else { + console[level](prefix, message); + } + }; + + return { + debug: (msg, data) => log("debug", msg, data), + info: (msg, data) => log("info", msg, data), + warn: (msg, data) => log("warn", msg, data), + error: (msg, data) => log("error", msg, data), + }; +} diff --git a/packages/sdk/tests/unit.test.ts b/packages/sdk/tests/unit.test.ts index bcf4623..758c770 100644 --- a/packages/sdk/tests/unit.test.ts +++ b/packages/sdk/tests/unit.test.ts @@ -1562,6 +1562,596 @@ describe("Subscribe Client", () => { }); }); +describe("Logger", () => { + it("noopLogger does nothing", async () => { + const { noopLogger } = await import("../src/utils/logger.js"); + // Should not throw + noopLogger.debug("test"); + noopLogger.info("test"); + noopLogger.warn("test"); + noopLogger.error("test"); + }); + + it("createConsoleLogger filters by level", async () => { + const { createConsoleLogger } = await import("../src/utils/logger.js"); + const debugSpy = vi.spyOn(console, "debug").mockImplementation(() => {}); + const infoSpy = vi.spyOn(console, "info").mockImplementation(() => {}); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + try { + const logger = createConsoleLogger("warn"); + logger.debug("d"); + logger.info("i"); + logger.warn("w"); + logger.error("e"); + + expect(debugSpy).not.toHaveBeenCalled(); + expect(infoSpy).not.toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(errorSpy).toHaveBeenCalledTimes(1); + } finally { + debugSpy.mockRestore(); + infoSpy.mockRestore(); + warnSpy.mockRestore(); + errorSpy.mockRestore(); + } + }); + + it("createConsoleLogger at debug level logs everything", async () => { + const { createConsoleLogger } = await import("../src/utils/logger.js"); + const debugSpy = vi.spyOn(console, "debug").mockImplementation(() => {}); + const infoSpy = vi.spyOn(console, "info").mockImplementation(() => {}); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + try { + const logger = createConsoleLogger("debug"); + logger.debug("d"); + logger.info("i"); + logger.warn("w"); + logger.error("e"); + + expect(debugSpy).toHaveBeenCalledTimes(1); + expect(infoSpy).toHaveBeenCalledTimes(1); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(errorSpy).toHaveBeenCalledTimes(1); + } finally { + debugSpy.mockRestore(); + infoSpy.mockRestore(); + warnSpy.mockRestore(); + errorSpy.mockRestore(); + } + }); + + it("createConsoleLogger includes data in log output", async () => { + const { createConsoleLogger } = await import("../src/utils/logger.js"); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + try { + const logger = createConsoleLogger("warn"); + logger.warn("test message", { key: "value" }); + + expect(warnSpy).toHaveBeenCalledWith("[DecartSDK]", "test message", { key: "value" }); + } finally { + warnSpy.mockRestore(); + } + }); + + it("createConsoleLogger defaults to warn level", async () => { + const { createConsoleLogger } = await import("../src/utils/logger.js"); + const infoSpy = vi.spyOn(console, "info").mockImplementation(() => {}); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + try { + const logger = createConsoleLogger(); + logger.info("should not appear"); + logger.warn("should appear"); + + expect(infoSpy).not.toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledTimes(1); + } finally { + infoSpy.mockRestore(); + warnSpy.mockRestore(); + } + }); +}); + +describe("WebRTC Error Classification", () => { + it("classifies websocket errors", async () => { + const { classifyWebrtcError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = classifyWebrtcError(new Error("WebSocket connection closed")); + expect(result.code).toBe(ERROR_CODES.WEBRTC_WEBSOCKET_ERROR); + }); + + it("classifies ICE errors", async () => { + const { classifyWebrtcError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = classifyWebrtcError(new Error("ICE connection failed")); + expect(result.code).toBe(ERROR_CODES.WEBRTC_ICE_ERROR); + }); + + it("classifies timeout errors", async () => { + const { classifyWebrtcError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = classifyWebrtcError(new Error("Connection timed out")); + expect(result.code).toBe(ERROR_CODES.WEBRTC_TIMEOUT_ERROR); + }); + + it("classifies unknown errors as signaling errors", async () => { + const { classifyWebrtcError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = classifyWebrtcError(new Error("SDP offer failed")); + expect(result.code).toBe(ERROR_CODES.WEBRTC_SIGNALING_ERROR); + }); + + it("createWebrtcTimeoutError includes phase and timeout data", async () => { + const { createWebrtcTimeoutError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = createWebrtcTimeoutError("webrtc-handshake", 30000); + expect(result.code).toBe(ERROR_CODES.WEBRTC_TIMEOUT_ERROR); + expect(result.message).toBe("webrtc-handshake timed out after 30000ms"); + expect(result.data).toEqual({ phase: "webrtc-handshake", timeoutMs: 30000 }); + }); + + it("createWebrtcServerError preserves the message", async () => { + const { createWebrtcServerError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = createWebrtcServerError("Server overloaded"); + expect(result.code).toBe(ERROR_CODES.WEBRTC_SERVER_ERROR); + expect(result.message).toBe("Server overloaded"); + }); + + it("factory functions preserve the cause error", async () => { + const { createWebrtcWebsocketError } = await import("../src/utils/errors.js"); + const cause = new Error("original"); + const result = createWebrtcWebsocketError(cause); + expect(result.cause).toBe(cause); + }); +}); + +describe("WebRTCStatsCollector", () => { + it("starts and stops polling", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const collector = new WebRTCStatsCollector(); + + const mockPC = { + getStats: vi.fn().mockResolvedValue(new Map()), + } as unknown as RTCPeerConnection; + + const onStats = vi.fn(); + + collector.start(mockPC, onStats); + expect(collector.isRunning()).toBe(true); + + collector.stop(); + expect(collector.isRunning()).toBe(false); + }); + + it("parses inbound video stats", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + const videoReport = { + type: "inbound-rtp", + kind: "video", + framesDecoded: 100, + framesDropped: 2, + framesPerSecond: 30, + frameWidth: 1280, + frameHeight: 720, + bytesReceived: 500000, + packetsReceived: 1000, + packetsLost: 5, + jitter: 0.01, + freezeCount: 0, + totalFreezesDuration: 0, + }; + + const statsReport = new Map([["video-1", videoReport]]); + const mockPC = { + getStats: vi.fn().mockResolvedValue(statsReport), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + + expect(receivedStats.length).toBe(1); + const stats = receivedStats[0]; + expect(stats.video).not.toBeNull(); + expect(stats.video?.framesDecoded).toBe(100); + expect(stats.video?.framesDropped).toBe(2); + expect(stats.video?.framesPerSecond).toBe(30); + expect(stats.video?.frameWidth).toBe(1280); + expect(stats.video?.frameHeight).toBe(720); + expect(stats.video?.packetsLost).toBe(5); + expect(stats.audio).toBeNull(); + expect(stats.connection.currentRoundTripTime).toBeNull(); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("parses inbound audio and candidate-pair stats", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + const audioReport = { + type: "inbound-rtp", + kind: "audio", + bytesReceived: 10000, + packetsReceived: 200, + packetsLost: 1, + jitter: 0.005, + }; + + const candidatePairReport = { + type: "candidate-pair", + state: "succeeded", + currentRoundTripTime: 0.05, + availableOutgoingBitrate: 2000000, + }; + + const statsReport = new Map([ + ["audio-1", audioReport], + ["cp-1", candidatePairReport], + ]); + const mockPC = { + getStats: vi.fn().mockResolvedValue(statsReport), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + + expect(receivedStats.length).toBe(1); + const stats = receivedStats[0]; + expect(stats.audio).not.toBeNull(); + expect(stats.audio?.packetsLost).toBe(1); + expect(stats.connection.currentRoundTripTime).toBe(0.05); + expect(stats.connection.availableOutgoingBitrate).toBe(2000000); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("computes video bitrate from bytesReceived delta", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + let bytesReceived = 0; + const mockPC = { + getStats: vi.fn().mockImplementation(async () => { + bytesReceived += 125000; // 125KB per second = ~1Mbps + return new Map([ + [ + "video-1", + { + type: "inbound-rtp", + kind: "video", + bytesReceived, + framesDecoded: 0, + framesDropped: 0, + framesPerSecond: 0, + frameWidth: 0, + frameHeight: 0, + packetsReceived: 0, + packetsLost: 0, + jitter: 0, + freezeCount: 0, + totalFreezesDuration: 0, + }, + ], + ]); + }), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + // First tick: no previous data, bitrate = 0 + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[0].video?.bitrate).toBe(0); + + // Second tick: has delta, should compute bitrate + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[1].video?.bitrate).toBeGreaterThan(0); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("enforces minimum interval of 500ms", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 100 }); // Below minimum + + const mockPC = { + getStats: vi.fn().mockResolvedValue(new Map()), + } as unknown as RTCPeerConnection; + + const onStats = vi.fn(); + collector.start(mockPC, onStats); + + // At 100ms, nothing should fire (minimum is 500ms) + await vi.advanceTimersByTimeAsync(100); + expect(onStats).not.toHaveBeenCalled(); + + // At 500ms, it should fire + await vi.advanceTimersByTimeAsync(400); + expect(onStats).toHaveBeenCalledTimes(1); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("stops silently if getStats throws", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + const mockPC = { + getStats: vi.fn().mockRejectedValue(new Error("PC closed")), + } as unknown as RTCPeerConnection; + + const onStats = vi.fn(); + collector.start(mockPC, onStats); + + await vi.advanceTimersByTimeAsync(1000); + + expect(onStats).not.toHaveBeenCalled(); + expect(collector.isRunning()).toBe(false); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); +}); + +describe("TelemetryReporter", () => { + it("buffers stats and diagnostics then flushes on interval", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + vi.useFakeTimers(); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + telemetryUrl: "https://api.decart.ai", + apiKey: "test-key", + sessionId: "sess-1", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + reportIntervalMs: 5000, + }); + + reporter.start(); + + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + reporter.addDiagnostic({ name: "phaseTiming", data: { phase: "total", durationMs: 500, success: true }, timestamp: 1000 }); + + // Before interval: no fetch + expect(fetchMock).not.toHaveBeenCalled(); + + // After interval: flush + await vi.advanceTimersByTimeAsync(5000); + + expect(fetchMock).toHaveBeenCalledTimes(1); + const [url, options] = fetchMock.mock.calls[0]; + expect(url).toBe("https://api.decart.ai/v1/telemetry"); + expect(options.method).toBe("POST"); + expect(options.keepalive).toBe(false); + + const body = JSON.parse(options.body); + expect(body.sessionId).toBe("sess-1"); + expect(body.stats).toHaveLength(1); + expect(body.diagnostics).toHaveLength(1); + expect(body.diagnostics[0].name).toBe("phaseTiming"); + + reporter.stop(); + } finally { + vi.useRealTimers(); + vi.unstubAllGlobals(); + } + }); + + it("does not send empty reports", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + vi.useFakeTimers(); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + telemetryUrl: "https://api.decart.ai", + apiKey: "test-key", + sessionId: "sess-1", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + reportIntervalMs: 5000, + }); + + reporter.start(); + + // No data added — interval fires + await vi.advanceTimersByTimeAsync(5000); + + expect(fetchMock).not.toHaveBeenCalled(); + + reporter.stop(); + } finally { + vi.useRealTimers(); + vi.unstubAllGlobals(); + } + }); + + it("stop sends final report with keepalive", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + telemetryUrl: "https://api.decart.ai", + apiKey: "test-key", + sessionId: "sess-2", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + reporter.start(); + + reporter.addStats({ + timestamp: 2000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + reporter.stop(); + + expect(fetchMock).toHaveBeenCalledTimes(1); + const [, options] = fetchMock.mock.calls[0]; + expect(options.keepalive).toBe(true); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("silently handles fetch failures", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockRejectedValue(new Error("network error")); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + telemetryUrl: "https://api.decart.ai", + apiKey: "test-key", + sessionId: "sess-3", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + reporter.addStats({ + timestamp: 3000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + // Should not throw + reporter.flush(); + expect(fetchMock).toHaveBeenCalledTimes(1); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("includes auth headers and sdk version in report", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + telemetryUrl: "https://api.decart.ai", + apiKey: "my-api-key", + sessionId: "sess-4", + integration: "test-integration", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + reporter.addStats({ + timestamp: 4000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + reporter.flush(); + + const [, options] = fetchMock.mock.calls[0]; + expect(options.headers["X-API-KEY"]).toBe("my-api-key"); + expect(options.headers["Content-Type"]).toBe("application/json"); + + const body = JSON.parse(options.body); + expect(body.sdkVersion).toBeDefined(); + expect(typeof body.sdkVersion).toBe("string"); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("clears buffers after sending", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + vi.useFakeTimers(); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + telemetryUrl: "https://api.decart.ai", + apiKey: "test-key", + sessionId: "sess-5", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + reportIntervalMs: 5000, + }); + + reporter.start(); + + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + // First flush + await vi.advanceTimersByTimeAsync(5000); + expect(fetchMock).toHaveBeenCalledTimes(1); + + // Second interval: no new data, should not send + await vi.advanceTimersByTimeAsync(5000); + expect(fetchMock).toHaveBeenCalledTimes(1); + + reporter.stop(); + } finally { + vi.useRealTimers(); + vi.unstubAllGlobals(); + } + }); +}); + describe("WebSockets Connection", () => { it("connect resolves when state becomes generating before poll observes connected", async () => { const { WebRTCConnection } = await import("../src/realtime/webrtc-connection.js"); @@ -1768,3 +2358,339 @@ describe("WebSockets Connection", () => { } }); }); + +describe("NullTelemetryReporter", () => { + it("all methods are no-ops", async () => { + const { NullTelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const reporter = new NullTelemetryReporter(); + + // None of these should throw + reporter.start(); + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + outboundVideo: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + reporter.addDiagnostic({ name: "phaseTiming", data: {}, timestamp: 1000 }); + reporter.flush(); + reporter.stop(); + }); + + it("implements ITelemetryReporter interface", async () => { + const { NullTelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const reporter = new NullTelemetryReporter(); + + expect(typeof reporter.start).toBe("function"); + expect(typeof reporter.addStats).toBe("function"); + expect(typeof reporter.addDiagnostic).toBe("function"); + expect(typeof reporter.flush).toBe("function"); + expect(typeof reporter.stop).toBe("function"); + }); +}); + +describe("Outbound Video Stats", () => { + it("parses outbound-rtp video with quality limitation tracking", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + const outboundVideoReport = { + type: "outbound-rtp", + kind: "video", + bytesSent: 200000, + packetsSent: 500, + framesPerSecond: 30, + frameWidth: 1280, + frameHeight: 720, + qualityLimitationReason: "bandwidth", + qualityLimitationDurations: { none: 5.0, bandwidth: 2.5, cpu: 0, other: 0 }, + }; + + const statsReport = new Map([["outbound-video-1", outboundVideoReport]]); + const mockPC = { + getStats: vi.fn().mockResolvedValue(statsReport), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + + expect(receivedStats.length).toBe(1); + const stats = receivedStats[0]; + expect(stats.outboundVideo).not.toBeNull(); + expect(stats.outboundVideo?.qualityLimitationReason).toBe("bandwidth"); + expect(stats.outboundVideo?.qualityLimitationDurations).toEqual({ + none: 5.0, + bandwidth: 2.5, + cpu: 0, + other: 0, + }); + expect(stats.outboundVideo?.framesPerSecond).toBe(30); + expect(stats.outboundVideo?.frameWidth).toBe(1280); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("returns null outboundVideo when no outbound-rtp report", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + const mockPC = { + getStats: vi.fn().mockResolvedValue(new Map()), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + + expect(receivedStats[0].outboundVideo).toBeNull(); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("computes outbound video bitrate from bytesSent delta", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + let bytesSent = 0; + const mockPC = { + getStats: vi.fn().mockImplementation(async () => { + bytesSent += 62500; // 62.5KB per second = ~500kbps + return new Map([ + [ + "outbound-video-1", + { + type: "outbound-rtp", + kind: "video", + bytesSent, + packetsSent: 0, + framesPerSecond: 30, + frameWidth: 640, + frameHeight: 480, + qualityLimitationReason: "none", + qualityLimitationDurations: {}, + }, + ], + ]); + }), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + // First tick: no previous data, bitrate = 0 + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[0].outboundVideo?.bitrate).toBe(0); + + // Second tick: has delta + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[1].outboundVideo?.bitrate).toBeGreaterThan(0); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); +}); + +describe("Delta computation for cumulative counters", () => { + it("computes packetsLostDelta, framesDroppedDelta, freezeCountDelta, freezeDurationDelta", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + let packetsLost = 0; + let framesDropped = 0; + let freezeCount = 0; + let totalFreezesDuration = 0; + + const mockPC = { + getStats: vi.fn().mockImplementation(async () => { + packetsLost += 3; + framesDropped += 2; + freezeCount += 1; + totalFreezesDuration += 0.5; + return new Map([ + [ + "inbound-video-1", + { + type: "inbound-rtp", + kind: "video", + bytesReceived: 100000, + packetsReceived: 500, + packetsLost, + framesDecoded: 100, + framesDropped, + framesPerSecond: 30, + frameWidth: 1280, + frameHeight: 720, + jitter: 0.01, + freezeCount, + totalFreezesDuration, + }, + ], + ]); + }), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + // First tick: delta = cumulative (since prev was 0) + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[0].video?.packetsLostDelta).toBe(3); + expect(receivedStats[0].video?.framesDroppedDelta).toBe(2); + expect(receivedStats[0].video?.freezeCountDelta).toBe(1); + expect(receivedStats[0].video?.freezeDurationDelta).toBe(0.5); + + // Second tick: delta = increment from previous + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[1].video?.packetsLostDelta).toBe(3); + expect(receivedStats[1].video?.framesDroppedDelta).toBe(2); + expect(receivedStats[1].video?.freezeCountDelta).toBe(1); + expect(receivedStats[1].video?.freezeDurationDelta).toBe(0.5); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("computes audio packetsLostDelta", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + let audioPacketsLost = 0; + const mockPC = { + getStats: vi.fn().mockImplementation(async () => { + audioPacketsLost += 5; + return new Map([ + [ + "inbound-audio-1", + { + type: "inbound-rtp", + kind: "audio", + bytesReceived: 50000, + packetsReceived: 200, + packetsLost: audioPacketsLost, + jitter: 0.02, + }, + ], + ]); + }), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[0].audio?.packetsLostDelta).toBe(5); + + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[1].audio?.packetsLostDelta).toBe(5); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("clamps deltas to zero if cumulative counter resets", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + let packetsLost = 10; + const mockPC = { + getStats: vi.fn().mockImplementation(async () => { + const current = packetsLost; + // Simulate counter reset on second call + if (mockPC.getStats.mock.calls.length === 2) { + packetsLost = 0; + } + return new Map([ + [ + "inbound-video-1", + { + type: "inbound-rtp", + kind: "video", + bytesReceived: 100000, + packetsReceived: 500, + packetsLost: current, + framesDecoded: 100, + framesDropped: 0, + framesPerSecond: 30, + frameWidth: 1280, + frameHeight: 720, + jitter: 0.01, + freezeCount: 0, + totalFreezesDuration: 0, + }, + ], + ]); + }), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[0].video?.packetsLostDelta).toBe(10); + + // Counter reset: 0 - 10 = -10, clamped to 0 + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[1].video?.packetsLostDelta).toBe(0); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); +}); + +describe("VideoStall Diagnostic", () => { + it("videoStall event type exists in DiagnosticEvents", async () => { + // Type-level check: videoStall is a valid DiagnosticEventName + const event: import("../src/realtime/diagnostics.js").DiagnosticEvent = { + name: "videoStall", + data: { stalled: true, durationMs: 0 }, + }; + expect(event.name).toBe("videoStall"); + expect(event.data.stalled).toBe(true); + expect(event.data.durationMs).toBe(0); + }); + + it("videoStall recovery includes duration", () => { + const event: import("../src/realtime/diagnostics.js").DiagnosticEvent = { + name: "videoStall", + data: { stalled: false, durationMs: 1500 }, + }; + expect(event.data.stalled).toBe(false); + expect(event.data.durationMs).toBe(1500); + }); +}); From 88b266a8a9a2567143198be2a8fc38ca7065d384 Mon Sep 17 00:00:00 2001 From: Adir Amsalem Date: Wed, 18 Feb 2026 14:16:10 +0200 Subject: [PATCH 02/10] refactor(telemetry): hard-code telemetry URL, remove configurability The telemetry endpoint is always https://api.decart.ai/v1/telemetry. Remove telemetryUrl from RealTimeClientOptions and TelemetryReporterOptions. Also cleans up unused httpBaseUrl variable. Co-Authored-By: Claude Opus 4.6 --- packages/sdk/src/index.ts | 3 --- packages/sdk/src/realtime/client.ts | 2 -- packages/sdk/src/realtime/telemetry-reporter.ts | 6 ++---- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 23369ed..c9e66a5 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -182,11 +182,8 @@ export const createDecartClient = (options: DecartClientOptions = {}) => { // Proxy mode is only for HTTP endpoints (process, queue, tokens) // Note: Realtime will fail at connection time if no API key is provided const wsBaseUrl = "wss://api3.decart.ai"; - const httpBaseUrl = parsedOptions.data.baseUrl || "https://api.decart.ai"; const realtime = createRealTimeClient({ baseUrl: wsBaseUrl, - telemetryUrl: "http://localhost:3003/api", - // telemetryUrl: httpBaseUrl, apiKey: apiKey || "", integration, logger, diff --git a/packages/sdk/src/realtime/client.ts b/packages/sdk/src/realtime/client.ts index fcf94f0..0ad11b9 100644 --- a/packages/sdk/src/realtime/client.ts +++ b/packages/sdk/src/realtime/client.ts @@ -71,7 +71,6 @@ async function imageToBase64(image: Blob | File | string): Promise { export type RealTimeClientOptions = { baseUrl: string; - telemetryUrl: string; apiKey: string; integration?: string; logger: Logger; @@ -223,7 +222,6 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { // Start telemetry reporter now that we have a session ID if (opts.telemetryEnabled) { const reporter = new TelemetryReporter({ - telemetryUrl: opts.telemetryUrl, apiKey, sessionId: msg.session_id, integration, diff --git a/packages/sdk/src/realtime/telemetry-reporter.ts b/packages/sdk/src/realtime/telemetry-reporter.ts index 4545ff0..1e1fbc5 100644 --- a/packages/sdk/src/realtime/telemetry-reporter.ts +++ b/packages/sdk/src/realtime/telemetry-reporter.ts @@ -4,6 +4,7 @@ import { VERSION } from "../version"; import type { WebRTCStats } from "./webrtc-stats"; const DEFAULT_REPORT_INTERVAL_MS = 10_000; // 10 seconds +const TELEMETRY_URL = "https://api.decart.ai/v1/telemetry"; type TelemetryDiagnostic = { name: string; @@ -22,7 +23,6 @@ type TelemetryReport = { }; export interface TelemetryReporterOptions { - telemetryUrl: string; apiKey: string; sessionId: string; integration?: string; @@ -49,7 +49,6 @@ export class NullTelemetryReporter implements ITelemetryReporter { } export class TelemetryReporter implements ITelemetryReporter { - private telemetryUrl: string; private apiKey: string; private sessionId: string; private integration?: string; @@ -60,7 +59,6 @@ export class TelemetryReporter implements ITelemetryReporter { private diagnosticsBuffer: TelemetryDiagnostic[] = []; constructor(options: TelemetryReporterOptions) { - this.telemetryUrl = options.telemetryUrl; this.apiKey = options.apiKey; this.sessionId = options.sessionId; this.integration = options.integration; @@ -119,7 +117,7 @@ export class TelemetryReporter implements ITelemetryReporter { try { const headers = buildAuthHeaders({ apiKey: this.apiKey, integration: this.integration }); - fetch(`${this.telemetryUrl}/v1/telemetry`, { + fetch(TELEMETRY_URL, { method: "POST", headers: { ...headers, From 701ea82fdef7aaa2f7fd10934850b337e49f1cdd Mon Sep 17 00:00:00 2001 From: Adir Amsalem Date: Wed, 18 Feb 2026 14:16:30 +0200 Subject: [PATCH 03/10] fix(telemetry): use correct production URL Co-Authored-By: Claude Opus 4.6 --- packages/sdk/src/realtime/telemetry-reporter.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sdk/src/realtime/telemetry-reporter.ts b/packages/sdk/src/realtime/telemetry-reporter.ts index 1e1fbc5..8e22a5c 100644 --- a/packages/sdk/src/realtime/telemetry-reporter.ts +++ b/packages/sdk/src/realtime/telemetry-reporter.ts @@ -4,7 +4,7 @@ import { VERSION } from "../version"; import type { WebRTCStats } from "./webrtc-stats"; const DEFAULT_REPORT_INTERVAL_MS = 10_000; // 10 seconds -const TELEMETRY_URL = "https://api.decart.ai/v1/telemetry"; +const TELEMETRY_URL = "https://platform.decart.ai/api/v1/telemetry"; type TelemetryDiagnostic = { name: string; From 86beeea7efa964d5600eac853a658947cfd3ada6 Mon Sep 17 00:00:00 2001 From: Adir Amsalem Date: Wed, 18 Feb 2026 14:17:39 +0200 Subject: [PATCH 04/10] chore: remove root package-lock.json Co-Authored-By: Claude Opus 4.6 --- package-lock.json | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 package-lock.json diff --git a/package-lock.json b/package-lock.json deleted file mode 100644 index 4a6eaa6..0000000 --- a/package-lock.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "name": "sun-valley", - "lockfileVersion": 3, - "requires": true, - "packages": {} -} From 9c74bd6fd64b23ea494e381b33fc884ccf67437d Mon Sep 17 00:00:00 2001 From: Adir Amsalem Date: Wed, 18 Feb 2026 14:29:00 +0200 Subject: [PATCH 05/10] fix formatting with biome Co-Authored-By: Claude Opus 4.6 --- packages/sdk/src/realtime/client.ts | 12 ++++++++++-- packages/sdk/src/realtime/webrtc-connection.ts | 7 +++++-- packages/sdk/src/realtime/webrtc-stats.ts | 4 ++-- packages/sdk/src/utils/errors.ts | 13 +++++++++---- packages/sdk/tests/unit.test.ts | 6 +++++- 5 files changed, 31 insertions(+), 11 deletions(-) diff --git a/packages/sdk/src/realtime/client.ts b/packages/sdk/src/realtime/client.ts index 8b91eb3..51a450f 100644 --- a/packages/sdk/src/realtime/client.ts +++ b/packages/sdk/src/realtime/client.ts @@ -250,12 +250,20 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { videoStalled = true; stallStartMs = Date.now(); emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: true, durationMs: 0 } }); - telemetryReporter.addDiagnostic({ name: "videoStall", data: { stalled: true, durationMs: 0 }, timestamp: stallStartMs }); + telemetryReporter.addDiagnostic({ + name: "videoStall", + data: { stalled: true, durationMs: 0 }, + timestamp: stallStartMs, + }); } else if (videoStalled && fps >= STALL_FPS_THRESHOLD) { const durationMs = Date.now() - stallStartMs; videoStalled = false; emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: false, durationMs } }); - telemetryReporter.addDiagnostic({ name: "videoStall", data: { stalled: false, durationMs }, timestamp: Date.now() }); + telemetryReporter.addDiagnostic({ + name: "videoStall", + data: { stalled: false, durationMs }, + timestamp: Date.now(), + }); } }); } diff --git a/packages/sdk/src/realtime/webrtc-connection.ts b/packages/sdk/src/realtime/webrtc-connection.ts index f7da7a5..e49ade1 100644 --- a/packages/sdk/src/realtime/webrtc-connection.ts +++ b/packages/sdk/src/realtime/webrtc-connection.ts @@ -276,8 +276,11 @@ export class WebRTCConnection { await this.pc.addIceCandidate(msg.candidate); this.emitDiagnostic("iceCandidate", { source: "remote", - candidateType: (msg.candidate.candidate?.match(/typ (\w+)/)?.[1] as IceCandidateEvent["candidateType"]) ?? "unknown", - protocol: (msg.candidate.candidate?.match(/udp|tcp/i)?.[0]?.toLowerCase() as IceCandidateEvent["protocol"]) ?? "unknown", + candidateType: + (msg.candidate.candidate?.match(/typ (\w+)/)?.[1] as IceCandidateEvent["candidateType"]) ?? "unknown", + protocol: + (msg.candidate.candidate?.match(/udp|tcp/i)?.[0]?.toLowerCase() as IceCandidateEvent["protocol"]) ?? + "unknown", }); } break; diff --git a/packages/sdk/src/realtime/webrtc-stats.ts b/packages/sdk/src/realtime/webrtc-stats.ts index 03cf670..af62291 100644 --- a/packages/sdk/src/realtime/webrtc-stats.ts +++ b/packages/sdk/src/realtime/webrtc-stats.ts @@ -141,7 +141,7 @@ export class WebRTCStatsCollector { for (const report of rawStats.values()) { if (report.type === "inbound-rtp" && report.kind === "video") { - const bytesReceived = (report as Record).bytesReceived as number ?? 0; + const bytesReceived = ((report as Record).bytesReceived as number) ?? 0; const bitrate = elapsed > 0 ? ((bytesReceived - this.prevBytesVideo) * 8) / elapsed : 0; this.prevBytesVideo = bytesReceived; @@ -194,7 +194,7 @@ export class WebRTCStatsCollector { } if (report.type === "inbound-rtp" && report.kind === "audio") { - const bytesReceived = (report as Record).bytesReceived as number ?? 0; + const bytesReceived = ((report as Record).bytesReceived as number) ?? 0; const bitrate = elapsed > 0 ? ((bytesReceived - this.prevBytesAudio) * 8) / elapsed : 0; this.prevBytesAudio = bytesReceived; diff --git a/packages/sdk/src/utils/errors.ts b/packages/sdk/src/utils/errors.ts index 775722d..678f451 100644 --- a/packages/sdk/src/utils/errors.ts +++ b/packages/sdk/src/utils/errors.ts @@ -54,10 +54,15 @@ export function createWebrtcIceError(error: Error): DecartSDKError { } export function createWebrtcTimeoutError(phase: string, timeoutMs: number, cause?: Error): DecartSDKError { - return createSDKError(ERROR_CODES.WEBRTC_TIMEOUT_ERROR, `${phase} timed out after ${timeoutMs}ms`, { - phase, - timeoutMs, - }, cause); + return createSDKError( + ERROR_CODES.WEBRTC_TIMEOUT_ERROR, + `${phase} timed out after ${timeoutMs}ms`, + { + phase, + timeoutMs, + }, + cause, + ); } export function createWebrtcServerError(message: string): DecartSDKError { diff --git a/packages/sdk/tests/unit.test.ts b/packages/sdk/tests/unit.test.ts index ae03fca..deeb537 100644 --- a/packages/sdk/tests/unit.test.ts +++ b/packages/sdk/tests/unit.test.ts @@ -1954,7 +1954,11 @@ describe("TelemetryReporter", () => { audio: null, connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, }); - reporter.addDiagnostic({ name: "phaseTiming", data: { phase: "total", durationMs: 500, success: true }, timestamp: 1000 }); + reporter.addDiagnostic({ + name: "phaseTiming", + data: { phase: "total", durationMs: 500, success: true }, + timestamp: 1000, + }); // Before interval: no fetch expect(fetchMock).not.toHaveBeenCalled(); From 52e83f92013bc86c0d1ed07166304c21534dade1 Mon Sep 17 00:00:00 2001 From: Adir Amsalem Date: Wed, 18 Feb 2026 14:30:58 +0200 Subject: [PATCH 06/10] fix import sorting order (biome organizeImports) Co-Authored-By: Claude Opus 4.6 --- packages/sdk/src/realtime/client.ts | 6 +++--- packages/sdk/src/realtime/webrtc-connection.ts | 2 +- packages/sdk/src/realtime/webrtc-manager.ts | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/sdk/src/realtime/client.ts b/packages/sdk/src/realtime/client.ts index 51a450f..0f1dae3 100644 --- a/packages/sdk/src/realtime/client.ts +++ b/packages/sdk/src/realtime/client.ts @@ -3,10 +3,8 @@ import { modelDefinitionSchema, type RealTimeModels } from "../shared/model"; import { modelStateSchema } from "../shared/types"; import { classifyWebrtcError, type DecartSDKError } from "../utils/errors"; import type { Logger } from "../utils/logger"; -import type { DiagnosticEvent } from "./diagnostics"; -import { WebRTCStatsCollector, type StatsOptions, type WebRTCStats } from "./webrtc-stats"; -import { TelemetryReporter, NullTelemetryReporter, type ITelemetryReporter } from "./telemetry-reporter"; import { AudioStreamManager } from "./audio-stream-manager"; +import type { DiagnosticEvent } from "./diagnostics"; import { createEventBuffer } from "./event-buffer"; import { realtimeMethods, type SetInput } from "./methods"; import { @@ -16,8 +14,10 @@ import { type SubscribeEvents, type SubscribeOptions, } from "./subscribe-client"; +import { type ITelemetryReporter, NullTelemetryReporter, TelemetryReporter } from "./telemetry-reporter"; import type { ConnectionState, GenerationTickMessage, SessionIdMessage } from "./types"; import { WebRTCManager } from "./webrtc-manager"; +import { type StatsOptions, type WebRTCStats, WebRTCStatsCollector } from "./webrtc-stats"; async function blobToBase64(blob: Blob): Promise { return new Promise((resolve, reject) => { diff --git a/packages/sdk/src/realtime/webrtc-connection.ts b/packages/sdk/src/realtime/webrtc-connection.ts index e49ade1..b2907d4 100644 --- a/packages/sdk/src/realtime/webrtc-connection.ts +++ b/packages/sdk/src/realtime/webrtc-connection.ts @@ -1,8 +1,8 @@ import mitt from "mitt"; -import type { DiagnosticEmitter, IceCandidateEvent } from "./diagnostics"; import type { RealTimeModels } from "../shared/model"; import type { Logger } from "../utils/logger"; import { buildUserAgent } from "../utils/user-agent"; +import type { DiagnosticEmitter, IceCandidateEvent } from "./diagnostics"; import type { ConnectionState, GenerationTickMessage, diff --git a/packages/sdk/src/realtime/webrtc-manager.ts b/packages/sdk/src/realtime/webrtc-manager.ts index 796118c..e11c980 100644 --- a/packages/sdk/src/realtime/webrtc-manager.ts +++ b/packages/sdk/src/realtime/webrtc-manager.ts @@ -1,7 +1,7 @@ import pRetry, { AbortError } from "p-retry"; -import type { DiagnosticEmitter } from "./diagnostics"; import type { RealTimeModels } from "../shared/model"; import type { Logger } from "../utils/logger"; +import type { DiagnosticEmitter } from "./diagnostics"; import type { ConnectionState, OutgoingMessage } from "./types"; import { WebRTCConnection } from "./webrtc-connection"; From 261862bab265ef28124b587f876b960031f196a8 Mon Sep 17 00:00:00 2001 From: Adir Amsalem Date: Wed, 18 Feb 2026 14:33:20 +0200 Subject: [PATCH 07/10] fix typecheck: use forEach instead of values() on RTCStatsReport RTCStatsReport.values() is not in TypeScript's DOM lib types. Use forEach() which is properly typed. Co-Authored-By: Claude Opus 4.6 --- packages/sdk/src/realtime/webrtc-connection.ts | 12 +++++++----- packages/sdk/src/realtime/webrtc-stats.ts | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/sdk/src/realtime/webrtc-connection.ts b/packages/sdk/src/realtime/webrtc-connection.ts index b2907d4..118b167 100644 --- a/packages/sdk/src/realtime/webrtc-connection.ts +++ b/packages/sdk/src/realtime/webrtc-connection.ts @@ -519,14 +519,17 @@ export class WebRTCConnection { if (!this.pc) return; try { const stats = await this.pc.getStats(); - for (const report of stats.values()) { + let found = false; + stats.forEach((report) => { + if (found) return; if (report.type === "candidate-pair" && report.state === "succeeded") { + found = true; let localCandidate: Record | undefined; let remoteCandidate: Record | undefined; - for (const r of stats.values()) { + stats.forEach((r) => { if (r.id === report.localCandidateId) localCandidate = r as Record; if (r.id === report.remoteCandidateId) remoteCandidate = r as Record; - } + }); if (localCandidate && remoteCandidate) { this.emitDiagnostic("selectedCandidatePair", { local: { @@ -543,9 +546,8 @@ export class WebRTCConnection { }, }); } - break; } - } + }); } catch { // getStats can fail if PC is already closed; silently ignore } diff --git a/packages/sdk/src/realtime/webrtc-stats.ts b/packages/sdk/src/realtime/webrtc-stats.ts index af62291..42319a4 100644 --- a/packages/sdk/src/realtime/webrtc-stats.ts +++ b/packages/sdk/src/realtime/webrtc-stats.ts @@ -139,7 +139,7 @@ export class WebRTCStatsCollector { availableOutgoingBitrate: null, }; - for (const report of rawStats.values()) { + rawStats.forEach((report) => { if (report.type === "inbound-rtp" && report.kind === "video") { const bytesReceived = ((report as Record).bytesReceived as number) ?? 0; const bitrate = elapsed > 0 ? ((bytesReceived - this.prevBytesVideo) * 8) / elapsed : 0; @@ -218,7 +218,7 @@ export class WebRTCStatsCollector { connection.availableOutgoingBitrate = (r.availableOutgoingBitrate as number) ?? null; } } - } + }); this.prevTimestamp = now; From 34c262ab5ae5e82c260581a5026028359175ed96 Mon Sep 17 00:00:00 2001 From: Adir Amsalem Date: Wed, 18 Feb 2026 14:45:56 +0200 Subject: [PATCH 08/10] feat(realtime): add batch chunking, response status handling, and model propagation to telemetry reporter --- packages/sdk/src/realtime/client.ts | 1 + .../sdk/src/realtime/telemetry-reporter.ts | 87 +++++++--- packages/sdk/tests/unit.test.ts | 148 +++++++++++++++++- 3 files changed, 208 insertions(+), 28 deletions(-) diff --git a/packages/sdk/src/realtime/client.ts b/packages/sdk/src/realtime/client.ts index 0f1dae3..485062a 100644 --- a/packages/sdk/src/realtime/client.ts +++ b/packages/sdk/src/realtime/client.ts @@ -208,6 +208,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { const reporter = new TelemetryReporter({ apiKey, sessionId: msg.session_id, + model: options.model.name, integration, logger, }); diff --git a/packages/sdk/src/realtime/telemetry-reporter.ts b/packages/sdk/src/realtime/telemetry-reporter.ts index 8e22a5c..a2a24bd 100644 --- a/packages/sdk/src/realtime/telemetry-reporter.ts +++ b/packages/sdk/src/realtime/telemetry-reporter.ts @@ -6,6 +6,12 @@ import type { WebRTCStats } from "./webrtc-stats"; const DEFAULT_REPORT_INTERVAL_MS = 10_000; // 10 seconds const TELEMETRY_URL = "https://platform.decart.ai/api/v1/telemetry"; +/** + * Maximum number of items per array (stats / diagnostics) in a single report. + * Matches the backend Zod schema which enforces `z.array().max(120)`. + */ +const MAX_ITEMS_PER_REPORT = 120; + type TelemetryDiagnostic = { name: string; data: unknown; @@ -16,6 +22,7 @@ type TelemetryReport = { sessionId: string; timestamp: number; sdkVersion: string; + model?: string; /** Tags that the backend should attach to every Datadog metric/log from this report. */ tags: Record; stats: WebRTCStats[]; @@ -25,6 +32,7 @@ type TelemetryReport = { export interface TelemetryReporterOptions { apiKey: string; sessionId: string; + model?: string; integration?: string; logger: Logger; reportIntervalMs?: number; @@ -51,6 +59,7 @@ export class NullTelemetryReporter implements ITelemetryReporter { export class TelemetryReporter implements ITelemetryReporter { private apiKey: string; private sessionId: string; + private model?: string; private integration?: string; private logger: Logger; private reportIntervalMs: number; @@ -61,6 +70,7 @@ export class TelemetryReporter implements ITelemetryReporter { constructor(options: TelemetryReporterOptions) { this.apiKey = options.apiKey; this.sessionId = options.sessionId; + this.model = options.model; this.integration = options.integration; this.logger = options.logger; this.reportIntervalMs = options.reportIntervalMs ?? DEFAULT_REPORT_INTERVAL_MS; @@ -96,38 +106,71 @@ export class TelemetryReporter implements ITelemetryReporter { this.sendReport(true); } - private sendReport(keepalive: boolean): void { + /** + * Build a single chunk from the front of the buffers, respecting MAX_ITEMS_PER_REPORT. + * Returns null when both buffers are empty. + */ + private createReportChunk(): TelemetryReport | null { if (this.statsBuffer.length === 0 && this.diagnosticsBuffer.length === 0) { - return; + return null; } - const report: TelemetryReport = { + const tags: Record = { + session_id: this.sessionId, + sdk_version: VERSION, + ...(this.model ? { model: this.model } : {}), + ...(this.integration ? { integration: this.integration } : {}), + }; + + return { sessionId: this.sessionId, timestamp: Date.now(), sdkVersion: VERSION, - tags: { - session_id: this.sessionId, - sdk_version: VERSION, - ...(this.integration ? { integration: this.integration } : {}), - }, - stats: this.statsBuffer.splice(0), - diagnostics: this.diagnosticsBuffer.splice(0), + ...(this.model ? { model: this.model } : {}), + tags, + stats: this.statsBuffer.splice(0, MAX_ITEMS_PER_REPORT), + diagnostics: this.diagnosticsBuffer.splice(0, MAX_ITEMS_PER_REPORT), }; + } + + private sendReport(keepalive: boolean): void { + if (this.statsBuffer.length === 0 && this.diagnosticsBuffer.length === 0) { + return; + } try { const headers = buildAuthHeaders({ apiKey: this.apiKey, integration: this.integration }); - - fetch(TELEMETRY_URL, { - method: "POST", - headers: { - ...headers, - "Content-Type": "application/json", - }, - body: JSON.stringify(report), - keepalive, - }).catch((error) => { - this.logger.debug("Telemetry report failed", { error: String(error) }); - }); + const commonHeaders = { + ...headers, + "Content-Type": "application/json", + }; + + // Send as many chunks as needed to drain both buffers. + let chunk = this.createReportChunk(); + while (chunk !== null) { + const isLast = this.statsBuffer.length === 0 && this.diagnosticsBuffer.length === 0; + + fetch(TELEMETRY_URL, { + method: "POST", + headers: commonHeaders, + body: JSON.stringify(chunk), + // Only set keepalive on the very last chunk (if the caller requested it). + keepalive: keepalive && isLast, + }) + .then((response) => { + if (!response.ok) { + this.logger.warn("Telemetry report rejected", { + status: response.status, + statusText: response.statusText, + }); + } + }) + .catch((error) => { + this.logger.debug("Telemetry report failed", { error: String(error) }); + }); + + chunk = this.createReportChunk(); + } } catch (error) { this.logger.debug("Telemetry report failed", { error: String(error) }); } diff --git a/packages/sdk/tests/unit.test.ts b/packages/sdk/tests/unit.test.ts index deeb537..17817f9 100644 --- a/packages/sdk/tests/unit.test.ts +++ b/packages/sdk/tests/unit.test.ts @@ -1939,7 +1939,7 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - telemetryUrl: "https://api.decart.ai", + apiKey: "test-key", sessionId: "sess-1", logger: { debug() {}, info() {}, warn() {}, error() {} }, @@ -1995,7 +1995,7 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - telemetryUrl: "https://api.decart.ai", + apiKey: "test-key", sessionId: "sess-1", logger: { debug() {}, info() {}, warn() {}, error() {} }, @@ -2024,7 +2024,7 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - telemetryUrl: "https://api.decart.ai", + apiKey: "test-key", sessionId: "sess-2", logger: { debug() {}, info() {}, warn() {}, error() {} }, @@ -2057,7 +2057,7 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - telemetryUrl: "https://api.decart.ai", + apiKey: "test-key", sessionId: "sess-3", logger: { debug() {}, info() {}, warn() {}, error() {} }, @@ -2086,7 +2086,7 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - telemetryUrl: "https://api.decart.ai", + apiKey: "my-api-key", sessionId: "sess-4", integration: "test-integration", @@ -2124,7 +2124,7 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - telemetryUrl: "https://api.decart.ai", + apiKey: "test-key", sessionId: "sess-5", logger: { debug() {}, info() {}, warn() {}, error() {} }, @@ -2154,6 +2154,142 @@ describe("TelemetryReporter", () => { vi.unstubAllGlobals(); } }); + + it("chunks reports when buffers exceed 120 items", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-chunk", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + // Add 150 stats (exceeds max of 120) + for (let i = 0; i < 150; i++) { + reporter.addStats({ + timestamp: i, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + } + + reporter.flush(); + + // Should produce 2 requests: 120 + 30 + expect(fetchMock).toHaveBeenCalledTimes(2); + + const body1 = JSON.parse(fetchMock.mock.calls[0][1].body); + expect(body1.stats).toHaveLength(120); + + const body2 = JSON.parse(fetchMock.mock.calls[1][1].body); + expect(body2.stats).toHaveLength(30); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("warns on non-2xx response status", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: false, status: 500, statusText: "Internal Server Error" }); + vi.stubGlobal("fetch", fetchMock); + + try { + const warnMock = vi.fn(); + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-warn", + logger: { debug() {}, info() {}, warn: warnMock, error() {} }, + }); + + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + reporter.flush(); + + // Wait for the .then() handler to execute + await vi.waitFor(() => { + expect(warnMock).toHaveBeenCalledTimes(1); + }); + + expect(warnMock).toHaveBeenCalledWith("Telemetry report rejected", { + status: 500, + statusText: "Internal Server Error", + }); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("includes model in report body and tags when provided", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-model", + model: "gemini-3n", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + reporter.flush(); + + const body = JSON.parse(fetchMock.mock.calls[0][1].body); + expect(body.model).toBe("gemini-3n"); + expect(body.tags.model).toBe("gemini-3n"); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("omits model from report when not provided", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-no-model", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + reporter.flush(); + + const body = JSON.parse(fetchMock.mock.calls[0][1].body); + expect(body.model).toBeUndefined(); + expect(body.tags.model).toBeUndefined(); + } finally { + vi.unstubAllGlobals(); + } + }); }); describe("WebSockets Connection", () => { From 5fba9105526e96b58f568d0b881e826d0c8f60ec Mon Sep 17 00:00:00 2001 From: Adir Amsalem Date: Wed, 18 Feb 2026 16:10:45 +0200 Subject: [PATCH 09/10] fix(realtime): handle telemetry and stats correctly across reconnects --- packages/sdk/src/realtime/client.ts | 72 ++++- .../sdk/src/realtime/webrtc-connection.ts | 3 +- packages/sdk/src/realtime/webrtc-manager.ts | 4 +- packages/sdk/src/utils/errors.ts | 20 +- packages/sdk/tests/unit.test.ts | 247 +++++++++++++++++- 5 files changed, 318 insertions(+), 28 deletions(-) diff --git a/packages/sdk/src/realtime/client.ts b/packages/sdk/src/realtime/client.ts index 485062a..91a31c8 100644 --- a/packages/sdk/src/realtime/client.ts +++ b/packages/sdk/src/realtime/client.ts @@ -152,6 +152,8 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { } let webrtcManager: WebRTCManager | undefined; + let telemetryReporter: ITelemetryReporter = new NullTelemetryReporter(); + let handleConnectionStateChange: ((state: ConnectionState) => void) | null = null; try { // Prepare initial image base64 before connection @@ -175,11 +177,12 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { logger, onDiagnostic: (name, data) => { emitOrBuffer("diagnostic", { name, data } as Events["diagnostic"]); - telemetryReporter.addDiagnostic({ name, data, timestamp: Date.now() }); + addTelemetryDiagnostic(name, data); }, onRemoteStream, onConnectionStateChange: (state) => { emitOrBuffer("connectionChange", state); + handleConnectionStateChange?.(state); }, onError: (error) => { logger.error("WebRTC error", { error: error.message }); @@ -197,7 +200,29 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { let sessionId: string | null = null; let subscribeToken: string | null = null; - let telemetryReporter: ITelemetryReporter = new NullTelemetryReporter(); + const pendingTelemetryDiagnostics: Array<{ + name: DiagnosticEvent["name"]; + data: DiagnosticEvent["data"]; + timestamp: number; + }> = []; + let telemetryReporterReady = false; + + const addTelemetryDiagnostic = ( + name: DiagnosticEvent["name"], + data: DiagnosticEvent["data"], + timestamp: number = Date.now(), + ): void => { + if (!opts.telemetryEnabled) { + return; + } + + if (!telemetryReporterReady) { + pendingTelemetryDiagnostics.push({ name, data, timestamp }); + return; + } + + telemetryReporter.addDiagnostic({ name, data, timestamp }); + }; const sessionIdListener = (msg: SessionIdMessage) => { subscribeToken = encodeSubscribeToken(msg.session_id, msg.server_ip, msg.server_port); @@ -205,6 +230,10 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { // Start telemetry reporter now that we have a session ID if (opts.telemetryEnabled) { + if (telemetryReporterReady) { + telemetryReporter.stop(); + } + const reporter = new TelemetryReporter({ apiKey, sessionId: msg.session_id, @@ -214,6 +243,12 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { }); reporter.start(); telemetryReporter = reporter; + telemetryReporterReady = true; + + for (const diagnostic of pendingTelemetryDiagnostics) { + telemetryReporter.addDiagnostic(diagnostic); + } + pendingTelemetryDiagnostics.length = 0; } }; manager.getWebsocketMessageEmitter().on("sessionId", sessionIdListener); @@ -228,6 +263,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { const methods = realtimeMethods(manager, imageToBase64); let statsCollector: WebRTCStatsCollector | null = null; + let statsCollectorPeerConnection: RTCPeerConnection | null = null; // Video stall detection state (Twilio pattern: fps < 0.5 = stalled) const STALL_FPS_THRESHOLD = 0.5; @@ -240,6 +276,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { stallStartMs = 0; statsCollector = new WebRTCStatsCollector(statsOptions); const pc = manager.getPeerConnection(); + statsCollectorPeerConnection = pc; if (pc) { statsCollector.start(pc, (stats) => { emitOrBuffer("stats", stats); @@ -251,29 +288,39 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { videoStalled = true; stallStartMs = Date.now(); emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: true, durationMs: 0 } }); - telemetryReporter.addDiagnostic({ - name: "videoStall", - data: { stalled: true, durationMs: 0 }, - timestamp: stallStartMs, - }); + addTelemetryDiagnostic("videoStall", { stalled: true, durationMs: 0 }, stallStartMs); } else if (videoStalled && fps >= STALL_FPS_THRESHOLD) { const durationMs = Date.now() - stallStartMs; videoStalled = false; emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: false, durationMs } }); - telemetryReporter.addDiagnostic({ - name: "videoStall", - data: { stalled: false, durationMs }, - timestamp: Date.now(), - }); + addTelemetryDiagnostic("videoStall", { stalled: false, durationMs }); } }); } return () => { statsCollector?.stop(); statsCollector = null; + statsCollectorPeerConnection = null; }; }; + handleConnectionStateChange = (state) => { + if (!opts.telemetryEnabled) { + return; + } + + if (state !== "connected" && state !== "generating") { + return; + } + + const peerConnection = manager.getPeerConnection(); + if (!peerConnection || peerConnection === statsCollectorPeerConnection) { + return; + } + + startStatsCollection(); + }; + // Auto-start stats when telemetry is enabled if (opts.telemetryEnabled) { startStatsCollection(); @@ -321,6 +368,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { flush(); return client; } catch (error) { + telemetryReporter.stop(); webrtcManager?.cleanup(); audioStreamManager?.cleanup(); throw error; diff --git a/packages/sdk/src/realtime/webrtc-connection.ts b/packages/sdk/src/realtime/webrtc-connection.ts index 118b167..908b76f 100644 --- a/packages/sdk/src/realtime/webrtc-connection.ts +++ b/packages/sdk/src/realtime/webrtc-connection.ts @@ -205,7 +205,8 @@ export class WebRTCConnection { try { // Handle messages that don't require peer connection first if (msg.type === "error") { - const error = new Error(msg.error); + const error = new Error(msg.error) as Error & { source?: string }; + error.source = "server"; this.callbacks.onError?.(error); if (this.connectionReject) { this.connectionReject(error); diff --git a/packages/sdk/src/realtime/webrtc-manager.ts b/packages/sdk/src/realtime/webrtc-manager.ts index e11c980..5ebdadf 100644 --- a/packages/sdk/src/realtime/webrtc-manager.ts +++ b/packages/sdk/src/realtime/webrtc-manager.ts @@ -148,7 +148,7 @@ export class WebRTCManager { this.logger.warn("Reconnect attempt failed", { error: error.message, attempt: error.attemptNumber }); this.config.onDiagnostic?.("reconnect", { attempt: error.attemptNumber, - maxAttempts: RETRY_OPTIONS.retries, + maxAttempts: RETRY_OPTIONS.retries + 1, durationMs: performance.now() - reconnectStart, success: false, error: error.message, @@ -166,7 +166,7 @@ export class WebRTCManager { ); this.config.onDiagnostic?.("reconnect", { attempt: attemptCount, - maxAttempts: RETRY_OPTIONS.retries, + maxAttempts: RETRY_OPTIONS.retries + 1, durationMs: performance.now() - reconnectStart, success: true, }); diff --git a/packages/sdk/src/utils/errors.ts b/packages/sdk/src/utils/errors.ts index 678f451..a8cecd0 100644 --- a/packages/sdk/src/utils/errors.ts +++ b/packages/sdk/src/utils/errors.ts @@ -53,14 +53,12 @@ export function createWebrtcIceError(error: Error): DecartSDKError { return createSDKError(ERROR_CODES.WEBRTC_ICE_ERROR, "ICE connection failed", undefined, error); } -export function createWebrtcTimeoutError(phase: string, timeoutMs: number, cause?: Error): DecartSDKError { +export function createWebrtcTimeoutError(phase: string, timeoutMs?: number, cause?: Error): DecartSDKError { + const hasTimeout = typeof timeoutMs === "number" && Number.isFinite(timeoutMs); return createSDKError( ERROR_CODES.WEBRTC_TIMEOUT_ERROR, - `${phase} timed out after ${timeoutMs}ms`, - { - phase, - timeoutMs, - }, + hasTimeout ? `${phase} timed out after ${timeoutMs}ms` : `${phase} timed out`, + hasTimeout ? { phase, timeoutMs } : { phase }, cause, ); } @@ -78,6 +76,12 @@ export function createWebrtcSignalingError(error: Error): DecartSDKError { */ export function classifyWebrtcError(error: Error): DecartSDKError { const msg = error.message.toLowerCase(); + const source = (error as Error & { source?: string }).source; + + if (source === "server") { + return createWebrtcServerError(error.message); + } + if (msg.includes("websocket")) { return createWebrtcWebsocketError(error); } @@ -85,7 +89,9 @@ export function classifyWebrtcError(error: Error): DecartSDKError { return createWebrtcIceError(error); } if (msg.includes("timeout") || msg.includes("timed out")) { - return createWebrtcTimeoutError("connection", 0, error); + const timeoutMatch = msg.match(/(\d+)\s*ms/); + const timeoutMs = timeoutMatch ? Number.parseInt(timeoutMatch[1], 10) : undefined; + return createWebrtcTimeoutError("connection", timeoutMs, error); } // Default to signaling error for unclassified WebRTC errors return createWebrtcSignalingError(error); diff --git a/packages/sdk/tests/unit.test.ts b/packages/sdk/tests/unit.test.ts index 17817f9..c1cbde6 100644 --- a/packages/sdk/tests/unit.test.ts +++ b/packages/sdk/tests/unit.test.ts @@ -1522,6 +1522,236 @@ describe("Subscribe Client", () => { } }); + it("buffers pre-session telemetry diagnostics and flushes them after session_id", async () => { + const { createRealTimeClient } = await import("../src/realtime/client.js"); + const { WebRTCManager } = await import("../src/realtime/webrtc-manager.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + const sessionIdListeners = new Set<(msg: import("../src/realtime/types").SessionIdMessage) => void>(); + const websocketEmitter = { + on: (event: string, listener: (msg: import("../src/realtime/types").SessionIdMessage) => void) => { + if (event === "sessionId") sessionIdListeners.add(listener); + }, + off: (event: string, listener: (msg: import("../src/realtime/types").SessionIdMessage) => void) => { + if (event === "sessionId") sessionIdListeners.delete(listener); + }, + }; + + const connectSpy = vi.spyOn(WebRTCManager.prototype, "connect").mockImplementation(async function () { + const mgr = this as unknown as { + config: { + onConnectionStateChange?: (state: import("../src/realtime/types").ConnectionState) => void; + onDiagnostic?: (name: string, data: unknown) => void; + }; + managerState: import("../src/realtime/types").ConnectionState; + }; + + mgr.config.onDiagnostic?.("phaseTiming", { + phase: "websocket", + durationMs: 12, + success: true, + }); + + mgr.managerState = "connected"; + mgr.config.onConnectionStateChange?.("connected"); + return true; + }); + const stateSpy = vi.spyOn(WebRTCManager.prototype, "getConnectionState").mockReturnValue("connected"); + const emitterSpy = vi + .spyOn(WebRTCManager.prototype, "getWebsocketMessageEmitter") + .mockReturnValue(websocketEmitter as never); + const cleanupSpy = vi.spyOn(WebRTCManager.prototype, "cleanup").mockImplementation(() => {}); + + try { + const realtime = createRealTimeClient({ + baseUrl: "wss://api3.decart.ai", + apiKey: "test-key", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + telemetryEnabled: true, + }); + const client = await realtime.connect({} as MediaStream, { + model: models.realtime("mirage_v2"), + onRemoteStream: vi.fn(), + }); + + expect(fetchMock).not.toHaveBeenCalled(); + + for (const listener of sessionIdListeners) { + listener({ + type: "session_id", + session_id: "sess-telemetry", + server_ip: "10.0.0.5", + server_port: 9090, + }); + } + + client.disconnect(); + + await vi.waitFor(() => { + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + const [, options] = fetchMock.mock.calls[0]; + const body = JSON.parse(options.body); + expect(body.diagnostics).toHaveLength(1); + expect(body.diagnostics[0].name).toBe("phaseTiming"); + expect(body.diagnostics[0].data.phase).toBe("websocket"); + } finally { + connectSpy.mockRestore(); + stateSpy.mockRestore(); + emitterSpy.mockRestore(); + cleanupSpy.mockRestore(); + vi.unstubAllGlobals(); + } + }); + + it("stops previous telemetry reporter when session_id changes", async () => { + const { createRealTimeClient } = await import("../src/realtime/client.js"); + const { WebRTCManager } = await import("../src/realtime/webrtc-manager.js"); + + vi.useFakeTimers(); + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + const setIntervalSpy = vi.spyOn(globalThis, "setInterval"); + const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval"); + + const sessionIdListeners = new Set<(msg: import("../src/realtime/types").SessionIdMessage) => void>(); + const websocketEmitter = { + on: (event: string, listener: (msg: import("../src/realtime/types").SessionIdMessage) => void) => { + if (event === "sessionId") sessionIdListeners.add(listener); + }, + off: (event: string, listener: (msg: import("../src/realtime/types").SessionIdMessage) => void) => { + if (event === "sessionId") sessionIdListeners.delete(listener); + }, + }; + + const connectSpy = vi.spyOn(WebRTCManager.prototype, "connect").mockImplementation(async function () { + const mgr = this as unknown as { + config: { onConnectionStateChange?: (state: import("../src/realtime/types").ConnectionState) => void }; + managerState: import("../src/realtime/types").ConnectionState; + }; + mgr.managerState = "connected"; + mgr.config.onConnectionStateChange?.("connected"); + return true; + }); + const stateSpy = vi.spyOn(WebRTCManager.prototype, "getConnectionState").mockReturnValue("connected"); + const emitterSpy = vi + .spyOn(WebRTCManager.prototype, "getWebsocketMessageEmitter") + .mockReturnValue(websocketEmitter as never); + const peerConnectionSpy = vi.spyOn(WebRTCManager.prototype, "getPeerConnection").mockReturnValue(null); + const cleanupSpy = vi.spyOn(WebRTCManager.prototype, "cleanup").mockImplementation(() => {}); + + try { + const realtime = createRealTimeClient({ + baseUrl: "wss://api3.decart.ai", + apiKey: "test-key", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + telemetryEnabled: true, + }); + const client = await realtime.connect({} as MediaStream, { + model: models.realtime("mirage_v2"), + onRemoteStream: vi.fn(), + }); + + for (const listener of sessionIdListeners) { + listener({ + type: "session_id", + session_id: "sess-1", + server_ip: "10.0.0.5", + server_port: 9090, + }); + } + for (const listener of sessionIdListeners) { + listener({ + type: "session_id", + session_id: "sess-2", + server_ip: "10.0.0.6", + server_port: 9091, + }); + } + + client.disconnect(); + + expect(setIntervalSpy).toHaveBeenCalledTimes(2); + expect(clearIntervalSpy).toHaveBeenCalledTimes(2); + } finally { + connectSpy.mockRestore(); + stateSpy.mockRestore(); + emitterSpy.mockRestore(); + peerConnectionSpy.mockRestore(); + cleanupSpy.mockRestore(); + setIntervalSpy.mockRestore(); + clearIntervalSpy.mockRestore(); + vi.useRealTimers(); + vi.unstubAllGlobals(); + } + }); + + it("restarts stats collection when peer connection changes after reconnect", async () => { + const { createRealTimeClient } = await import("../src/realtime/client.js"); + const { WebRTCManager } = await import("../src/realtime/webrtc-manager.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + const firstPeerConnection = {} as RTCPeerConnection; + const secondPeerConnection = {} as RTCPeerConnection; + let currentPeerConnection = firstPeerConnection; + let onConnectionStateChange: ((state: import("../src/realtime/types").ConnectionState) => void) | undefined; + + const startSpy = vi.spyOn(WebRTCStatsCollector.prototype, "start").mockImplementation(() => {}); + const stopSpy = vi.spyOn(WebRTCStatsCollector.prototype, "stop").mockImplementation(() => {}); + + const connectSpy = vi.spyOn(WebRTCManager.prototype, "connect").mockImplementation(async function () { + const mgr = this as unknown as { + config: { onConnectionStateChange?: (state: import("../src/realtime/types").ConnectionState) => void }; + managerState: import("../src/realtime/types").ConnectionState; + }; + onConnectionStateChange = mgr.config.onConnectionStateChange; + mgr.managerState = "connected"; + mgr.config.onConnectionStateChange?.("connected"); + return true; + }); + const stateSpy = vi.spyOn(WebRTCManager.prototype, "getConnectionState").mockReturnValue("connected"); + const peerConnectionSpy = vi + .spyOn(WebRTCManager.prototype, "getPeerConnection") + .mockImplementation(() => currentPeerConnection); + const cleanupSpy = vi.spyOn(WebRTCManager.prototype, "cleanup").mockImplementation(() => {}); + + try { + const realtime = createRealTimeClient({ + baseUrl: "wss://api3.decart.ai", + apiKey: "test-key", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + telemetryEnabled: true, + }); + const client = await realtime.connect({} as MediaStream, { + model: models.realtime("mirage_v2"), + onRemoteStream: vi.fn(), + }); + + expect(startSpy).toHaveBeenCalledTimes(1); + expect(startSpy.mock.calls[0][0]).toBe(firstPeerConnection); + + currentPeerConnection = secondPeerConnection; + onConnectionStateChange?.("connected"); + + expect(startSpy).toHaveBeenCalledTimes(2); + expect(startSpy.mock.calls[1][0]).toBe(secondPeerConnection); + + client.disconnect(); + expect(stopSpy).toHaveBeenCalled(); + } finally { + startSpy.mockRestore(); + stopSpy.mockRestore(); + connectSpy.mockRestore(); + stateSpy.mockRestore(); + peerConnectionSpy.mockRestore(); + cleanupSpy.mockRestore(); + } + }); + it("subscribe client buffers events until returned", async () => { const { encodeSubscribeToken } = await import("../src/realtime/subscribe-client.js"); const { createRealTimeClient } = await import("../src/realtime/client.js"); @@ -1674,6 +1904,17 @@ describe("WebRTC Error Classification", () => { const { classifyWebrtcError, ERROR_CODES } = await import("../src/utils/errors.js"); const result = classifyWebrtcError(new Error("Connection timed out")); expect(result.code).toBe(ERROR_CODES.WEBRTC_TIMEOUT_ERROR); + expect(result.message).toBe("connection timed out"); + expect(result.data).toEqual({ phase: "connection" }); + }); + + it("classifies server-originated errors", async () => { + const { classifyWebrtcError, ERROR_CODES } = await import("../src/utils/errors.js"); + const error = new Error("Insufficient credits") as Error & { source?: string }; + error.source = "server"; + const result = classifyWebrtcError(error); + expect(result.code).toBe(ERROR_CODES.WEBRTC_SERVER_ERROR); + expect(result.message).toBe("Insufficient credits"); }); it("classifies unknown errors as signaling errors", async () => { @@ -1939,7 +2180,6 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - apiKey: "test-key", sessionId: "sess-1", logger: { debug() {}, info() {}, warn() {}, error() {} }, @@ -1995,7 +2235,6 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - apiKey: "test-key", sessionId: "sess-1", logger: { debug() {}, info() {}, warn() {}, error() {} }, @@ -2024,7 +2263,6 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - apiKey: "test-key", sessionId: "sess-2", logger: { debug() {}, info() {}, warn() {}, error() {} }, @@ -2057,7 +2295,6 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - apiKey: "test-key", sessionId: "sess-3", logger: { debug() {}, info() {}, warn() {}, error() {} }, @@ -2086,7 +2323,6 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - apiKey: "my-api-key", sessionId: "sess-4", integration: "test-integration", @@ -2124,7 +2360,6 @@ describe("TelemetryReporter", () => { try { const reporter = new TelemetryReporter({ - apiKey: "test-key", sessionId: "sess-5", logger: { debug() {}, info() {}, warn() {}, error() {} }, From 0920e6f3fd69b8138c820a4143e613cfaed82409 Mon Sep 17 00:00:00 2001 From: Adir Amsalem Date: Wed, 18 Feb 2026 19:29:44 +0200 Subject: [PATCH 10/10] refactor(realtime): remove manual startStats API - remove startStats from RealTimeClient public surface - keep internal auto stats collection for telemetry - stop exporting StatsOptions from package index --- packages/sdk/src/index.ts | 2 +- packages/sdk/src/realtime/client.ts | 9 +++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index cd187f2..1693fc9 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -45,7 +45,7 @@ export type { SubscribeOptions, } from "./realtime/subscribe-client"; export type { ConnectionState } from "./realtime/types"; -export type { StatsOptions, WebRTCStats } from "./realtime/webrtc-stats"; +export type { WebRTCStats } from "./realtime/webrtc-stats"; export { type ImageModelDefinition, type ImageModels, diff --git a/packages/sdk/src/realtime/client.ts b/packages/sdk/src/realtime/client.ts index 91a31c8..7a5391a 100644 --- a/packages/sdk/src/realtime/client.ts +++ b/packages/sdk/src/realtime/client.ts @@ -17,7 +17,7 @@ import { import { type ITelemetryReporter, NullTelemetryReporter, TelemetryReporter } from "./telemetry-reporter"; import type { ConnectionState, GenerationTickMessage, SessionIdMessage } from "./types"; import { WebRTCManager } from "./webrtc-manager"; -import { type StatsOptions, type WebRTCStats, WebRTCStatsCollector } from "./webrtc-stats"; +import { type WebRTCStats, WebRTCStatsCollector } from "./webrtc-stats"; async function blobToBase64(blob: Blob): Promise { return new Promise((resolve, reject) => { @@ -119,8 +119,6 @@ export type RealTimeClient = { options?: { prompt?: string; enhance?: boolean; timeout?: number }, ) => Promise; playAudio?: (audio: Blob | File | ArrayBuffer) => Promise; - /** Start collecting WebRTC stats. Stats are emitted via the 'stats' event. Returns a stop function. */ - startStats: (options?: StatsOptions) => () => void; }; export const createRealTimeClient = (opts: RealTimeClientOptions) => { @@ -270,11 +268,11 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { let videoStalled = false; let stallStartMs = 0; - const startStatsCollection = (statsOptions?: StatsOptions): (() => void) => { + const startStatsCollection = (): (() => void) => { statsCollector?.stop(); videoStalled = false; stallStartMs = 0; - statsCollector = new WebRTCStatsCollector(statsOptions); + statsCollector = new WebRTCStatsCollector(); const pc = manager.getPeerConnection(); statsCollectorPeerConnection = pc; if (pc) { @@ -356,7 +354,6 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { const base64 = await imageToBase64(image); return manager.setImage(base64, options); }, - startStats: startStatsCollection, }; // Add live_avatar specific audio method (only when using internal AudioStreamManager)