Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@observertc/observer-js",
"version": "1.0.0-beta.1",
"version": "1.0.0",
"description": "Server Side NodeJS Library for processing ObserveRTC Samples",
"main": "lib/index.js",
"types": "lib/index.d.ts",
Expand Down
70 changes: 56 additions & 14 deletions src/ObservedCall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ import { Updater } from './updaters/Updater';
import { OnIntervalUpdater } from './updaters/OnIntervalUpdater';
import { OnAnyClientCallUpdater } from './updaters/OnAnyClientCallUpdater';
import { ObservedCallEventMonitor } from './ObservedCallEventMonitor';
import { createLogger } from './common/logger';

const logger = createLogger('ObservedCall');

export type ObservedCallSettings<AppData extends Record<string, unknown> = Record<string, unknown>> = {
callId: string;
appData?: AppData;
remoteTrackResolvePolicy?: 'p2p' | 'mediasoup-sfu',
updatePolicy?: 'update-on-any-client-updated' | 'update-when-all-client-updated' | 'update-on-interval',
updateIntervalInMs?: number,
maxIdleIfEmptyMs?: number,
};

export type ObservedCallEvents = {
Expand Down Expand Up @@ -75,6 +79,20 @@ export class ObservedCall<AppData extends Record<string, unknown> = Record<strin
public maxNumberOfClients = 0;
public deltaNumberOfIssues = 0;

public deltaDataChannelBytesReceived = 0;
public deltaDataChannelBytesSent = 0;
public deltaDataChannelMessagesReceived = 0;
public deltaDataChannelMessagesSent = 0;
public deltaInboundPacketsLost = 0;
public deltaInboundPacketsReceived = 0;
public deltaOutboundPacketsSent = 0;
public deltaReceivedAudioBytes = 0;
public deltaReceivedVideoBytes = 0;
public deltaSentAudioBytes = 0;
public deltaSentVideoBytes = 0;
public deltaTransportReceivedBytes = 0;
public deltaTransportSentBytes = 0;

public deltaRttLt50Measurements = 0;
public deltaRttLt150Measurements = 0;
public deltaRttLt300Measurements = 0;
Expand All @@ -85,14 +103,10 @@ export class ObservedCall<AppData extends Record<string, unknown> = Record<strin
public startedAt?: number;
public endedAt?: number;
public closedAt?: number;
public lastClientTimestamp = 0;

private _callStartedEvent: {
emitted: boolean,
timer?: ReturnType<typeof setTimeout>,
};
private _callEndedEvent: {
emitted: boolean
};
public maxIdleIfEmptyMs?: number;
private _autoCloseTimer?: ReturnType<typeof setTimeout>;

public constructor(
settings: ObservedCallSettings<AppData>,
Expand All @@ -104,6 +118,7 @@ export class ObservedCall<AppData extends Record<string, unknown> = Record<strin
this.callId = settings.callId;
this.appData = settings.appData ?? {} as AppData;
this.scoreCalculator = new DefaultCallScoreCalculator(this);
this.maxIdleIfEmptyMs = settings.maxIdleIfEmptyMs;
this.detectors = new Detectors();

if (settings.updateIntervalInMs) {
Expand Down Expand Up @@ -133,13 +148,6 @@ export class ObservedCall<AppData extends Record<string, unknown> = Record<strin
case 'mediasoup-sfu':
break;
}

this._callStartedEvent = {
emitted: false,
};
this._callEndedEvent = {
emitted: false,
};
}

public get numberOfClients() {
Expand Down Expand Up @@ -198,6 +206,8 @@ export class ObservedCall<AppData extends Record<string, unknown> = Record<strin

if (this.observedClients.size === 0) {
this.emit('empty');

this._createAutoClose();
}
++this.totalRemovedClients;
});
Expand All @@ -213,6 +223,9 @@ export class ObservedCall<AppData extends Record<string, unknown> = Record<strin

if (wasEmpty) {
this.emit('not-empty');

clearTimeout(this._autoCloseTimer);
this._autoCloseTimer = undefined;
}

return result;
Expand Down Expand Up @@ -240,13 +253,29 @@ export class ObservedCall<AppData extends Record<string, unknown> = Record<strin
this.detectors.update();
this.scoreCalculator.update();

this.lastClientTimestamp = Math.max(...Array.from(this.observedClients.values()).map((c) => c.lastSampleTimestamp ?? 0));

this.emit('update');

this.deltaNumberOfIssues = 0;
this.deltaRttLt50Measurements = 0;
this.deltaRttLt150Measurements = 0;
this.deltaRttLt300Measurements = 0;
this.deltaRttGtOrEq300Measurements = 0;

this.deltaDataChannelBytesReceived = 0;
this.deltaDataChannelBytesSent = 0;
this.deltaDataChannelMessagesReceived = 0;
this.deltaDataChannelMessagesSent = 0;
this.deltaInboundPacketsLost = 0;
this.deltaInboundPacketsReceived = 0;
this.deltaOutboundPacketsSent = 0;
this.deltaReceivedAudioBytes = 0;
this.deltaReceivedVideoBytes = 0;
this.deltaSentAudioBytes = 0;
this.deltaSentVideoBytes = 0;
this.deltaTransportReceivedBytes = 0;
this.deltaTransportSentBytes = 0;
}

private _onClientUpdate(client: ObservedClient) {
Expand Down Expand Up @@ -289,6 +318,19 @@ export class ObservedCall<AppData extends Record<string, unknown> = Record<strin
this.endedAt = Math.max(this.endedAt ?? client.leftAt, client.leftAt);
}

private _createAutoClose() {
if (this.closed) return;
if (this.maxIdleIfEmptyMs === undefined || this.maxIdleIfEmptyMs < 1) return;

this._autoCloseTimer = setTimeout(() => {
if (this.closed) return;
if (this.observedClients.size > 0) return;

logger.debug(`Call ${this.callId} is empty for ${this.maxIdleIfEmptyMs}ms, closing...`);
this.close();
}, this.maxIdleIfEmptyMs);
}

// public resetSummaryMetrics() {
// this.totalAddedClients = 0;
// this.totalRemovedClients = 0;
Expand Down
28 changes: 14 additions & 14 deletions src/ObservedCallEventMonitor.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import { ObservedCall } from './ObservedCall';
import { ObservedCertificate } from './ObservedCertificate';
import { ObservedCertificate } from './webrtc/ObservedCertificate';
import { ObservedClient, ObservedClientEvents } from './ObservedClient';
import { ObservedCodec } from './ObservedCodec';
import { ObservedDataChannel } from './ObservedDataChannel';
import { ObservedIceCandidate } from './ObservedIceCandidate';
import { ObservedIceCandidatePair } from './ObservedIceCandidatePair';
import { ObservedIceTransport } from './ObservedIceTransport';
import { ObservedInboundRtp } from './ObservedInboundRtp';
import { ObservedInboundTrack } from './ObservedInboundTrack';
import { ObservedMediaPlayout } from './ObservedMediaPlayout';
import { ObservedMediaSource } from './ObservedMediaSource';
import { ObservedOutboundRtp } from './ObservedOutboundRtp';
import { ObservedOutboundTrack } from './ObservedOutboundTrack';
import { ObservedPeerConnection } from './ObservedPeerConnection';
import { ObservedCodec } from './webrtc/ObservedCodec';
import { ObservedDataChannel } from './webrtc/ObservedDataChannel';
import { ObservedIceCandidate } from './webrtc/ObservedIceCandidate';
import { ObservedIceCandidatePair } from './webrtc/ObservedIceCandidatePair';
import { ObservedIceTransport } from './webrtc/ObservedIceTransport';
import { ObservedInboundRtp } from './webrtc/ObservedInboundRtp';
import { ObservedInboundTrack } from './webrtc/ObservedInboundTrack';
import { ObservedMediaPlayout } from './webrtc/ObservedMediaPlayout';
import { ObservedMediaSource } from './webrtc/ObservedMediaSource';
import { ObservedOutboundRtp } from './webrtc/ObservedOutboundRtp';
import { ObservedOutboundTrack } from './webrtc/ObservedOutboundTrack';
import { ObservedPeerConnection } from './webrtc/ObservedPeerConnection';
import { ClientEvent, ClientIssue, ClientMetaData, ClientSample, ExtensionStat } from './schema/ClientSample';

export class ObservedCallEventMonitor<Context> {
Expand Down Expand Up @@ -152,7 +152,7 @@ export class ObservedCallEventMonitor<Context> {
const onClientExtensionStats = (extensionStats: ExtensionStat) => this._onClientExtensionStats(observedClient, extensionStats);
const onUsingTurn = (usingTurn: boolean) => this._onUsingTurn(observedClient, usingTurn);
const onUserMediaError = (error: string) => this._onUserMediaError(observedClient, error);
const onClientUpdated = (...args: ObservedClientEvents['update']) => this.onClientUpdated?.(observedClient, args[0].sample, this.context);
const onClientUpdated = (...args: ObservedClientEvents['update']) => this.onClientUpdated?.(observedClient, args[0], this.context);
const onClientEvent = (event: ClientEvent) => this.onClientEvent?.(observedClient, event, this.context);

observedClient.once('close', () => {
Expand Down
69 changes: 62 additions & 7 deletions src/ObservedClient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EventEmitter } from 'events';
import { ObservedPeerConnection } from './ObservedPeerConnection';
import { ObservedPeerConnection } from './webrtc/ObservedPeerConnection';
import { createLogger } from './common/logger';
// eslint-disable-next-line camelcase
import { ClientEvent, ClientMetaData, ClientSample, PeerConnectionSample, ClientIssue, ExtensionStat } from './schema/ClientSample';
Expand All @@ -10,11 +10,14 @@ import { ClientMetaTypes } from './schema/ClientMetaTypes';
import { parseJsonAs } from './common/utils';
import { CalculatedScore } from './scores/CalculatedScore';
import { Detectors } from './detectors/Detectors';
import { clear } from 'console';
import { ParsedUserAgent } from './common/types';

const logger = createLogger('ObservedClient');

export type ObservedClientSettings<AppData extends Record<string, unknown> = Record<string, unknown>> = {
clientId: string;
maxIdleTimeMs?: number;
appData?: AppData;
};

Expand Down Expand Up @@ -44,14 +47,12 @@ export declare interface ObservedClient {
export class ObservedClient<AppData extends Record<string, unknown> = Record<string, unknown>> extends EventEmitter {
public readonly detectors: Detectors;

public readonly clientId: string;
public readonly observedPeerConnections = new Map<string, ObservedPeerConnection>();
public readonly calculatedScore: CalculatedScore = {
weight: 1,
value: undefined,
};

public appData: AppData;
public attachments?: Record<string, unknown>;

public updated = Date.now();
Expand Down Expand Up @@ -135,16 +136,26 @@ export class ObservedClient<AppData extends Record<string, unknown> = Record<str
public issues: ClientIssue[] = [];

private _injections: Pick<ClientSample, 'clientEvents' | 'clientIssues' | 'extensionStats' | 'attachments' | 'clientMetaItems'> = {};
private _autoCloseTimer?: ReturnType<typeof setTimeout>;

public constructor(settings: ObservedClientSettings<AppData>, public readonly call: ObservedCall) {
public constructor(
public readonly settings: ObservedClientSettings<AppData>,
public readonly call: ObservedCall
) {
super();
this.setMaxListeners(Infinity);
this._createAutoClose();

this.clientId = settings.clientId;
this.appData = settings.appData ?? {} as AppData;

this.detectors = new Detectors();
}

public get clientId() {
return this.settings.clientId;
}

public get appData(): AppData {
return this.settings.appData ?? {} as AppData;
}

public get numberOfPeerConnections() {
return this.observedPeerConnections.size;
Expand Down Expand Up @@ -176,8 +187,24 @@ export class ObservedClient<AppData extends Record<string, unknown> = Record<str
this.emit('close');
}

private _createAutoClose() {
if (this.closed) return;
if (this.settings.maxIdleTimeMs === undefined || this.settings.maxIdleTimeMs < 1) return;

clearTimeout(this._autoCloseTimer);

this._autoCloseTimer = setTimeout(() => {
if (this.closed) return;

logger.debug(`Client ${this.clientId} is idle for ${this.settings.maxIdleTimeMs}ms, closing...`);
this.close();
}, this.settings.maxIdleTimeMs);
}

public accept(sample: ClientSample): void {
if (this.closed) throw new Error(`Client ${this.clientId} is closed`);

this._createAutoClose();

const now = Date.now();
const elapsedInMs = now - this.updated;
Expand Down Expand Up @@ -654,6 +681,34 @@ export class ObservedClient<AppData extends Record<string, unknown> = Record<str
this.operationSystem = parseJsonAs(metadata.payload);
break;
}
case 'USER_AGENT_DATA': {
try {
const userAgentData = parseJsonAs<ParsedUserAgent>(metadata.payload);
if (userAgentData) {
if (this.operationSystem === undefined) {
this.operationSystem = {
name: userAgentData.os.name.toLowerCase(),
version: userAgentData.os.version,
};
}
if (this.engine === undefined) {
this.engine = {
name: userAgentData.engine.name.toLowerCase(),
version: userAgentData.engine.version,
};
}
if (this.browser === undefined) {
this.browser = {
name: userAgentData.browser.name.toLowerCase(),
version: userAgentData.browser.version,
};
}
}

} catch (error) {
logger.warn('Failed to parse USER_AGENT_DATA metadata: %o', error);
}
}
}

this.call.observer.emit('client-metadata', this, metadata);
Expand Down
26 changes: 13 additions & 13 deletions src/ObservedClientEventMonitor.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { ObservedCertificate } from './ObservedCertificate';
import { ObservedCertificate } from './webrtc/ObservedCertificate';
import { ObservedClient, ObservedClientEvents } from './ObservedClient';
import { ObservedCodec } from './ObservedCodec';
import { ObservedDataChannel } from './ObservedDataChannel';
import { ObservedIceCandidate } from './ObservedIceCandidate';
import { ObservedIceCandidatePair } from './ObservedIceCandidatePair';
import { ObservedIceTransport } from './ObservedIceTransport';
import { ObservedInboundRtp } from './ObservedInboundRtp';
import { ObservedInboundTrack } from './ObservedInboundTrack';
import { ObservedMediaPlayout } from './ObservedMediaPlayout';
import { ObservedMediaSource } from './ObservedMediaSource';
import { ObservedOutboundRtp } from './ObservedOutboundRtp';
import { ObservedOutboundTrack } from './ObservedOutboundTrack';
import { ObservedPeerConnection } from './ObservedPeerConnection';
import { ObservedCodec } from './webrtc/ObservedCodec';
import { ObservedDataChannel } from './webrtc/ObservedDataChannel';
import { ObservedIceCandidate } from './webrtc/ObservedIceCandidate';
import { ObservedIceCandidatePair } from './webrtc/ObservedIceCandidatePair';
import { ObservedIceTransport } from './webrtc/ObservedIceTransport';
import { ObservedInboundRtp } from './webrtc/ObservedInboundRtp';
import { ObservedInboundTrack } from './webrtc/ObservedInboundTrack';
import { ObservedMediaPlayout } from './webrtc/ObservedMediaPlayout';
import { ObservedMediaSource } from './webrtc/ObservedMediaSource';
import { ObservedOutboundRtp } from './webrtc/ObservedOutboundRtp';
import { ObservedOutboundTrack } from './webrtc/ObservedOutboundTrack';
import { ObservedPeerConnection } from './webrtc/ObservedPeerConnection';
import { ClientIssue, ClientMetaData, ExtensionStat } from './schema/ClientSample';

export class ObservedClientEventMonitor<AppContext, ClientAppData extends Record<string, unknown> = Record<string, unknown>> {
Expand Down
3 changes: 2 additions & 1 deletion src/ObservedTURN.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EventEmitter } from 'stream';
import { ObservedPeerConnection } from './ObservedPeerConnection';
import { ObservedPeerConnection } from './webrtc/ObservedPeerConnection';
import { ObservedTurnServer } from './ObservedTurnServer';
import { createLogger } from './common/logger';

Expand Down Expand Up @@ -34,6 +34,7 @@ export class ObservedTURN extends EventEmitter {
public constructor(
) {
super();
this.setMaxListeners(Infinity);
}

public update() {
Expand Down
4 changes: 2 additions & 2 deletions src/ObservedTurnServer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ObservedIceCandidatePair } from './ObservedIceCandidatePair';
import { ObservedPeerConnection } from './ObservedPeerConnection';
import { ObservedIceCandidatePair } from './webrtc/ObservedIceCandidatePair';
import { ObservedPeerConnection } from './webrtc/ObservedPeerConnection';
import { ObservedTURN } from './ObservedTURN';

export class ObservedTurnServer {
Expand Down
4 changes: 0 additions & 4 deletions src/Observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ export class Observer<AppData extends Record<string, unknown> = Record<string, u
return this.observedCalls.size;
}

private _timer?: ReturnType<typeof setInterval>;

public constructor(public readonly config: ObserverConfig<AppData> = {
updatePolicy: 'update-when-all-call-updated',
updateIntervalInMs: undefined,
Expand Down Expand Up @@ -154,8 +152,6 @@ export class Observer<AppData extends Record<string, unknown> = Record<string, u
return logger.debug('Attempted to close twice');
}
this.closed = true;
clearInterval(this._timer);
this._timer = undefined;

this.observedCalls.forEach((call) => call.close());

Expand Down
Loading
Loading