-
Notifications
You must be signed in to change notification settings - Fork 2
feat(realtime): WebRTC observability - diagnostics, stats & telemetry #85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
82d4e86
88b266a
701ea82
86beeea
9a22e80
9c74bd6
52e83f9
261862b
34c262a
5fba910
0920e6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,10 @@ | ||
| import { z } from "zod"; | ||
| import { modelDefinitionSchema, type RealTimeModels } from "../shared/model"; | ||
| import { modelStateSchema } from "../shared/types"; | ||
| import { createWebrtcError, type DecartSDKError } from "../utils/errors"; | ||
| import { classifyWebrtcError, type DecartSDKError } from "../utils/errors"; | ||
| import type { Logger } from "../utils/logger"; | ||
| import { AudioStreamManager } from "./audio-stream-manager"; | ||
| import type { DiagnosticEvent } from "./diagnostics"; | ||
| import { createEventBuffer } from "./event-buffer"; | ||
| import { realtimeMethods, type SetInput } from "./methods"; | ||
| import { | ||
|
|
@@ -12,8 +14,10 @@ import { | |
| type SubscribeEvents, | ||
| type SubscribeOptions, | ||
| } from "./subscribe-client"; | ||
| import { type ITelemetryReporter, NullTelemetryReporter, TelemetryReporter } from "./telemetry-reporter"; | ||
| import type { ConnectionState, GenerationTickMessage, SessionIdMessage } from "./types"; | ||
| import { WebRTCManager } from "./webrtc-manager"; | ||
| import { type WebRTCStats, WebRTCStatsCollector } from "./webrtc-stats"; | ||
|
|
||
| async function blobToBase64(blob: Blob): Promise<string> { | ||
| return new Promise((resolve, reject) => { | ||
|
|
@@ -69,6 +73,8 @@ export type RealTimeClientOptions = { | |
| baseUrl: string; | ||
| apiKey: string; | ||
| integration?: string; | ||
| logger: Logger; | ||
| telemetryEnabled: boolean; | ||
| }; | ||
|
|
||
| const realTimeClientInitialStateSchema = modelStateSchema; | ||
|
|
@@ -94,6 +100,8 @@ export type Events = { | |
| connectionChange: ConnectionState; | ||
| error: DecartSDKError; | ||
| generationTick: { seconds: number }; | ||
| diagnostic: DiagnosticEvent; | ||
| stats: WebRTCStats; | ||
| }; | ||
|
|
||
| export type RealTimeClient = { | ||
|
|
@@ -114,7 +122,7 @@ export type RealTimeClient = { | |
| }; | ||
|
|
||
| export const createRealTimeClient = (opts: RealTimeClientOptions) => { | ||
| const { baseUrl, apiKey, integration } = opts; | ||
| const { baseUrl, apiKey, integration, logger } = opts; | ||
|
|
||
| const connect = async ( | ||
| stream: MediaStream | null, | ||
|
|
@@ -142,6 +150,8 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { | |
| } | ||
|
|
||
| let webrtcManager: WebRTCManager | undefined; | ||
| let telemetryReporter: ITelemetryReporter = new NullTelemetryReporter(); | ||
| let handleConnectionStateChange: ((state: ConnectionState) => void) | null = null; | ||
|
|
||
| try { | ||
| // Prepare initial image base64 before connection | ||
|
|
@@ -162,13 +172,19 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { | |
| webrtcManager = new WebRTCManager({ | ||
| webrtcUrl: `${url}?api_key=${encodeURIComponent(apiKey)}&model=${encodeURIComponent(options.model.name)}`, | ||
| integration, | ||
| logger, | ||
| onDiagnostic: (name, data) => { | ||
| emitOrBuffer("diagnostic", { name, data } as Events["diagnostic"]); | ||
| addTelemetryDiagnostic(name, data); | ||
| }, | ||
| onRemoteStream, | ||
| onConnectionStateChange: (state) => { | ||
| emitOrBuffer("connectionChange", state); | ||
| handleConnectionStateChange?.(state); | ||
| }, | ||
| onError: (error) => { | ||
| console.error("WebRTC error:", error); | ||
| emitOrBuffer("error", createWebrtcError(error)); | ||
| logger.error("WebRTC error", { error: error.message }); | ||
| emitOrBuffer("error", classifyWebrtcError(error)); | ||
| }, | ||
| customizeOffer: options.customizeOffer as ((offer: RTCSessionDescriptionInit) => Promise<void>) | undefined, | ||
| vp8MinBitrate: 300, | ||
|
|
@@ -182,9 +198,56 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { | |
|
|
||
| let sessionId: string | null = null; | ||
| let subscribeToken: string | null = null; | ||
| const pendingTelemetryDiagnostics: Array<{ | ||
| name: DiagnosticEvent["name"]; | ||
| data: DiagnosticEvent["data"]; | ||
| timestamp: number; | ||
| }> = []; | ||
| let telemetryReporterReady = false; | ||
|
|
||
| const addTelemetryDiagnostic = ( | ||
| name: DiagnosticEvent["name"], | ||
| data: DiagnosticEvent["data"], | ||
| timestamp: number = Date.now(), | ||
| ): void => { | ||
| if (!opts.telemetryEnabled) { | ||
| return; | ||
| } | ||
|
|
||
| if (!telemetryReporterReady) { | ||
| pendingTelemetryDiagnostics.push({ name, data, timestamp }); | ||
| return; | ||
| } | ||
|
|
||
| telemetryReporter.addDiagnostic({ name, data, timestamp }); | ||
| }; | ||
|
|
||
| const sessionIdListener = (msg: SessionIdMessage) => { | ||
| subscribeToken = encodeSubscribeToken(msg.session_id, msg.server_ip, msg.server_port); | ||
| sessionId = msg.session_id; | ||
|
|
||
| // Start telemetry reporter now that we have a session ID | ||
| if (opts.telemetryEnabled) { | ||
| if (telemetryReporterReady) { | ||
| telemetryReporter.stop(); | ||
| } | ||
|
|
||
| const reporter = new TelemetryReporter({ | ||
| apiKey, | ||
| sessionId: msg.session_id, | ||
| model: options.model.name, | ||
| integration, | ||
| logger, | ||
| }); | ||
| reporter.start(); | ||
| telemetryReporter = reporter; | ||
| telemetryReporterReady = true; | ||
|
|
||
| for (const diagnostic of pendingTelemetryDiagnostics) { | ||
| telemetryReporter.addDiagnostic(diagnostic); | ||
| } | ||
| pendingTelemetryDiagnostics.length = 0; | ||
| } | ||
| }; | ||
| manager.getWebsocketMessageEmitter().on("sessionId", sessionIdListener); | ||
|
|
||
|
|
@@ -197,12 +260,78 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { | |
|
|
||
| const methods = realtimeMethods(manager, imageToBase64); | ||
|
|
||
| let statsCollector: WebRTCStatsCollector | null = null; | ||
| let statsCollectorPeerConnection: RTCPeerConnection | null = null; | ||
|
|
||
| // Video stall detection state (Twilio pattern: fps < 0.5 = stalled) | ||
| const STALL_FPS_THRESHOLD = 0.5; | ||
| let videoStalled = false; | ||
| let stallStartMs = 0; | ||
|
|
||
| const startStatsCollection = (): (() => void) => { | ||
| statsCollector?.stop(); | ||
| videoStalled = false; | ||
| stallStartMs = 0; | ||
| statsCollector = new WebRTCStatsCollector(); | ||
| const pc = manager.getPeerConnection(); | ||
| statsCollectorPeerConnection = pc; | ||
| if (pc) { | ||
| statsCollector.start(pc, (stats) => { | ||
| emitOrBuffer("stats", stats); | ||
| telemetryReporter.addStats(stats); | ||
|
|
||
| // Stall detection: check if video fps dropped below threshold | ||
| const fps = stats.video?.framesPerSecond ?? 0; | ||
| if (!videoStalled && stats.video && fps < STALL_FPS_THRESHOLD) { | ||
| videoStalled = true; | ||
| stallStartMs = Date.now(); | ||
| emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: true, durationMs: 0 } }); | ||
| addTelemetryDiagnostic("videoStall", { stalled: true, durationMs: 0 }, stallStartMs); | ||
| } else if (videoStalled && fps >= STALL_FPS_THRESHOLD) { | ||
| const durationMs = Date.now() - stallStartMs; | ||
| videoStalled = false; | ||
| emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: false, durationMs } }); | ||
| addTelemetryDiagnostic("videoStall", { stalled: false, durationMs }); | ||
| } | ||
| }); | ||
| } | ||
| return () => { | ||
| statsCollector?.stop(); | ||
| statsCollector = null; | ||
| statsCollectorPeerConnection = null; | ||
| }; | ||
| }; | ||
|
|
||
| handleConnectionStateChange = (state) => { | ||
| if (!opts.telemetryEnabled) { | ||
| return; | ||
| } | ||
|
|
||
| if (state !== "connected" && state !== "generating") { | ||
| return; | ||
| } | ||
|
|
||
| const peerConnection = manager.getPeerConnection(); | ||
| if (!peerConnection || peerConnection === statsCollectorPeerConnection) { | ||
| return; | ||
| } | ||
|
|
||
| startStatsCollection(); | ||
| }; | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Auto-start stats when telemetry is enabled | ||
| if (opts.telemetryEnabled) { | ||
| startStatsCollection(); | ||
| } | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stats events never fire when telemetry is disabledMedium Severity The Additional Locations (1) |
||
|
|
||
| const client: RealTimeClient = { | ||
| set: methods.set, | ||
| setPrompt: methods.setPrompt, | ||
| isConnected: () => manager.isConnected(), | ||
| getConnectionState: () => manager.getConnectionState(), | ||
| disconnect: () => { | ||
| statsCollector?.stop(); | ||
| telemetryReporter.stop(); | ||
| stop(); | ||
| manager.cleanup(); | ||
| audioStreamManager?.cleanup(); | ||
|
|
@@ -236,6 +365,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { | |
| flush(); | ||
| return client; | ||
| } catch (error) { | ||
| telemetryReporter.stop(); | ||
| webrtcManager?.cleanup(); | ||
| audioStreamManager?.cleanup(); | ||
| throw error; | ||
|
|
@@ -254,13 +384,17 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { | |
| webrtcManager = new WebRTCManager({ | ||
| webrtcUrl: subscribeUrl, | ||
| integration, | ||
| logger, | ||
| onDiagnostic: (name, data) => { | ||
| emitOrBuffer("diagnostic", { name, data } as SubscribeEvents["diagnostic"]); | ||
| }, | ||
| onRemoteStream: options.onRemoteStream, | ||
| onConnectionStateChange: (state) => { | ||
| emitOrBuffer("connectionChange", state); | ||
| }, | ||
| onError: (error) => { | ||
| console.error("WebRTC subscribe error:", error); | ||
| emitOrBuffer("error", createWebrtcError(error)); | ||
| logger.error("WebRTC subscribe error", { error: error.message }); | ||
| emitOrBuffer("error", classifyWebrtcError(error)); | ||
| }, | ||
| }); | ||
|
|
||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.