diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 6f2b506..1693fc9 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -5,6 +5,7 @@ import { createRealTimeClient } from "./realtime/client"; import { createTokensClient } from "./tokens/client"; import { readEnv } from "./utils/env"; import { createInvalidApiKeyError, createInvalidBaseUrlError } from "./utils/errors"; +import { type Logger, noopLogger } from "./utils/logger"; export type { ProcessClient } from "./process/client"; export type { FileInput, ProcessOptions, ReactNativeFile } from "./process/types"; @@ -23,6 +24,20 @@ export type { RealTimeClientConnectOptions, RealTimeClientInitialState, } from "./realtime/client"; +export type { + ConnectionPhase, + DiagnosticEvent, + DiagnosticEventName, + DiagnosticEvents, + IceCandidateEvent, + IceStateEvent, + PeerConnectionStateEvent, + PhaseTimingEvent, + ReconnectEvent, + SelectedCandidatePairEvent, + SignalingStateEvent, + VideoStallEvent, +} from "./realtime/diagnostics"; export type { SetInput } from "./realtime/methods"; export type { RealTimeSubscribeClient, @@ -30,6 +45,7 @@ export type { SubscribeOptions, } from "./realtime/subscribe-client"; export type { ConnectionState } from "./realtime/types"; +export type { WebRTCStats } from "./realtime/webrtc-stats"; export { type ImageModelDefinition, type ImageModels, @@ -46,6 +62,7 @@ export { export type { ModelState } from "./shared/types"; export type { CreateTokenResponse, TokensClient } from "./tokens/client"; export { type DecartSDKError, ERROR_CODES } from "./utils/errors"; +export { createConsoleLogger, type Logger, type LogLevel, noopLogger } from "./utils/logger"; // Schema with validation to ensure proxy and apiKey are mutually exclusive // Proxy can be a full URL or a relative path (starts with /) @@ -79,12 +96,16 @@ export type DecartClientOptions = apiKey?: never; baseUrl?: string; integration?: string; + logger?: Logger; + telemetry?: boolean; } | { proxy?: never; apiKey?: string; baseUrl?: string; integration?: string; + logger?: Logger; + telemetry?: boolean; }; /** @@ -153,6 +174,8 @@ export const createDecartClient = (options: DecartClientOptions = {}) => { baseUrl = parsedOptions.data.baseUrl || "https://api.decart.ai"; } const { integration } = parsedOptions.data; + const logger = "logger" in options && options.logger ? options.logger : noopLogger; + const telemetryEnabled = "telemetry" in options && options.telemetry === false ? false : true; // Realtime (WebRTC) always requires direct API access with API key // Proxy mode is only for HTTP endpoints (process, queue, tokens) @@ -162,6 +185,8 @@ export const createDecartClient = (options: DecartClientOptions = {}) => { baseUrl: wsBaseUrl, apiKey: apiKey || "", integration, + logger, + telemetryEnabled, }); const process = createProcessClient({ diff --git a/packages/sdk/src/realtime/client.ts b/packages/sdk/src/realtime/client.ts index 498dd12..7a5391a 100644 --- a/packages/sdk/src/realtime/client.ts +++ b/packages/sdk/src/realtime/client.ts @@ -1,8 +1,10 @@ import { z } from "zod"; import { modelDefinitionSchema, type RealTimeModels } from "../shared/model"; import { modelStateSchema } from "../shared/types"; -import { createWebrtcError, type DecartSDKError } from "../utils/errors"; +import { classifyWebrtcError, type DecartSDKError } from "../utils/errors"; +import type { Logger } from "../utils/logger"; import { AudioStreamManager } from "./audio-stream-manager"; +import type { DiagnosticEvent } from "./diagnostics"; import { createEventBuffer } from "./event-buffer"; import { realtimeMethods, type SetInput } from "./methods"; import { @@ -12,8 +14,10 @@ import { type SubscribeEvents, type SubscribeOptions, } from "./subscribe-client"; +import { type ITelemetryReporter, NullTelemetryReporter, TelemetryReporter } from "./telemetry-reporter"; import type { ConnectionState, GenerationTickMessage, SessionIdMessage } from "./types"; import { WebRTCManager } from "./webrtc-manager"; +import { type WebRTCStats, WebRTCStatsCollector } from "./webrtc-stats"; async function blobToBase64(blob: Blob): Promise { return new Promise((resolve, reject) => { @@ -69,6 +73,8 @@ export type RealTimeClientOptions = { baseUrl: string; apiKey: string; integration?: string; + logger: Logger; + telemetryEnabled: boolean; }; const realTimeClientInitialStateSchema = modelStateSchema; @@ -94,6 +100,8 @@ export type Events = { connectionChange: ConnectionState; error: DecartSDKError; generationTick: { seconds: number }; + diagnostic: DiagnosticEvent; + stats: WebRTCStats; }; export type RealTimeClient = { @@ -114,7 +122,7 @@ export type RealTimeClient = { }; export const createRealTimeClient = (opts: RealTimeClientOptions) => { - const { baseUrl, apiKey, integration } = opts; + const { baseUrl, apiKey, integration, logger } = opts; const connect = async ( stream: MediaStream | null, @@ -142,6 +150,8 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { } let webrtcManager: WebRTCManager | undefined; + let telemetryReporter: ITelemetryReporter = new NullTelemetryReporter(); + let handleConnectionStateChange: ((state: ConnectionState) => void) | null = null; try { // Prepare initial image base64 before connection @@ -162,13 +172,19 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { webrtcManager = new WebRTCManager({ webrtcUrl: `${url}?api_key=${encodeURIComponent(apiKey)}&model=${encodeURIComponent(options.model.name)}`, integration, + logger, + onDiagnostic: (name, data) => { + emitOrBuffer("diagnostic", { name, data } as Events["diagnostic"]); + addTelemetryDiagnostic(name, data); + }, onRemoteStream, onConnectionStateChange: (state) => { emitOrBuffer("connectionChange", state); + handleConnectionStateChange?.(state); }, onError: (error) => { - console.error("WebRTC error:", error); - emitOrBuffer("error", createWebrtcError(error)); + logger.error("WebRTC error", { error: error.message }); + emitOrBuffer("error", classifyWebrtcError(error)); }, customizeOffer: options.customizeOffer as ((offer: RTCSessionDescriptionInit) => Promise) | undefined, vp8MinBitrate: 300, @@ -182,9 +198,56 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { let sessionId: string | null = null; let subscribeToken: string | null = null; + const pendingTelemetryDiagnostics: Array<{ + name: DiagnosticEvent["name"]; + data: DiagnosticEvent["data"]; + timestamp: number; + }> = []; + let telemetryReporterReady = false; + + const addTelemetryDiagnostic = ( + name: DiagnosticEvent["name"], + data: DiagnosticEvent["data"], + timestamp: number = Date.now(), + ): void => { + if (!opts.telemetryEnabled) { + return; + } + + if (!telemetryReporterReady) { + pendingTelemetryDiagnostics.push({ name, data, timestamp }); + return; + } + + telemetryReporter.addDiagnostic({ name, data, timestamp }); + }; + const sessionIdListener = (msg: SessionIdMessage) => { subscribeToken = encodeSubscribeToken(msg.session_id, msg.server_ip, msg.server_port); sessionId = msg.session_id; + + // Start telemetry reporter now that we have a session ID + if (opts.telemetryEnabled) { + if (telemetryReporterReady) { + telemetryReporter.stop(); + } + + const reporter = new TelemetryReporter({ + apiKey, + sessionId: msg.session_id, + model: options.model.name, + integration, + logger, + }); + reporter.start(); + telemetryReporter = reporter; + telemetryReporterReady = true; + + for (const diagnostic of pendingTelemetryDiagnostics) { + telemetryReporter.addDiagnostic(diagnostic); + } + pendingTelemetryDiagnostics.length = 0; + } }; manager.getWebsocketMessageEmitter().on("sessionId", sessionIdListener); @@ -197,12 +260,78 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { const methods = realtimeMethods(manager, imageToBase64); + let statsCollector: WebRTCStatsCollector | null = null; + let statsCollectorPeerConnection: RTCPeerConnection | null = null; + + // Video stall detection state (Twilio pattern: fps < 0.5 = stalled) + const STALL_FPS_THRESHOLD = 0.5; + let videoStalled = false; + let stallStartMs = 0; + + const startStatsCollection = (): (() => void) => { + statsCollector?.stop(); + videoStalled = false; + stallStartMs = 0; + statsCollector = new WebRTCStatsCollector(); + const pc = manager.getPeerConnection(); + statsCollectorPeerConnection = pc; + if (pc) { + statsCollector.start(pc, (stats) => { + emitOrBuffer("stats", stats); + telemetryReporter.addStats(stats); + + // Stall detection: check if video fps dropped below threshold + const fps = stats.video?.framesPerSecond ?? 0; + if (!videoStalled && stats.video && fps < STALL_FPS_THRESHOLD) { + videoStalled = true; + stallStartMs = Date.now(); + emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: true, durationMs: 0 } }); + addTelemetryDiagnostic("videoStall", { stalled: true, durationMs: 0 }, stallStartMs); + } else if (videoStalled && fps >= STALL_FPS_THRESHOLD) { + const durationMs = Date.now() - stallStartMs; + videoStalled = false; + emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: false, durationMs } }); + addTelemetryDiagnostic("videoStall", { stalled: false, durationMs }); + } + }); + } + return () => { + statsCollector?.stop(); + statsCollector = null; + statsCollectorPeerConnection = null; + }; + }; + + handleConnectionStateChange = (state) => { + if (!opts.telemetryEnabled) { + return; + } + + if (state !== "connected" && state !== "generating") { + return; + } + + const peerConnection = manager.getPeerConnection(); + if (!peerConnection || peerConnection === statsCollectorPeerConnection) { + return; + } + + startStatsCollection(); + }; + + // Auto-start stats when telemetry is enabled + if (opts.telemetryEnabled) { + startStatsCollection(); + } + const client: RealTimeClient = { set: methods.set, setPrompt: methods.setPrompt, isConnected: () => manager.isConnected(), getConnectionState: () => manager.getConnectionState(), disconnect: () => { + statsCollector?.stop(); + telemetryReporter.stop(); stop(); manager.cleanup(); audioStreamManager?.cleanup(); @@ -236,6 +365,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { flush(); return client; } catch (error) { + telemetryReporter.stop(); webrtcManager?.cleanup(); audioStreamManager?.cleanup(); throw error; @@ -254,13 +384,17 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { webrtcManager = new WebRTCManager({ webrtcUrl: subscribeUrl, integration, + logger, + onDiagnostic: (name, data) => { + emitOrBuffer("diagnostic", { name, data } as SubscribeEvents["diagnostic"]); + }, onRemoteStream: options.onRemoteStream, onConnectionStateChange: (state) => { emitOrBuffer("connectionChange", state); }, onError: (error) => { - console.error("WebRTC subscribe error:", error); - emitOrBuffer("error", createWebrtcError(error)); + logger.error("WebRTC subscribe error", { error: error.message }); + emitOrBuffer("error", classifyWebrtcError(error)); }, }); diff --git a/packages/sdk/src/realtime/diagnostics.ts b/packages/sdk/src/realtime/diagnostics.ts new file mode 100644 index 0000000..69059d9 --- /dev/null +++ b/packages/sdk/src/realtime/diagnostics.ts @@ -0,0 +1,87 @@ +/** Connection phase names for timing events. */ +export type ConnectionPhase = "websocket" | "avatar-image" | "initial-prompt" | "webrtc-handshake" | "total"; + +export type PhaseTimingEvent = { + phase: ConnectionPhase; + durationMs: number; + success: boolean; + error?: string; +}; + +export type IceCandidateEvent = { + source: "local" | "remote"; + candidateType: "host" | "srflx" | "prflx" | "relay" | "unknown"; + protocol: "udp" | "tcp" | "unknown"; + address?: string; + port?: number; +}; + +export type IceStateEvent = { + state: string; + previousState: string; + timestampMs: number; +}; + +export type PeerConnectionStateEvent = { + state: string; + previousState: string; + timestampMs: number; +}; + +export type SignalingStateEvent = { + state: string; + previousState: string; + timestampMs: number; +}; + +export type SelectedCandidatePairEvent = { + local: { + candidateType: string; + protocol: string; + address?: string; + port?: number; + }; + remote: { + candidateType: string; + protocol: string; + address?: string; + port?: number; + }; +}; + +export type ReconnectEvent = { + attempt: number; + maxAttempts: number; + durationMs: number; + success: boolean; + error?: string; +}; + +export type VideoStallEvent = { + /** True when a stall is detected, false when recovered. */ + stalled: boolean; + /** Duration of the stall in ms (0 when stall first detected, actual duration on recovery). */ + durationMs: number; +}; + +/** All diagnostic event types keyed by name. */ +export type DiagnosticEvents = { + phaseTiming: PhaseTimingEvent; + iceCandidate: IceCandidateEvent; + iceStateChange: IceStateEvent; + peerConnectionStateChange: PeerConnectionStateEvent; + signalingStateChange: SignalingStateEvent; + selectedCandidatePair: SelectedCandidatePairEvent; + reconnect: ReconnectEvent; + videoStall: VideoStallEvent; +}; + +export type DiagnosticEventName = keyof DiagnosticEvents; + +/** A single diagnostic event with its name and typed data. */ +export type DiagnosticEvent = { + [K in DiagnosticEventName]: { name: K; data: DiagnosticEvents[K] }; +}[DiagnosticEventName]; + +/** Callback for emitting diagnostic events from the connection/manager layers. */ +export type DiagnosticEmitter = (name: K, data: DiagnosticEvents[K]) => void; diff --git a/packages/sdk/src/realtime/subscribe-client.ts b/packages/sdk/src/realtime/subscribe-client.ts index 329871a..6b1370f 100644 --- a/packages/sdk/src/realtime/subscribe-client.ts +++ b/packages/sdk/src/realtime/subscribe-client.ts @@ -1,4 +1,5 @@ import type { DecartSDKError } from "../utils/errors"; +import type { DiagnosticEvent } from "./diagnostics"; import type { ConnectionState } from "./types"; type TokenPayload = { @@ -26,6 +27,7 @@ export function decodeSubscribeToken(token: string): TokenPayload { export type SubscribeEvents = { connectionChange: ConnectionState; error: DecartSDKError; + diagnostic: DiagnosticEvent; }; export type RealTimeSubscribeClient = { diff --git a/packages/sdk/src/realtime/telemetry-reporter.ts b/packages/sdk/src/realtime/telemetry-reporter.ts new file mode 100644 index 0000000..a2a24bd --- /dev/null +++ b/packages/sdk/src/realtime/telemetry-reporter.ts @@ -0,0 +1,178 @@ +import { buildAuthHeaders } from "../shared/request"; +import type { Logger } from "../utils/logger"; +import { VERSION } from "../version"; +import type { WebRTCStats } from "./webrtc-stats"; + +const DEFAULT_REPORT_INTERVAL_MS = 10_000; // 10 seconds +const TELEMETRY_URL = "https://platform.decart.ai/api/v1/telemetry"; + +/** + * Maximum number of items per array (stats / diagnostics) in a single report. + * Matches the backend Zod schema which enforces `z.array().max(120)`. + */ +const MAX_ITEMS_PER_REPORT = 120; + +type TelemetryDiagnostic = { + name: string; + data: unknown; + timestamp: number; +}; + +type TelemetryReport = { + sessionId: string; + timestamp: number; + sdkVersion: string; + model?: string; + /** Tags that the backend should attach to every Datadog metric/log from this report. */ + tags: Record; + stats: WebRTCStats[]; + diagnostics: TelemetryDiagnostic[]; +}; + +export interface TelemetryReporterOptions { + apiKey: string; + sessionId: string; + model?: string; + integration?: string; + logger: Logger; + reportIntervalMs?: number; +} + +/** Interface for telemetry reporting. Allows substituting a no-op implementation. */ +export interface ITelemetryReporter { + start(): void; + addStats(stats: WebRTCStats): void; + addDiagnostic(event: TelemetryDiagnostic): void; + flush(): void; + stop(): void; +} + +/** No-op reporter that silently discards all data. Used when telemetry is disabled. */ +export class NullTelemetryReporter implements ITelemetryReporter { + start(): void {} + addStats(): void {} + addDiagnostic(): void {} + flush(): void {} + stop(): void {} +} + +export class TelemetryReporter implements ITelemetryReporter { + private apiKey: string; + private sessionId: string; + private model?: string; + private integration?: string; + private logger: Logger; + private reportIntervalMs: number; + private intervalId: ReturnType | null = null; + private statsBuffer: WebRTCStats[] = []; + private diagnosticsBuffer: TelemetryDiagnostic[] = []; + + constructor(options: TelemetryReporterOptions) { + this.apiKey = options.apiKey; + this.sessionId = options.sessionId; + this.model = options.model; + this.integration = options.integration; + this.logger = options.logger; + this.reportIntervalMs = options.reportIntervalMs ?? DEFAULT_REPORT_INTERVAL_MS; + } + + /** Start the periodic reporting timer. */ + start(): void { + if (this.intervalId !== null) return; + this.intervalId = setInterval(() => this.flush(), this.reportIntervalMs); + } + + /** Add a stats snapshot to the buffer. */ + addStats(stats: WebRTCStats): void { + this.statsBuffer.push(stats); + } + + /** Add a diagnostic event to the buffer. */ + addDiagnostic(event: TelemetryDiagnostic): void { + this.diagnosticsBuffer.push(event); + } + + /** Flush buffered data immediately. */ + flush(): void { + this.sendReport(false); + } + + /** Stop the reporter and send a final report with keepalive. */ + stop(): void { + if (this.intervalId !== null) { + clearInterval(this.intervalId); + this.intervalId = null; + } + this.sendReport(true); + } + + /** + * Build a single chunk from the front of the buffers, respecting MAX_ITEMS_PER_REPORT. + * Returns null when both buffers are empty. + */ + private createReportChunk(): TelemetryReport | null { + if (this.statsBuffer.length === 0 && this.diagnosticsBuffer.length === 0) { + return null; + } + + const tags: Record = { + session_id: this.sessionId, + sdk_version: VERSION, + ...(this.model ? { model: this.model } : {}), + ...(this.integration ? { integration: this.integration } : {}), + }; + + return { + sessionId: this.sessionId, + timestamp: Date.now(), + sdkVersion: VERSION, + ...(this.model ? { model: this.model } : {}), + tags, + stats: this.statsBuffer.splice(0, MAX_ITEMS_PER_REPORT), + diagnostics: this.diagnosticsBuffer.splice(0, MAX_ITEMS_PER_REPORT), + }; + } + + private sendReport(keepalive: boolean): void { + if (this.statsBuffer.length === 0 && this.diagnosticsBuffer.length === 0) { + return; + } + + try { + const headers = buildAuthHeaders({ apiKey: this.apiKey, integration: this.integration }); + const commonHeaders = { + ...headers, + "Content-Type": "application/json", + }; + + // Send as many chunks as needed to drain both buffers. + let chunk = this.createReportChunk(); + while (chunk !== null) { + const isLast = this.statsBuffer.length === 0 && this.diagnosticsBuffer.length === 0; + + fetch(TELEMETRY_URL, { + method: "POST", + headers: commonHeaders, + body: JSON.stringify(chunk), + // Only set keepalive on the very last chunk (if the caller requested it). + keepalive: keepalive && isLast, + }) + .then((response) => { + if (!response.ok) { + this.logger.warn("Telemetry report rejected", { + status: response.status, + statusText: response.statusText, + }); + } + }) + .catch((error) => { + this.logger.debug("Telemetry report failed", { error: String(error) }); + }); + + chunk = this.createReportChunk(); + } + } catch (error) { + this.logger.debug("Telemetry report failed", { error: String(error) }); + } + } +} diff --git a/packages/sdk/src/realtime/webrtc-connection.ts b/packages/sdk/src/realtime/webrtc-connection.ts index 000936c..908b76f 100644 --- a/packages/sdk/src/realtime/webrtc-connection.ts +++ b/packages/sdk/src/realtime/webrtc-connection.ts @@ -1,6 +1,8 @@ import mitt from "mitt"; import type { RealTimeModels } from "../shared/model"; +import type { Logger } from "../utils/logger"; import { buildUserAgent } from "../utils/user-agent"; +import type { DiagnosticEmitter, IceCandidateEvent } from "./diagnostics"; import type { ConnectionState, GenerationTickMessage, @@ -25,6 +27,8 @@ interface ConnectionCallbacks { modelName?: RealTimeModels; initialImage?: string; initialPrompt?: { text: string; enhance?: boolean }; + logger?: Logger; + onDiagnostic?: DiagnosticEmitter; } type WsMessageEvents = { @@ -34,14 +38,25 @@ type WsMessageEvents = { generationTick: GenerationTickMessage; }; +const noopDiagnostic: DiagnosticEmitter = () => {}; + export class WebRTCConnection { private pc: RTCPeerConnection | null = null; private ws: WebSocket | null = null; private localStream: MediaStream | null = null; private connectionReject: ((error: Error) => void) | null = null; + private logger: Logger; + private emitDiagnostic: DiagnosticEmitter; state: ConnectionState = "disconnected"; websocketMessagesEmitter = mitt(); - constructor(private callbacks: ConnectionCallbacks = {}) {} + constructor(private callbacks: ConnectionCallbacks = {}) { + this.logger = callbacks.logger ?? { debug() {}, info() {}, warn() {}, error() {} }; + this.emitDiagnostic = callbacks.onDiagnostic ?? noopDiagnostic; + } + + getPeerConnection(): RTCPeerConnection | null { + return this.pc; + } async connect(url: string, localStream: MediaStream | null, timeout: number, integration?: string): Promise { const deadline = Date.now() + timeout; @@ -63,8 +78,10 @@ export class WebRTCConnection { connectAbort.catch(() => {}); this.connectionReject = (error) => rejectConnect(error); + const totalStart = performance.now(); try { // Phase 1: WebSocket setup + const wsStart = performance.now(); await Promise.race([ new Promise((resolve, reject) => { const timer = setTimeout(() => reject(new Error("WebSocket timeout")), timeout); @@ -72,18 +89,29 @@ export class WebRTCConnection { this.ws.onopen = () => { clearTimeout(timer); + this.emitDiagnostic("phaseTiming", { + phase: "websocket", + durationMs: performance.now() - wsStart, + success: true, + }); resolve(); }; this.ws.onmessage = (e) => { try { this.handleSignalingMessage(JSON.parse(e.data)); } catch (err) { - console.error("[WebRTC] Parse error:", err); + this.logger.error("Signaling message parse error", { error: String(err) }); } }; this.ws.onerror = () => { clearTimeout(timer); const error = new Error("WebSocket error"); + this.emitDiagnostic("phaseTiming", { + phase: "websocket", + durationMs: performance.now() - wsStart, + success: false, + error: error.message, + }); reject(error); rejectConnect(error); }; @@ -100,6 +128,7 @@ export class WebRTCConnection { // Phase 2: Pre-handshake setup (initial image and/or prompt) // connectionReject is already active, so ws.onclose or server errors abort these too if (this.callbacks.initialImage) { + const imageStart = performance.now(); await Promise.race([ this.setImageBase64(this.callbacks.initialImage, { prompt: this.callbacks.initialPrompt?.text, @@ -107,23 +136,52 @@ export class WebRTCConnection { }), connectAbort, ]); + this.emitDiagnostic("phaseTiming", { + phase: "avatar-image", + durationMs: performance.now() - imageStart, + success: true, + }); } else if (this.callbacks.initialPrompt) { + const promptStart = performance.now(); await Promise.race([this.sendInitialPrompt(this.callbacks.initialPrompt), connectAbort]); + this.emitDiagnostic("phaseTiming", { + phase: "initial-prompt", + durationMs: performance.now() - promptStart, + success: true, + }); } // Phase 3: WebRTC handshake + const handshakeStart = performance.now(); await this.setupNewPeerConnection(); await Promise.race([ new Promise((resolve, reject) => { const checkConnection = setInterval(() => { if (this.state === "connected" || this.state === "generating") { clearInterval(checkConnection); + this.emitDiagnostic("phaseTiming", { + phase: "webrtc-handshake", + durationMs: performance.now() - handshakeStart, + success: true, + }); resolve(); } else if (this.state === "disconnected") { clearInterval(checkConnection); + this.emitDiagnostic("phaseTiming", { + phase: "webrtc-handshake", + durationMs: performance.now() - handshakeStart, + success: false, + error: "Connection lost during handshake", + }); reject(new Error("Connection lost during WebRTC handshake")); } else if (Date.now() >= deadline) { clearInterval(checkConnection); + this.emitDiagnostic("phaseTiming", { + phase: "webrtc-handshake", + durationMs: performance.now() - handshakeStart, + success: false, + error: "Timeout", + }); reject(new Error("Connection timeout")); } }, 100); @@ -132,6 +190,12 @@ export class WebRTCConnection { }), connectAbort, ]); + + this.emitDiagnostic("phaseTiming", { + phase: "total", + durationMs: performance.now() - totalStart, + success: true, + }); } finally { this.connectionReject = null; } @@ -141,7 +205,8 @@ export class WebRTCConnection { try { // Handle messages that don't require peer connection first if (msg.type === "error") { - const error = new Error(msg.error); + const error = new Error(msg.error) as Error & { source?: string }; + error.source = "server"; this.callbacks.onError?.(error); if (this.connectionReject) { this.connectionReject(error); @@ -208,7 +273,17 @@ export class WebRTCConnection { }); break; case "ice-candidate": - if (msg.candidate) await this.pc.addIceCandidate(msg.candidate); + if (msg.candidate) { + await this.pc.addIceCandidate(msg.candidate); + this.emitDiagnostic("iceCandidate", { + source: "remote", + candidateType: + (msg.candidate.candidate?.match(/typ (\w+)/)?.[1] as IceCandidateEvent["candidateType"]) ?? "unknown", + protocol: + (msg.candidate.candidate?.match(/udp|tcp/i)?.[0]?.toLowerCase() as IceCandidateEvent["protocol"]) ?? + "unknown", + }); + } break; case "ice-restart": { const turnConfig = msg.turn_config; @@ -219,7 +294,7 @@ export class WebRTCConnection { } } } catch (error) { - console.error("[WebRTC] Error:", error); + this.logger.error("Signaling handler error", { error: String(error) }); this.callbacks.onError?.(error as Error); this.connectionReject?.(error as Error); } @@ -230,7 +305,7 @@ export class WebRTCConnection { this.ws.send(JSON.stringify(message)); return true; } - console.warn("[WebRTC] Message dropped: WebSocket is not open"); + this.logger.warn("Message dropped: WebSocket is not open"); return false; } @@ -376,11 +451,32 @@ export class WebRTCConnection { this.pc.onicecandidate = (e) => { this.send({ type: "ice-candidate", candidate: e.candidate }); + if (e.candidate) { + this.emitDiagnostic("iceCandidate", { + source: "local", + candidateType: (e.candidate.type as IceCandidateEvent["candidateType"]) ?? "unknown", + protocol: (e.candidate.protocol as IceCandidateEvent["protocol"]) ?? "unknown", + address: e.candidate.address ?? undefined, + port: e.candidate.port ?? undefined, + }); + } }; + let prevPcState: string = "new"; this.pc.onconnectionstatechange = () => { if (!this.pc) return; const s = this.pc.connectionState; + this.emitDiagnostic("peerConnectionStateChange", { + state: s, + previousState: prevPcState, + timestampMs: performance.now(), + }); + prevPcState = s; + + if (s === "connected") { + this.emitSelectedCandidatePair(); + } + const nextState = s === "connected" ? "connected" : ["connecting", "new"].includes(s) ? "connecting" : "disconnected"; // Keep "generating" sticky unless the connection is actually lost. @@ -388,16 +484,76 @@ export class WebRTCConnection { this.setState(nextState); }; + let prevIceState: string = "new"; this.pc.oniceconnectionstatechange = () => { - if (this.pc?.iceConnectionState === "failed") { + if (!this.pc) return; + const newIceState = this.pc.iceConnectionState; + this.emitDiagnostic("iceStateChange", { + state: newIceState, + previousState: prevIceState, + timestampMs: performance.now(), + }); + prevIceState = newIceState; + + if (newIceState === "failed") { this.setState("disconnected"); this.callbacks.onError?.(new Error("ICE connection failed")); } }; + let prevSignalingState: string = "stable"; + this.pc.onsignalingstatechange = () => { + if (!this.pc) return; + const newState = this.pc.signalingState; + this.emitDiagnostic("signalingStateChange", { + state: newState, + previousState: prevSignalingState, + timestampMs: performance.now(), + }); + prevSignalingState = newState; + }; + this.handleSignalingMessage({ type: "ready" }); } + private async emitSelectedCandidatePair(): Promise { + if (!this.pc) return; + try { + const stats = await this.pc.getStats(); + let found = false; + stats.forEach((report) => { + if (found) return; + if (report.type === "candidate-pair" && report.state === "succeeded") { + found = true; + let localCandidate: Record | undefined; + let remoteCandidate: Record | undefined; + stats.forEach((r) => { + if (r.id === report.localCandidateId) localCandidate = r as Record; + if (r.id === report.remoteCandidateId) remoteCandidate = r as Record; + }); + if (localCandidate && remoteCandidate) { + this.emitDiagnostic("selectedCandidatePair", { + local: { + candidateType: String(localCandidate.candidateType ?? "unknown"), + protocol: String(localCandidate.protocol ?? "unknown"), + address: localCandidate.address as string | undefined, + port: localCandidate.port as number | undefined, + }, + remote: { + candidateType: String(remoteCandidate.candidateType ?? "unknown"), + protocol: String(remoteCandidate.protocol ?? "unknown"), + address: remoteCandidate.address as string | undefined, + port: remoteCandidate.port as number | undefined, + }, + }); + } + } + }); + } catch { + // getStats can fail if PC is already closed; silently ignore + } + } + cleanup(): void { // Note: We intentionally do NOT stop the tracks here. // The tracks belong to the user's source stream, not the SDK. @@ -414,7 +570,7 @@ export class WebRTCConnection { applyCodecPreference(preferredCodecName: "video/VP8" | "video/H264") { if (!this.pc) return; if (typeof RTCRtpSender === "undefined" || typeof RTCRtpSender.getCapabilities !== "function") { - console.warn("RTCRtpSender capabilities are not available in this environment."); + this.logger.debug("RTCRtpSender capabilities not available in this environment"); return; } @@ -422,13 +578,13 @@ export class WebRTCConnection { .getTransceivers() .find((r) => r.sender.track?.kind === "video" || r.receiver.track?.kind === "video"); if (!videoTransceiver) { - console.error("Could not find video transceiver. Ensure track is added to peer connection."); + this.logger.warn("Video transceiver not found for codec preference"); return; } const capabilities = RTCRtpSender.getCapabilities("video"); if (!capabilities) { - console.error("Could not get video sender capabilities."); + this.logger.warn("Video sender capabilities unavailable"); return; } @@ -444,13 +600,13 @@ export class WebRTCConnection { const orderedCodecs = [...preferredCodecs, ...otherCodecs]; if (orderedCodecs.length === 0) { - console.warn("No video codecs found to set preferences for."); + this.logger.debug("No video codecs found for preference setting"); return; } try { videoTransceiver.setCodecPreferences(orderedCodecs); } catch { - console.warn("[WebRTC] setCodecPreferences not supported, skipping codec preference."); + this.logger.debug("setCodecPreferences not supported, skipping"); } } diff --git a/packages/sdk/src/realtime/webrtc-manager.ts b/packages/sdk/src/realtime/webrtc-manager.ts index b29b276..5ebdadf 100644 --- a/packages/sdk/src/realtime/webrtc-manager.ts +++ b/packages/sdk/src/realtime/webrtc-manager.ts @@ -1,11 +1,15 @@ import pRetry, { AbortError } from "p-retry"; import type { RealTimeModels } from "../shared/model"; +import type { Logger } from "../utils/logger"; +import type { DiagnosticEmitter } from "./diagnostics"; import type { ConnectionState, OutgoingMessage } from "./types"; import { WebRTCConnection } from "./webrtc-connection"; export interface WebRTCConfig { webrtcUrl: string; integration?: string; + logger?: Logger; + onDiagnostic?: DiagnosticEmitter; onRemoteStream: (stream: MediaStream) => void; onConnectionStateChange?: (state: ConnectionState) => void; onError?: (error: Error) => void; @@ -38,6 +42,7 @@ const RETRY_OPTIONS = { export class WebRTCManager { private connection: WebRTCConnection; private config: WebRTCConfig; + private logger: Logger; private localStream: MediaStream | null = null; private subscribeMode = false; private managerState: ConnectionState = "disconnected"; @@ -48,6 +53,7 @@ export class WebRTCManager { constructor(config: WebRTCConfig) { this.config = config; + this.logger = config.logger ?? { debug() {}, info() {}, warn() {}, error() {} }; this.connection = new WebRTCConnection({ onRemoteStream: config.onRemoteStream, onStateChange: (state) => this.handleConnectionStateChange(state), @@ -58,6 +64,8 @@ export class WebRTCManager { modelName: config.modelName, initialImage: config.initialImage, initialPrompt: config.initialPrompt, + logger: this.logger, + onDiagnostic: config.onDiagnostic, }); } @@ -101,10 +109,15 @@ export class WebRTCManager { const reconnectGeneration = ++this.reconnectGeneration; this.isReconnecting = true; this.emitState("reconnecting"); + const reconnectStart = performance.now(); try { + let attemptCount = 0; + await pRetry( async () => { + attemptCount++; + if (this.intentionalDisconnect || reconnectGeneration !== this.reconnectGeneration) { throw new AbortError("Reconnect cancelled"); } @@ -132,7 +145,14 @@ export class WebRTCManager { if (this.intentionalDisconnect || reconnectGeneration !== this.reconnectGeneration) { return; } - console.error(`[WebRTC] Reconnect attempt failed: ${error.message}`); + this.logger.warn("Reconnect attempt failed", { error: error.message, attempt: error.attemptNumber }); + this.config.onDiagnostic?.("reconnect", { + attempt: error.attemptNumber, + maxAttempts: RETRY_OPTIONS.retries + 1, + durationMs: performance.now() - reconnectStart, + success: false, + error: error.message, + }); this.connection.cleanup(); }, shouldRetry: (error) => { @@ -144,6 +164,12 @@ export class WebRTCManager { }, }, ); + this.config.onDiagnostic?.("reconnect", { + attempt: attemptCount, + maxAttempts: RETRY_OPTIONS.retries + 1, + durationMs: performance.now() - reconnectStart, + success: true, + }); // "connected" state is emitted by handleConnectionStateChange } catch (error) { this.isReconnecting = false; @@ -175,7 +201,7 @@ export class WebRTCManager { { ...RETRY_OPTIONS, onFailedAttempt: (error) => { - console.error(`[WebRTC] Failed to connect: ${error.message}`); + this.logger.warn("Connection attempt failed", { error: error.message, attempt: error.attemptNumber }); this.connection.cleanup(); }, shouldRetry: (error) => { @@ -210,6 +236,10 @@ export class WebRTCManager { return this.managerState; } + getPeerConnection(): RTCPeerConnection | null { + return this.connection.getPeerConnection(); + } + getWebsocketMessageEmitter() { return this.connection.websocketMessagesEmitter; } diff --git a/packages/sdk/src/realtime/webrtc-stats.ts b/packages/sdk/src/realtime/webrtc-stats.ts new file mode 100644 index 0000000..42319a4 --- /dev/null +++ b/packages/sdk/src/realtime/webrtc-stats.ts @@ -0,0 +1,233 @@ +export type WebRTCStats = { + timestamp: number; + video: { + framesDecoded: number; + framesDropped: number; + framesPerSecond: number; + frameWidth: number; + frameHeight: number; + bytesReceived: number; + packetsReceived: number; + packetsLost: number; + jitter: number; + /** Estimated inbound bitrate in bits/sec, computed from bytesReceived delta. */ + bitrate: number; + freezeCount: number; + totalFreezesDuration: number; + /** Delta: packets lost since previous sample. */ + packetsLostDelta: number; + /** Delta: frames dropped since previous sample. */ + framesDroppedDelta: number; + /** Delta: freeze count since previous sample. */ + freezeCountDelta: number; + /** Delta: freeze duration (seconds) since previous sample. */ + freezeDurationDelta: number; + } | null; + audio: { + bytesReceived: number; + packetsReceived: number; + packetsLost: number; + jitter: number; + /** Estimated inbound bitrate in bits/sec, computed from bytesReceived delta. */ + bitrate: number; + /** Delta: packets lost since previous sample. */ + packetsLostDelta: number; + } | null; + /** Outbound video track stats (from the local camera/screen share being sent). */ + outboundVideo: { + /** Why the encoder is limiting quality: "none", "bandwidth", "cpu", or "other". */ + qualityLimitationReason: string; + /** Cumulative time (seconds) spent in each quality limitation state. */ + qualityLimitationDurations: Record; + bytesSent: number; + packetsSent: number; + framesPerSecond: number; + frameWidth: number; + frameHeight: number; + /** Estimated outbound bitrate in bits/sec, computed from bytesSent delta. */ + bitrate: number; + } | null; + connection: { + /** Current round-trip time in seconds, or null if unavailable. */ + currentRoundTripTime: number | null; + /** Available outgoing bitrate estimate in bits/sec, or null if unavailable. */ + availableOutgoingBitrate: number | null; + }; +}; + +export type StatsOptions = { + /** Polling interval in milliseconds. Default: 1000. Minimum: 500. */ + intervalMs?: number; +}; + +const DEFAULT_INTERVAL_MS = 1000; +const MIN_INTERVAL_MS = 500; + +export class WebRTCStatsCollector { + private pc: RTCPeerConnection | null = null; + private intervalId: ReturnType | null = null; + private prevBytesVideo = 0; + private prevBytesAudio = 0; + private prevBytesSentVideo = 0; + private prevTimestamp = 0; + // Previous cumulative values for delta computation + private prevPacketsLostVideo = 0; + private prevFramesDropped = 0; + private prevFreezeCount = 0; + private prevFreezeDuration = 0; + private prevPacketsLostAudio = 0; + private onStats: ((stats: WebRTCStats) => void) | null = null; + private intervalMs: number; + + constructor(options: StatsOptions = {}) { + this.intervalMs = Math.max(options.intervalMs ?? DEFAULT_INTERVAL_MS, MIN_INTERVAL_MS); + } + + /** Attach to a peer connection and start polling. */ + start(pc: RTCPeerConnection, onStats: (stats: WebRTCStats) => void): void { + this.stop(); + this.pc = pc; + this.onStats = onStats; + this.prevBytesVideo = 0; + this.prevBytesAudio = 0; + this.prevBytesSentVideo = 0; + this.prevTimestamp = 0; + this.prevPacketsLostVideo = 0; + this.prevFramesDropped = 0; + this.prevFreezeCount = 0; + this.prevFreezeDuration = 0; + this.prevPacketsLostAudio = 0; + this.intervalId = setInterval(() => this.collect(), this.intervalMs); + } + + /** Stop polling and release resources. */ + stop(): void { + if (this.intervalId !== null) { + clearInterval(this.intervalId); + this.intervalId = null; + } + this.pc = null; + this.onStats = null; + } + + isRunning(): boolean { + return this.intervalId !== null; + } + + private async collect(): Promise { + if (!this.pc || !this.onStats) return; + + try { + const rawStats = await this.pc.getStats(); + const stats = this.parse(rawStats); + this.onStats(stats); + } catch { + // PC might be closed; stop silently + this.stop(); + } + } + + private parse(rawStats: RTCStatsReport): WebRTCStats { + const now = performance.now(); + const elapsed = this.prevTimestamp > 0 ? (now - this.prevTimestamp) / 1000 : 0; + + let video: WebRTCStats["video"] = null; + let audio: WebRTCStats["audio"] = null; + let outboundVideo: WebRTCStats["outboundVideo"] = null; + const connection: WebRTCStats["connection"] = { + currentRoundTripTime: null, + availableOutgoingBitrate: null, + }; + + rawStats.forEach((report) => { + if (report.type === "inbound-rtp" && report.kind === "video") { + const bytesReceived = ((report as Record).bytesReceived as number) ?? 0; + const bitrate = elapsed > 0 ? ((bytesReceived - this.prevBytesVideo) * 8) / elapsed : 0; + this.prevBytesVideo = bytesReceived; + + const r = report as Record; + const packetsLost = (r.packetsLost as number) ?? 0; + const framesDropped = (r.framesDropped as number) ?? 0; + const freezeCount = (r.freezeCount as number) ?? 0; + const freezeDuration = (r.totalFreezesDuration as number) ?? 0; + + video = { + framesDecoded: (r.framesDecoded as number) ?? 0, + framesDropped, + framesPerSecond: (r.framesPerSecond as number) ?? 0, + frameWidth: (r.frameWidth as number) ?? 0, + frameHeight: (r.frameHeight as number) ?? 0, + bytesReceived, + packetsReceived: (r.packetsReceived as number) ?? 0, + packetsLost, + jitter: (r.jitter as number) ?? 0, + bitrate: Math.round(bitrate), + freezeCount, + totalFreezesDuration: freezeDuration, + packetsLostDelta: Math.max(0, packetsLost - this.prevPacketsLostVideo), + framesDroppedDelta: Math.max(0, framesDropped - this.prevFramesDropped), + freezeCountDelta: Math.max(0, freezeCount - this.prevFreezeCount), + freezeDurationDelta: Math.max(0, freezeDuration - this.prevFreezeDuration), + }; + this.prevPacketsLostVideo = packetsLost; + this.prevFramesDropped = framesDropped; + this.prevFreezeCount = freezeCount; + this.prevFreezeDuration = freezeDuration; + } + + if (report.type === "outbound-rtp" && report.kind === "video") { + const r = report as Record; + const bytesSent = (r.bytesSent as number) ?? 0; + const outBitrate = elapsed > 0 ? ((bytesSent - this.prevBytesSentVideo) * 8) / elapsed : 0; + this.prevBytesSentVideo = bytesSent; + + outboundVideo = { + qualityLimitationReason: (r.qualityLimitationReason as string) ?? "none", + qualityLimitationDurations: (r.qualityLimitationDurations as Record) ?? {}, + bytesSent, + packetsSent: (r.packetsSent as number) ?? 0, + framesPerSecond: (r.framesPerSecond as number) ?? 0, + frameWidth: (r.frameWidth as number) ?? 0, + frameHeight: (r.frameHeight as number) ?? 0, + bitrate: Math.round(outBitrate), + }; + } + + if (report.type === "inbound-rtp" && report.kind === "audio") { + const bytesReceived = ((report as Record).bytesReceived as number) ?? 0; + const bitrate = elapsed > 0 ? ((bytesReceived - this.prevBytesAudio) * 8) / elapsed : 0; + this.prevBytesAudio = bytesReceived; + + const r = report as Record; + const audioPacketsLost = (r.packetsLost as number) ?? 0; + audio = { + bytesReceived, + packetsReceived: (r.packetsReceived as number) ?? 0, + packetsLost: audioPacketsLost, + jitter: (r.jitter as number) ?? 0, + bitrate: Math.round(bitrate), + packetsLostDelta: Math.max(0, audioPacketsLost - this.prevPacketsLostAudio), + }; + this.prevPacketsLostAudio = audioPacketsLost; + } + + if (report.type === "candidate-pair") { + const r = report as Record; + if (r.state === "succeeded") { + connection.currentRoundTripTime = (r.currentRoundTripTime as number) ?? null; + connection.availableOutgoingBitrate = (r.availableOutgoingBitrate as number) ?? null; + } + } + }); + + this.prevTimestamp = now; + + return { + timestamp: Date.now(), + video, + audio, + outboundVideo, + connection, + }; + } +} diff --git a/packages/sdk/src/utils/errors.ts b/packages/sdk/src/utils/errors.ts index 3171657..a8cecd0 100644 --- a/packages/sdk/src/utils/errors.ts +++ b/packages/sdk/src/utils/errors.ts @@ -8,7 +8,6 @@ export type DecartSDKError = { export const ERROR_CODES = { INVALID_API_KEY: "INVALID_API_KEY", INVALID_BASE_URL: "INVALID_BASE_URL", - WEB_RTC_ERROR: "WEB_RTC_ERROR", PROCESSING_ERROR: "PROCESSING_ERROR", INVALID_INPUT: "INVALID_INPUT", INVALID_OPTIONS: "INVALID_OPTIONS", @@ -18,6 +17,12 @@ export const ERROR_CODES = { QUEUE_RESULT_ERROR: "QUEUE_RESULT_ERROR", JOB_NOT_COMPLETED: "JOB_NOT_COMPLETED", TOKEN_CREATE_ERROR: "TOKEN_CREATE_ERROR", + // WebRTC-specific error codes + WEBRTC_WEBSOCKET_ERROR: "WEBRTC_WEBSOCKET_ERROR", + WEBRTC_ICE_ERROR: "WEBRTC_ICE_ERROR", + WEBRTC_TIMEOUT_ERROR: "WEBRTC_TIMEOUT_ERROR", + WEBRTC_SERVER_ERROR: "WEBRTC_SERVER_ERROR", + WEBRTC_SIGNALING_ERROR: "WEBRTC_SIGNALING_ERROR", } as const; export function createSDKError( @@ -40,10 +45,56 @@ export function createInvalidBaseUrlError(url?: string): DecartSDKError { return createSDKError(ERROR_CODES.INVALID_BASE_URL, `Invalid base URL${url ? `: ${url}` : ""}`); } -export function createWebrtcError(error: Error): DecartSDKError { - return createSDKError(ERROR_CODES.WEB_RTC_ERROR, "WebRTC error", { - cause: error, - }); +export function createWebrtcWebsocketError(error: Error): DecartSDKError { + return createSDKError(ERROR_CODES.WEBRTC_WEBSOCKET_ERROR, "WebSocket connection failed", undefined, error); +} + +export function createWebrtcIceError(error: Error): DecartSDKError { + return createSDKError(ERROR_CODES.WEBRTC_ICE_ERROR, "ICE connection failed", undefined, error); +} + +export function createWebrtcTimeoutError(phase: string, timeoutMs?: number, cause?: Error): DecartSDKError { + const hasTimeout = typeof timeoutMs === "number" && Number.isFinite(timeoutMs); + return createSDKError( + ERROR_CODES.WEBRTC_TIMEOUT_ERROR, + hasTimeout ? `${phase} timed out after ${timeoutMs}ms` : `${phase} timed out`, + hasTimeout ? { phase, timeoutMs } : { phase }, + cause, + ); +} + +export function createWebrtcServerError(message: string): DecartSDKError { + return createSDKError(ERROR_CODES.WEBRTC_SERVER_ERROR, message); +} + +export function createWebrtcSignalingError(error: Error): DecartSDKError { + return createSDKError(ERROR_CODES.WEBRTC_SIGNALING_ERROR, "Signaling error", undefined, error); +} + +/** + * Classify a raw WebRTC error into a specific SDK error based on its message. + */ +export function classifyWebrtcError(error: Error): DecartSDKError { + const msg = error.message.toLowerCase(); + const source = (error as Error & { source?: string }).source; + + if (source === "server") { + return createWebrtcServerError(error.message); + } + + if (msg.includes("websocket")) { + return createWebrtcWebsocketError(error); + } + if (msg.includes("ice connection failed")) { + return createWebrtcIceError(error); + } + if (msg.includes("timeout") || msg.includes("timed out")) { + const timeoutMatch = msg.match(/(\d+)\s*ms/); + const timeoutMs = timeoutMatch ? Number.parseInt(timeoutMatch[1], 10) : undefined; + return createWebrtcTimeoutError("connection", timeoutMs, error); + } + // Default to signaling error for unclassified WebRTC errors + return createWebrtcSignalingError(error); } export function createInvalidInputError(message: string): DecartSDKError { diff --git a/packages/sdk/src/utils/logger.ts b/packages/sdk/src/utils/logger.ts new file mode 100644 index 0000000..e4c8c68 --- /dev/null +++ b/packages/sdk/src/utils/logger.ts @@ -0,0 +1,44 @@ +export type LogLevel = "debug" | "info" | "warn" | "error"; + +export type Logger = { + debug: (message: string, data?: Record) => void; + info: (message: string, data?: Record) => void; + warn: (message: string, data?: Record) => void; + error: (message: string, data?: Record) => void; +}; + +/** A logger that discards all messages. Zero overhead. */ +export const noopLogger: Logger = { + debug() {}, + info() {}, + warn() {}, + error() {}, +}; + +const LOG_LEVELS: Record = { debug: 0, info: 1, warn: 2, error: 3 }; + +/** + * Creates a console-based logger that only logs messages at or above the given level. + * + * @param minLevel - Minimum log level to output. Default: "warn". + */ +export function createConsoleLogger(minLevel: LogLevel = "warn"): Logger { + const threshold = LOG_LEVELS[minLevel]; + + const log = (level: LogLevel, message: string, data?: Record) => { + if (LOG_LEVELS[level] < threshold) return; + const prefix = "[DecartSDK]"; + if (data) { + console[level](prefix, message, data); + } else { + console[level](prefix, message); + } + }; + + return { + debug: (msg, data) => log("debug", msg, data), + info: (msg, data) => log("info", msg, data), + warn: (msg, data) => log("warn", msg, data), + error: (msg, data) => log("error", msg, data), + }; +} diff --git a/packages/sdk/tests/unit.test.ts b/packages/sdk/tests/unit.test.ts index eecc208..c1cbde6 100644 --- a/packages/sdk/tests/unit.test.ts +++ b/packages/sdk/tests/unit.test.ts @@ -1522,6 +1522,236 @@ describe("Subscribe Client", () => { } }); + it("buffers pre-session telemetry diagnostics and flushes them after session_id", async () => { + const { createRealTimeClient } = await import("../src/realtime/client.js"); + const { WebRTCManager } = await import("../src/realtime/webrtc-manager.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + const sessionIdListeners = new Set<(msg: import("../src/realtime/types").SessionIdMessage) => void>(); + const websocketEmitter = { + on: (event: string, listener: (msg: import("../src/realtime/types").SessionIdMessage) => void) => { + if (event === "sessionId") sessionIdListeners.add(listener); + }, + off: (event: string, listener: (msg: import("../src/realtime/types").SessionIdMessage) => void) => { + if (event === "sessionId") sessionIdListeners.delete(listener); + }, + }; + + const connectSpy = vi.spyOn(WebRTCManager.prototype, "connect").mockImplementation(async function () { + const mgr = this as unknown as { + config: { + onConnectionStateChange?: (state: import("../src/realtime/types").ConnectionState) => void; + onDiagnostic?: (name: string, data: unknown) => void; + }; + managerState: import("../src/realtime/types").ConnectionState; + }; + + mgr.config.onDiagnostic?.("phaseTiming", { + phase: "websocket", + durationMs: 12, + success: true, + }); + + mgr.managerState = "connected"; + mgr.config.onConnectionStateChange?.("connected"); + return true; + }); + const stateSpy = vi.spyOn(WebRTCManager.prototype, "getConnectionState").mockReturnValue("connected"); + const emitterSpy = vi + .spyOn(WebRTCManager.prototype, "getWebsocketMessageEmitter") + .mockReturnValue(websocketEmitter as never); + const cleanupSpy = vi.spyOn(WebRTCManager.prototype, "cleanup").mockImplementation(() => {}); + + try { + const realtime = createRealTimeClient({ + baseUrl: "wss://api3.decart.ai", + apiKey: "test-key", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + telemetryEnabled: true, + }); + const client = await realtime.connect({} as MediaStream, { + model: models.realtime("mirage_v2"), + onRemoteStream: vi.fn(), + }); + + expect(fetchMock).not.toHaveBeenCalled(); + + for (const listener of sessionIdListeners) { + listener({ + type: "session_id", + session_id: "sess-telemetry", + server_ip: "10.0.0.5", + server_port: 9090, + }); + } + + client.disconnect(); + + await vi.waitFor(() => { + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + const [, options] = fetchMock.mock.calls[0]; + const body = JSON.parse(options.body); + expect(body.diagnostics).toHaveLength(1); + expect(body.diagnostics[0].name).toBe("phaseTiming"); + expect(body.diagnostics[0].data.phase).toBe("websocket"); + } finally { + connectSpy.mockRestore(); + stateSpy.mockRestore(); + emitterSpy.mockRestore(); + cleanupSpy.mockRestore(); + vi.unstubAllGlobals(); + } + }); + + it("stops previous telemetry reporter when session_id changes", async () => { + const { createRealTimeClient } = await import("../src/realtime/client.js"); + const { WebRTCManager } = await import("../src/realtime/webrtc-manager.js"); + + vi.useFakeTimers(); + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + const setIntervalSpy = vi.spyOn(globalThis, "setInterval"); + const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval"); + + const sessionIdListeners = new Set<(msg: import("../src/realtime/types").SessionIdMessage) => void>(); + const websocketEmitter = { + on: (event: string, listener: (msg: import("../src/realtime/types").SessionIdMessage) => void) => { + if (event === "sessionId") sessionIdListeners.add(listener); + }, + off: (event: string, listener: (msg: import("../src/realtime/types").SessionIdMessage) => void) => { + if (event === "sessionId") sessionIdListeners.delete(listener); + }, + }; + + const connectSpy = vi.spyOn(WebRTCManager.prototype, "connect").mockImplementation(async function () { + const mgr = this as unknown as { + config: { onConnectionStateChange?: (state: import("../src/realtime/types").ConnectionState) => void }; + managerState: import("../src/realtime/types").ConnectionState; + }; + mgr.managerState = "connected"; + mgr.config.onConnectionStateChange?.("connected"); + return true; + }); + const stateSpy = vi.spyOn(WebRTCManager.prototype, "getConnectionState").mockReturnValue("connected"); + const emitterSpy = vi + .spyOn(WebRTCManager.prototype, "getWebsocketMessageEmitter") + .mockReturnValue(websocketEmitter as never); + const peerConnectionSpy = vi.spyOn(WebRTCManager.prototype, "getPeerConnection").mockReturnValue(null); + const cleanupSpy = vi.spyOn(WebRTCManager.prototype, "cleanup").mockImplementation(() => {}); + + try { + const realtime = createRealTimeClient({ + baseUrl: "wss://api3.decart.ai", + apiKey: "test-key", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + telemetryEnabled: true, + }); + const client = await realtime.connect({} as MediaStream, { + model: models.realtime("mirage_v2"), + onRemoteStream: vi.fn(), + }); + + for (const listener of sessionIdListeners) { + listener({ + type: "session_id", + session_id: "sess-1", + server_ip: "10.0.0.5", + server_port: 9090, + }); + } + for (const listener of sessionIdListeners) { + listener({ + type: "session_id", + session_id: "sess-2", + server_ip: "10.0.0.6", + server_port: 9091, + }); + } + + client.disconnect(); + + expect(setIntervalSpy).toHaveBeenCalledTimes(2); + expect(clearIntervalSpy).toHaveBeenCalledTimes(2); + } finally { + connectSpy.mockRestore(); + stateSpy.mockRestore(); + emitterSpy.mockRestore(); + peerConnectionSpy.mockRestore(); + cleanupSpy.mockRestore(); + setIntervalSpy.mockRestore(); + clearIntervalSpy.mockRestore(); + vi.useRealTimers(); + vi.unstubAllGlobals(); + } + }); + + it("restarts stats collection when peer connection changes after reconnect", async () => { + const { createRealTimeClient } = await import("../src/realtime/client.js"); + const { WebRTCManager } = await import("../src/realtime/webrtc-manager.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + const firstPeerConnection = {} as RTCPeerConnection; + const secondPeerConnection = {} as RTCPeerConnection; + let currentPeerConnection = firstPeerConnection; + let onConnectionStateChange: ((state: import("../src/realtime/types").ConnectionState) => void) | undefined; + + const startSpy = vi.spyOn(WebRTCStatsCollector.prototype, "start").mockImplementation(() => {}); + const stopSpy = vi.spyOn(WebRTCStatsCollector.prototype, "stop").mockImplementation(() => {}); + + const connectSpy = vi.spyOn(WebRTCManager.prototype, "connect").mockImplementation(async function () { + const mgr = this as unknown as { + config: { onConnectionStateChange?: (state: import("../src/realtime/types").ConnectionState) => void }; + managerState: import("../src/realtime/types").ConnectionState; + }; + onConnectionStateChange = mgr.config.onConnectionStateChange; + mgr.managerState = "connected"; + mgr.config.onConnectionStateChange?.("connected"); + return true; + }); + const stateSpy = vi.spyOn(WebRTCManager.prototype, "getConnectionState").mockReturnValue("connected"); + const peerConnectionSpy = vi + .spyOn(WebRTCManager.prototype, "getPeerConnection") + .mockImplementation(() => currentPeerConnection); + const cleanupSpy = vi.spyOn(WebRTCManager.prototype, "cleanup").mockImplementation(() => {}); + + try { + const realtime = createRealTimeClient({ + baseUrl: "wss://api3.decart.ai", + apiKey: "test-key", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + telemetryEnabled: true, + }); + const client = await realtime.connect({} as MediaStream, { + model: models.realtime("mirage_v2"), + onRemoteStream: vi.fn(), + }); + + expect(startSpy).toHaveBeenCalledTimes(1); + expect(startSpy.mock.calls[0][0]).toBe(firstPeerConnection); + + currentPeerConnection = secondPeerConnection; + onConnectionStateChange?.("connected"); + + expect(startSpy).toHaveBeenCalledTimes(2); + expect(startSpy.mock.calls[1][0]).toBe(secondPeerConnection); + + client.disconnect(); + expect(stopSpy).toHaveBeenCalled(); + } finally { + startSpy.mockRestore(); + stopSpy.mockRestore(); + connectSpy.mockRestore(); + stateSpy.mockRestore(); + peerConnectionSpy.mockRestore(); + cleanupSpy.mockRestore(); + } + }); + it("subscribe client buffers events until returned", async () => { const { encodeSubscribeToken } = await import("../src/realtime/subscribe-client.js"); const { createRealTimeClient } = await import("../src/realtime/client.js"); @@ -1562,6 +1792,741 @@ describe("Subscribe Client", () => { }); }); +describe("Logger", () => { + it("noopLogger does nothing", async () => { + const { noopLogger } = await import("../src/utils/logger.js"); + // Should not throw + noopLogger.debug("test"); + noopLogger.info("test"); + noopLogger.warn("test"); + noopLogger.error("test"); + }); + + it("createConsoleLogger filters by level", async () => { + const { createConsoleLogger } = await import("../src/utils/logger.js"); + const debugSpy = vi.spyOn(console, "debug").mockImplementation(() => {}); + const infoSpy = vi.spyOn(console, "info").mockImplementation(() => {}); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + try { + const logger = createConsoleLogger("warn"); + logger.debug("d"); + logger.info("i"); + logger.warn("w"); + logger.error("e"); + + expect(debugSpy).not.toHaveBeenCalled(); + expect(infoSpy).not.toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(errorSpy).toHaveBeenCalledTimes(1); + } finally { + debugSpy.mockRestore(); + infoSpy.mockRestore(); + warnSpy.mockRestore(); + errorSpy.mockRestore(); + } + }); + + it("createConsoleLogger at debug level logs everything", async () => { + const { createConsoleLogger } = await import("../src/utils/logger.js"); + const debugSpy = vi.spyOn(console, "debug").mockImplementation(() => {}); + const infoSpy = vi.spyOn(console, "info").mockImplementation(() => {}); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + + try { + const logger = createConsoleLogger("debug"); + logger.debug("d"); + logger.info("i"); + logger.warn("w"); + logger.error("e"); + + expect(debugSpy).toHaveBeenCalledTimes(1); + expect(infoSpy).toHaveBeenCalledTimes(1); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(errorSpy).toHaveBeenCalledTimes(1); + } finally { + debugSpy.mockRestore(); + infoSpy.mockRestore(); + warnSpy.mockRestore(); + errorSpy.mockRestore(); + } + }); + + it("createConsoleLogger includes data in log output", async () => { + const { createConsoleLogger } = await import("../src/utils/logger.js"); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + try { + const logger = createConsoleLogger("warn"); + logger.warn("test message", { key: "value" }); + + expect(warnSpy).toHaveBeenCalledWith("[DecartSDK]", "test message", { key: "value" }); + } finally { + warnSpy.mockRestore(); + } + }); + + it("createConsoleLogger defaults to warn level", async () => { + const { createConsoleLogger } = await import("../src/utils/logger.js"); + const infoSpy = vi.spyOn(console, "info").mockImplementation(() => {}); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + try { + const logger = createConsoleLogger(); + logger.info("should not appear"); + logger.warn("should appear"); + + expect(infoSpy).not.toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledTimes(1); + } finally { + infoSpy.mockRestore(); + warnSpy.mockRestore(); + } + }); +}); + +describe("WebRTC Error Classification", () => { + it("classifies websocket errors", async () => { + const { classifyWebrtcError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = classifyWebrtcError(new Error("WebSocket connection closed")); + expect(result.code).toBe(ERROR_CODES.WEBRTC_WEBSOCKET_ERROR); + }); + + it("classifies ICE errors", async () => { + const { classifyWebrtcError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = classifyWebrtcError(new Error("ICE connection failed")); + expect(result.code).toBe(ERROR_CODES.WEBRTC_ICE_ERROR); + }); + + it("classifies timeout errors", async () => { + const { classifyWebrtcError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = classifyWebrtcError(new Error("Connection timed out")); + expect(result.code).toBe(ERROR_CODES.WEBRTC_TIMEOUT_ERROR); + expect(result.message).toBe("connection timed out"); + expect(result.data).toEqual({ phase: "connection" }); + }); + + it("classifies server-originated errors", async () => { + const { classifyWebrtcError, ERROR_CODES } = await import("../src/utils/errors.js"); + const error = new Error("Insufficient credits") as Error & { source?: string }; + error.source = "server"; + const result = classifyWebrtcError(error); + expect(result.code).toBe(ERROR_CODES.WEBRTC_SERVER_ERROR); + expect(result.message).toBe("Insufficient credits"); + }); + + it("classifies unknown errors as signaling errors", async () => { + const { classifyWebrtcError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = classifyWebrtcError(new Error("SDP offer failed")); + expect(result.code).toBe(ERROR_CODES.WEBRTC_SIGNALING_ERROR); + }); + + it("createWebrtcTimeoutError includes phase and timeout data", async () => { + const { createWebrtcTimeoutError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = createWebrtcTimeoutError("webrtc-handshake", 30000); + expect(result.code).toBe(ERROR_CODES.WEBRTC_TIMEOUT_ERROR); + expect(result.message).toBe("webrtc-handshake timed out after 30000ms"); + expect(result.data).toEqual({ phase: "webrtc-handshake", timeoutMs: 30000 }); + }); + + it("createWebrtcServerError preserves the message", async () => { + const { createWebrtcServerError, ERROR_CODES } = await import("../src/utils/errors.js"); + const result = createWebrtcServerError("Server overloaded"); + expect(result.code).toBe(ERROR_CODES.WEBRTC_SERVER_ERROR); + expect(result.message).toBe("Server overloaded"); + }); + + it("factory functions preserve the cause error", async () => { + const { createWebrtcWebsocketError } = await import("../src/utils/errors.js"); + const cause = new Error("original"); + const result = createWebrtcWebsocketError(cause); + expect(result.cause).toBe(cause); + }); +}); + +describe("WebRTCStatsCollector", () => { + it("starts and stops polling", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const collector = new WebRTCStatsCollector(); + + const mockPC = { + getStats: vi.fn().mockResolvedValue(new Map()), + } as unknown as RTCPeerConnection; + + const onStats = vi.fn(); + + collector.start(mockPC, onStats); + expect(collector.isRunning()).toBe(true); + + collector.stop(); + expect(collector.isRunning()).toBe(false); + }); + + it("parses inbound video stats", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + const videoReport = { + type: "inbound-rtp", + kind: "video", + framesDecoded: 100, + framesDropped: 2, + framesPerSecond: 30, + frameWidth: 1280, + frameHeight: 720, + bytesReceived: 500000, + packetsReceived: 1000, + packetsLost: 5, + jitter: 0.01, + freezeCount: 0, + totalFreezesDuration: 0, + }; + + const statsReport = new Map([["video-1", videoReport]]); + const mockPC = { + getStats: vi.fn().mockResolvedValue(statsReport), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + + expect(receivedStats.length).toBe(1); + const stats = receivedStats[0]; + expect(stats.video).not.toBeNull(); + expect(stats.video?.framesDecoded).toBe(100); + expect(stats.video?.framesDropped).toBe(2); + expect(stats.video?.framesPerSecond).toBe(30); + expect(stats.video?.frameWidth).toBe(1280); + expect(stats.video?.frameHeight).toBe(720); + expect(stats.video?.packetsLost).toBe(5); + expect(stats.audio).toBeNull(); + expect(stats.connection.currentRoundTripTime).toBeNull(); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("parses inbound audio and candidate-pair stats", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + const audioReport = { + type: "inbound-rtp", + kind: "audio", + bytesReceived: 10000, + packetsReceived: 200, + packetsLost: 1, + jitter: 0.005, + }; + + const candidatePairReport = { + type: "candidate-pair", + state: "succeeded", + currentRoundTripTime: 0.05, + availableOutgoingBitrate: 2000000, + }; + + const statsReport = new Map([ + ["audio-1", audioReport], + ["cp-1", candidatePairReport], + ]); + const mockPC = { + getStats: vi.fn().mockResolvedValue(statsReport), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + + expect(receivedStats.length).toBe(1); + const stats = receivedStats[0]; + expect(stats.audio).not.toBeNull(); + expect(stats.audio?.packetsLost).toBe(1); + expect(stats.connection.currentRoundTripTime).toBe(0.05); + expect(stats.connection.availableOutgoingBitrate).toBe(2000000); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("computes video bitrate from bytesReceived delta", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + let bytesReceived = 0; + const mockPC = { + getStats: vi.fn().mockImplementation(async () => { + bytesReceived += 125000; // 125KB per second = ~1Mbps + return new Map([ + [ + "video-1", + { + type: "inbound-rtp", + kind: "video", + bytesReceived, + framesDecoded: 0, + framesDropped: 0, + framesPerSecond: 0, + frameWidth: 0, + frameHeight: 0, + packetsReceived: 0, + packetsLost: 0, + jitter: 0, + freezeCount: 0, + totalFreezesDuration: 0, + }, + ], + ]); + }), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + // First tick: no previous data, bitrate = 0 + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[0].video?.bitrate).toBe(0); + + // Second tick: has delta, should compute bitrate + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[1].video?.bitrate).toBeGreaterThan(0); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("enforces minimum interval of 500ms", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 100 }); // Below minimum + + const mockPC = { + getStats: vi.fn().mockResolvedValue(new Map()), + } as unknown as RTCPeerConnection; + + const onStats = vi.fn(); + collector.start(mockPC, onStats); + + // At 100ms, nothing should fire (minimum is 500ms) + await vi.advanceTimersByTimeAsync(100); + expect(onStats).not.toHaveBeenCalled(); + + // At 500ms, it should fire + await vi.advanceTimersByTimeAsync(400); + expect(onStats).toHaveBeenCalledTimes(1); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("stops silently if getStats throws", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + const mockPC = { + getStats: vi.fn().mockRejectedValue(new Error("PC closed")), + } as unknown as RTCPeerConnection; + + const onStats = vi.fn(); + collector.start(mockPC, onStats); + + await vi.advanceTimersByTimeAsync(1000); + + expect(onStats).not.toHaveBeenCalled(); + expect(collector.isRunning()).toBe(false); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); +}); + +describe("TelemetryReporter", () => { + it("buffers stats and diagnostics then flushes on interval", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + vi.useFakeTimers(); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-1", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + reportIntervalMs: 5000, + }); + + reporter.start(); + + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + reporter.addDiagnostic({ + name: "phaseTiming", + data: { phase: "total", durationMs: 500, success: true }, + timestamp: 1000, + }); + + // Before interval: no fetch + expect(fetchMock).not.toHaveBeenCalled(); + + // After interval: flush + await vi.advanceTimersByTimeAsync(5000); + + expect(fetchMock).toHaveBeenCalledTimes(1); + const [url, options] = fetchMock.mock.calls[0]; + expect(url).toBe("https://platform.decart.ai/api/v1/telemetry"); + expect(options.method).toBe("POST"); + expect(options.keepalive).toBe(false); + + const body = JSON.parse(options.body); + expect(body.sessionId).toBe("sess-1"); + expect(body.stats).toHaveLength(1); + expect(body.diagnostics).toHaveLength(1); + expect(body.diagnostics[0].name).toBe("phaseTiming"); + + reporter.stop(); + } finally { + vi.useRealTimers(); + vi.unstubAllGlobals(); + } + }); + + it("does not send empty reports", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + vi.useFakeTimers(); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-1", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + reportIntervalMs: 5000, + }); + + reporter.start(); + + // No data added — interval fires + await vi.advanceTimersByTimeAsync(5000); + + expect(fetchMock).not.toHaveBeenCalled(); + + reporter.stop(); + } finally { + vi.useRealTimers(); + vi.unstubAllGlobals(); + } + }); + + it("stop sends final report with keepalive", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-2", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + reporter.start(); + + reporter.addStats({ + timestamp: 2000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + reporter.stop(); + + expect(fetchMock).toHaveBeenCalledTimes(1); + const [, options] = fetchMock.mock.calls[0]; + expect(options.keepalive).toBe(true); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("silently handles fetch failures", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockRejectedValue(new Error("network error")); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-3", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + reporter.addStats({ + timestamp: 3000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + // Should not throw + reporter.flush(); + expect(fetchMock).toHaveBeenCalledTimes(1); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("includes auth headers and sdk version in report", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "my-api-key", + sessionId: "sess-4", + integration: "test-integration", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + reporter.addStats({ + timestamp: 4000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + reporter.flush(); + + const [, options] = fetchMock.mock.calls[0]; + expect(options.headers["X-API-KEY"]).toBe("my-api-key"); + expect(options.headers["Content-Type"]).toBe("application/json"); + + const body = JSON.parse(options.body); + expect(body.sdkVersion).toBeDefined(); + expect(typeof body.sdkVersion).toBe("string"); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("clears buffers after sending", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + vi.useFakeTimers(); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-5", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + reportIntervalMs: 5000, + }); + + reporter.start(); + + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + // First flush + await vi.advanceTimersByTimeAsync(5000); + expect(fetchMock).toHaveBeenCalledTimes(1); + + // Second interval: no new data, should not send + await vi.advanceTimersByTimeAsync(5000); + expect(fetchMock).toHaveBeenCalledTimes(1); + + reporter.stop(); + } finally { + vi.useRealTimers(); + vi.unstubAllGlobals(); + } + }); + + it("chunks reports when buffers exceed 120 items", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-chunk", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + // Add 150 stats (exceeds max of 120) + for (let i = 0; i < 150; i++) { + reporter.addStats({ + timestamp: i, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + } + + reporter.flush(); + + // Should produce 2 requests: 120 + 30 + expect(fetchMock).toHaveBeenCalledTimes(2); + + const body1 = JSON.parse(fetchMock.mock.calls[0][1].body); + expect(body1.stats).toHaveLength(120); + + const body2 = JSON.parse(fetchMock.mock.calls[1][1].body); + expect(body2.stats).toHaveLength(30); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("warns on non-2xx response status", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: false, status: 500, statusText: "Internal Server Error" }); + vi.stubGlobal("fetch", fetchMock); + + try { + const warnMock = vi.fn(); + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-warn", + logger: { debug() {}, info() {}, warn: warnMock, error() {} }, + }); + + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + reporter.flush(); + + // Wait for the .then() handler to execute + await vi.waitFor(() => { + expect(warnMock).toHaveBeenCalledTimes(1); + }); + + expect(warnMock).toHaveBeenCalledWith("Telemetry report rejected", { + status: 500, + statusText: "Internal Server Error", + }); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("includes model in report body and tags when provided", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-model", + model: "gemini-3n", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + reporter.flush(); + + const body = JSON.parse(fetchMock.mock.calls[0][1].body); + expect(body.model).toBe("gemini-3n"); + expect(body.tags.model).toBe("gemini-3n"); + } finally { + vi.unstubAllGlobals(); + } + }); + + it("omits model from report when not provided", async () => { + const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + + const fetchMock = vi.fn().mockResolvedValue({ ok: true }); + vi.stubGlobal("fetch", fetchMock); + + try { + const reporter = new TelemetryReporter({ + apiKey: "test-key", + sessionId: "sess-no-model", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + }); + + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + + reporter.flush(); + + const body = JSON.parse(fetchMock.mock.calls[0][1].body); + expect(body.model).toBeUndefined(); + expect(body.tags.model).toBeUndefined(); + } finally { + vi.unstubAllGlobals(); + } + }); +}); + describe("WebSockets Connection", () => { it("connect resolves when state becomes generating before poll observes connected", async () => { const { WebRTCConnection } = await import("../src/realtime/webrtc-connection.js"); @@ -1779,3 +2744,339 @@ describe("WebSockets Connection", () => { } }); }); + +describe("NullTelemetryReporter", () => { + it("all methods are no-ops", async () => { + const { NullTelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const reporter = new NullTelemetryReporter(); + + // None of these should throw + reporter.start(); + reporter.addStats({ + timestamp: 1000, + video: null, + audio: null, + outboundVideo: null, + connection: { currentRoundTripTime: null, availableOutgoingBitrate: null }, + }); + reporter.addDiagnostic({ name: "phaseTiming", data: {}, timestamp: 1000 }); + reporter.flush(); + reporter.stop(); + }); + + it("implements ITelemetryReporter interface", async () => { + const { NullTelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const reporter = new NullTelemetryReporter(); + + expect(typeof reporter.start).toBe("function"); + expect(typeof reporter.addStats).toBe("function"); + expect(typeof reporter.addDiagnostic).toBe("function"); + expect(typeof reporter.flush).toBe("function"); + expect(typeof reporter.stop).toBe("function"); + }); +}); + +describe("Outbound Video Stats", () => { + it("parses outbound-rtp video with quality limitation tracking", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + const outboundVideoReport = { + type: "outbound-rtp", + kind: "video", + bytesSent: 200000, + packetsSent: 500, + framesPerSecond: 30, + frameWidth: 1280, + frameHeight: 720, + qualityLimitationReason: "bandwidth", + qualityLimitationDurations: { none: 5.0, bandwidth: 2.5, cpu: 0, other: 0 }, + }; + + const statsReport = new Map([["outbound-video-1", outboundVideoReport]]); + const mockPC = { + getStats: vi.fn().mockResolvedValue(statsReport), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + + expect(receivedStats.length).toBe(1); + const stats = receivedStats[0]; + expect(stats.outboundVideo).not.toBeNull(); + expect(stats.outboundVideo?.qualityLimitationReason).toBe("bandwidth"); + expect(stats.outboundVideo?.qualityLimitationDurations).toEqual({ + none: 5.0, + bandwidth: 2.5, + cpu: 0, + other: 0, + }); + expect(stats.outboundVideo?.framesPerSecond).toBe(30); + expect(stats.outboundVideo?.frameWidth).toBe(1280); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("returns null outboundVideo when no outbound-rtp report", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + const mockPC = { + getStats: vi.fn().mockResolvedValue(new Map()), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + + expect(receivedStats[0].outboundVideo).toBeNull(); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("computes outbound video bitrate from bytesSent delta", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + let bytesSent = 0; + const mockPC = { + getStats: vi.fn().mockImplementation(async () => { + bytesSent += 62500; // 62.5KB per second = ~500kbps + return new Map([ + [ + "outbound-video-1", + { + type: "outbound-rtp", + kind: "video", + bytesSent, + packetsSent: 0, + framesPerSecond: 30, + frameWidth: 640, + frameHeight: 480, + qualityLimitationReason: "none", + qualityLimitationDurations: {}, + }, + ], + ]); + }), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + // First tick: no previous data, bitrate = 0 + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[0].outboundVideo?.bitrate).toBe(0); + + // Second tick: has delta + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[1].outboundVideo?.bitrate).toBeGreaterThan(0); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); +}); + +describe("Delta computation for cumulative counters", () => { + it("computes packetsLostDelta, framesDroppedDelta, freezeCountDelta, freezeDurationDelta", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + let packetsLost = 0; + let framesDropped = 0; + let freezeCount = 0; + let totalFreezesDuration = 0; + + const mockPC = { + getStats: vi.fn().mockImplementation(async () => { + packetsLost += 3; + framesDropped += 2; + freezeCount += 1; + totalFreezesDuration += 0.5; + return new Map([ + [ + "inbound-video-1", + { + type: "inbound-rtp", + kind: "video", + bytesReceived: 100000, + packetsReceived: 500, + packetsLost, + framesDecoded: 100, + framesDropped, + framesPerSecond: 30, + frameWidth: 1280, + frameHeight: 720, + jitter: 0.01, + freezeCount, + totalFreezesDuration, + }, + ], + ]); + }), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + // First tick: delta = cumulative (since prev was 0) + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[0].video?.packetsLostDelta).toBe(3); + expect(receivedStats[0].video?.framesDroppedDelta).toBe(2); + expect(receivedStats[0].video?.freezeCountDelta).toBe(1); + expect(receivedStats[0].video?.freezeDurationDelta).toBe(0.5); + + // Second tick: delta = increment from previous + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[1].video?.packetsLostDelta).toBe(3); + expect(receivedStats[1].video?.framesDroppedDelta).toBe(2); + expect(receivedStats[1].video?.freezeCountDelta).toBe(1); + expect(receivedStats[1].video?.freezeDurationDelta).toBe(0.5); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("computes audio packetsLostDelta", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + let audioPacketsLost = 0; + const mockPC = { + getStats: vi.fn().mockImplementation(async () => { + audioPacketsLost += 5; + return new Map([ + [ + "inbound-audio-1", + { + type: "inbound-rtp", + kind: "audio", + bytesReceived: 50000, + packetsReceived: 200, + packetsLost: audioPacketsLost, + jitter: 0.02, + }, + ], + ]); + }), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[0].audio?.packetsLostDelta).toBe(5); + + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[1].audio?.packetsLostDelta).toBe(5); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); + + it("clamps deltas to zero if cumulative counter resets", async () => { + const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + + vi.useFakeTimers(); + try { + const collector = new WebRTCStatsCollector({ intervalMs: 1000 }); + + let packetsLost = 10; + const mockPC = { + getStats: vi.fn().mockImplementation(async () => { + const current = packetsLost; + // Simulate counter reset on second call + if (mockPC.getStats.mock.calls.length === 2) { + packetsLost = 0; + } + return new Map([ + [ + "inbound-video-1", + { + type: "inbound-rtp", + kind: "video", + bytesReceived: 100000, + packetsReceived: 500, + packetsLost: current, + framesDecoded: 100, + framesDropped: 0, + framesPerSecond: 30, + frameWidth: 1280, + frameHeight: 720, + jitter: 0.01, + freezeCount: 0, + totalFreezesDuration: 0, + }, + ], + ]); + }), + } as unknown as RTCPeerConnection; + + const receivedStats: Array = []; + collector.start(mockPC, (stats) => receivedStats.push(stats)); + + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[0].video?.packetsLostDelta).toBe(10); + + // Counter reset: 0 - 10 = -10, clamped to 0 + await vi.advanceTimersByTimeAsync(1000); + expect(receivedStats[1].video?.packetsLostDelta).toBe(0); + + collector.stop(); + } finally { + vi.useRealTimers(); + } + }); +}); + +describe("VideoStall Diagnostic", () => { + it("videoStall event type exists in DiagnosticEvents", async () => { + // Type-level check: videoStall is a valid DiagnosticEventName + const event: import("../src/realtime/diagnostics.js").DiagnosticEvent = { + name: "videoStall", + data: { stalled: true, durationMs: 0 }, + }; + expect(event.name).toBe("videoStall"); + expect(event.data.stalled).toBe(true); + expect(event.data.durationMs).toBe(0); + }); + + it("videoStall recovery includes duration", () => { + const event: import("../src/realtime/diagnostics.js").DiagnosticEvent = { + name: "videoStall", + data: { stalled: false, durationMs: 1500 }, + }; + expect(event.data.stalled).toBe(false); + expect(event.data.durationMs).toBe(1500); + }); +});