From 3718449851b0862d19fbf10f5752b6996574e7d2 Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Wed, 28 Feb 2024 21:11:13 -0800 Subject: [PATCH 01/13] Introduce configurable congestion control. The primary motivation for this (as of 2024/02/28) is to protect the PlayHT On-Prem appliance from being inundated with a burst of text-to-speech requests that it can't satisfy. Prior to this change, the client would split a text stream into two text chunks (referred to as "sentences") and send them to the API client (i.e. gRPC client) simultaneously. This would routinely overload on-prem appliances that operate without a lot of GPU capacity headroom[1]. The result would be that most requests that clients sent would immediately result in a gRPC error 8: RESOURCE_EXHAUSTED; and therefore, a bad customer experience. This change allows customers to turn on one of an enumerated set of congestion control algorithms. We've implemented just one for now, StaticMar2024, which delays sending subsequent text chunks (i.e. sentences) to the gRPC client until audio for the preceding text chunk has started streaming. This is a very simple congestion control algorithm with static constants; it leaves a lot to be desired. We should iterate on these algorithms in the future. The CongestionCtrl enum was added so that algorithms can be added without requiring customers to change their code much. [1] Customers tend to be very cost sensitive regarding expensive GPU capacity, and therefore want to keep their appliances running near 100% utilization. 
--- packages/playht/src/api/APISettingsStore.ts | 1 + packages/playht/src/api/apiCommon.ts | 167 ++++++++++++++++-- packages/playht/src/grpc-client/client.ts | 9 +- .../src/grpc-client/tts-stream-source.ts | 63 ++++++- packages/playht/src/index.ts | 28 +++ 5 files changed, 247 insertions(+), 21 deletions(-) diff --git a/packages/playht/src/api/APISettingsStore.ts b/packages/playht/src/api/APISettingsStore.ts index 66d5fb1..a66a915 100644 --- a/packages/playht/src/api/APISettingsStore.ts +++ b/packages/playht/src/api/APISettingsStore.ts @@ -20,6 +20,7 @@ export class APISettingsStore { apiKey: settings.apiKey, customAddr: settings.customAddr, fallbackEnabled: settings.fallbackEnabled, + congestionCtrl: settings.congestionCtrl, }); APISettingsStore._instance = this; diff --git a/packages/playht/src/api/apiCommon.ts b/packages/playht/src/api/apiCommon.ts index 5c88966..a25a801 100644 --- a/packages/playht/src/api/apiCommon.ts +++ b/packages/playht/src/api/apiCommon.ts @@ -17,6 +17,7 @@ import { generateV2Speech } from './generateV2Speech'; import { generateV2Stream } from './generateV2Stream'; import { textStreamToSentences } from './textStreamToSentences'; import { generateGRpcStream } from './generateGRpcStream'; +import {CongestionCtrl} from ".."; export type V1ApiOptions = { narrationStyle?: string; @@ -198,8 +199,8 @@ async function audioStreamFromSentences( writableStream: NodeJS.WritableStream, options?: SpeechStreamOptions, ) { - // Create a stream for promises - const promiseStream = new Readable({ + // Create a stream of audio chunk promises -- each corresponding to a sentence + const audioChunkStream = new Readable({ objectMode: true, read() {}, }); @@ -226,23 +227,43 @@ async function audioStreamFromSentences( writableStream.end(); } + let congestionController = new CongestionController(APISettingsStore.getSettings().congestionCtrl ?? 
CongestionCtrl.Off); + // For each sentence in the stream, add a task to the queue + let sentenceIdx = 0 sentencesStream.on('data', async (data) => { const sentence = data.toString(); - const generatePromise = (async () => { - return await internalGenerateStreamFromString(sentence, options); - })(); - promiseStream.push(generatePromise); + /** + * NOTE: + * + * If the congestion control algorithm is set to {@link CongestionCtrl.Off}, + * then this {@link CongestionController#enqueue} method will invoke the task immediately; + * thereby generating the audio chunk for this sentence immediately. + */ + congestionController.enqueue(() => { + const nextAudioChunk = (async () => { + return await internalGenerateStreamFromString(sentence, options); + })(); + audioChunkStream.push(nextAudioChunk); + }, `createAudioChunk#${sentenceIdx}`) + + sentenceIdx++ }); sentencesStream.on('end', async () => { - promiseStream.push(null); + + /** + * NOTE: if the congestion control algorithm is set to {@link CongestionCtrl.Off}, then this enqueue method will simply invoke the task immediately. 
+ */ + congestionController.enqueue(() => { + audioChunkStream.push(null); + }, "endAudioChunks") }); sentencesStream.on('error', onError); - // Read from the promiseStream and await for each promise in order + // Await each audio chunk in order, and write the raw audio to the output audio stream const writeAudio = new Writable({ objectMode: true, write: async (generatePromise, _, callback) => { @@ -252,12 +273,28 @@ async function audioStreamFromSentences( onError(); return; } + let completion = { + gotHeaders: false, + gotAudio: false, + gotEnd: false + } await new Promise((resolve) => { + resultStream.on('data', (chunk: Buffer) => { + if (completion.gotHeaders && !completion.gotAudio) { + completion.gotAudio = true + congestionController.audioRecvd(); + } else if (!completion.gotHeaders) { + completion.gotHeaders = true + } writableStream.write(chunk); }); resultStream.on('end', () => { + if(!completion.gotEnd) { + completion.gotEnd = true + congestionController.audioDone() + } resolve(); }); @@ -272,9 +309,9 @@ async function audioStreamFromSentences( writeAudio.on('error', onError); - promiseStream.on('error', onError); + audioChunkStream.on('error', onError); - promiseStream.on('end', () => { + audioChunkStream.on('end', () => { setTimeout( () => writeAudio.on('finish', () => { @@ -284,5 +321,113 @@ async function audioStreamFromSentences( ); }); - promiseStream.pipe(writeAudio); + audioChunkStream.pipe(writeAudio); +} + +class Task { + fn: Function + name: string + + constructor(fn: Function, name: string) { + this.fn = fn; + this.name = name + } } + +/** + * Responsible for optimizing the rate at which text is sent to the underlying API endpoint, according to the + * specified {@link CongestionCtrl} algorithm. {@link CongestionController} is essentially a task queue + * that throttles the parallelism of, and delay between, task execution. 
+ * + * The primary motivation for this (as of 2024/02/28) is to protect PlayHT On-Prem appliances + * from being inundated with a burst text-to-speech requests that it can't satisfy. Prior to the introduction + * of {@link CongestionController} (and more generally {@link CongestionCtrl}), the client would split + * a text stream into two text chunks (referred to as "sentences") and send them to the API client (i.e. gRPC client) + * simultaneously. This would routinely overload on-prem appliances that operate without a lot of GPU capacity headroom[1]. + * + * The result would be that most requests that clients sent would immediately result in a gRPC error 8: RESOURCE_EXHAUSTED; + * and therefore, a bad customer experience. {@link CongestionController}, if configured with {@link CongestionCtrl#StaticMar2024}, + * will now delay sending subsequent text chunks (i.e. sentences) to the gRPC client until audio for the preceding text + * chunk has started streaming. + * + * The current {@link CongestionCtrl} algorithm ({@link CongestionCtrl#StaticMar2024}) is very simple and leaves a lot to + * be desired. We should iterate on these algorithms. The {@link CongestionCtrl} enum was added so that algorithms + * can be added without requiring customers to change their code much. + * + * [1] Customers tend to be very cost sensitive regarding expensive GPU capacity, and therefore want to keep their appliances + * running near 100% utilization. + * + * --mtp@2024/02/28 + * + * This class is largely inert if the specified {@link CongestionCtrl} is {@link CongestionCtrl#Off}. 
+ */ +class CongestionController { + + algo: CongestionCtrl; + taskQ: Task[] = []; + inflight: number = 0; + parallelism: number; + postChunkBackoff: number; + + constructor(algo: CongestionCtrl) { + this.algo = algo; + switch (algo) { + case CongestionCtrl.Off: + this.parallelism = Infinity; + this.postChunkBackoff = 0; + break; + case CongestionCtrl.StaticMar2024: + this.parallelism = 1; + this.postChunkBackoff = 50; + break; + default: + throw new Error(`Unrecognized congestion control algorithm: ${algo}`) + } + } + + enqueue(task: Function, name: string) { + + // if congestion control is turned off - just execute the task immediately + if (this.algo == CongestionCtrl.Off) { + task(); + return; + } + + this.taskQ.push(new Task(task, name)); + this.maybeDoMore(); + } + + maybeDoMore() { + + // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately + if (this.algo == CongestionCtrl.Off) return + + for (; ;) { + if (this.inflight >= this.parallelism) return + if (this.taskQ.length == 0) return + let task = this.taskQ.shift()! 
+ this.inflight++; + task.fn(); + } + } + + audioRecvd() { + + // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately + if (this.algo == CongestionCtrl.Off) return + + this.inflight = Math.max(this.inflight - 1, 0); + setTimeout(() => { + this.maybeDoMore(); + }, this.postChunkBackoff); + } + + audioDone() { + + if (this.algo == CongestionCtrl.Off) return + + this.inflight = Math.max(this.inflight - 1, 0); + this.maybeDoMore(); + } + +} \ No newline at end of file diff --git a/packages/playht/src/grpc-client/client.ts b/packages/playht/src/grpc-client/client.ts index afb89a4..ae0bbcb 100644 --- a/packages/playht/src/grpc-client/client.ts +++ b/packages/playht/src/grpc-client/client.ts @@ -4,6 +4,7 @@ import apiProto from './protos/api'; import { Lease } from './lease'; import { ReadableStream } from './readable-stream'; import { TTSStreamSource } from './tts-stream-source'; +import {CongestionCtrl} from "../index"; export type TTSParams = apiProto.playht.v1.ITtsParams; export const Quality = apiProto.playht.v1.Quality; @@ -35,6 +36,11 @@ export interface ClientOptions { * (configured with "customAddr" above) to the standard PlayHT address. */ fallbackEnabled?: boolean; + + /** + * @see CongestionCtrl + */ + congestionCtrl?: CongestionCtrl; } const USE_INSECURE_CONNECTION = false; @@ -259,7 +265,8 @@ export class Client { rpcClient = isPremium ? this.premiumRpc!.client : this.rpc!.client; fallbackClient = undefined; } - const stream = new ReadableStream(new TTSStreamSource(request, rpcClient, fallbackClient)); + const congestionCtrl = this.options.congestionCtrl ?? 
CongestionCtrl.Off; + const stream = new ReadableStream(new TTSStreamSource(request, rpcClient, fallbackClient, congestionCtrl)); // fix for TypeScript not DOM types not including Symbol.asyncIterator in ReadableStream return stream as unknown as AsyncIterable & ReadableStream; } diff --git a/packages/playht/src/grpc-client/tts-stream-source.ts b/packages/playht/src/grpc-client/tts-stream-source.ts index 370fd4d..adc4b7d 100644 --- a/packages/playht/src/grpc-client/tts-stream-source.ts +++ b/packages/playht/src/grpc-client/tts-stream-source.ts @@ -1,16 +1,46 @@ import type * as grpc from '@grpc/grpc-js'; import * as apiProto from './protos/api'; +import {CongestionCtrl} from "../index"; export class TTSStreamSource implements UnderlyingByteSource { private stream?: grpc.ClientReadableStream; readonly type = 'bytes'; private retryable = true; + private retries = 0; + private maxRetries = 0; + private backoff = 0; constructor( private readonly request: apiProto.playht.v1.ITtsRequest, private readonly rpcClient: grpc.Client, private readonly fallbackClient?: grpc.Client, - ) {} + private readonly congestionCtrl?: CongestionCtrl + ) { + if (congestionCtrl != undefined) { + switch (congestionCtrl) { + case CongestionCtrl.Off: + this.maxRetries = 0; + this.backoff = 0; + break; + case CongestionCtrl.StaticMar2024: + /** + * NOTE: + * + * The values below were experimentally chosen. + * + * The experiments were not rigorous and certainly leave a lot to be desired. We should tune them over time. + * We might end up with something dynamic inspired by additive-increase-multiplicative-decrease. 
+ * + * --mtp@2024/02/28 + */ + this.maxRetries = 2; + this.backoff = 50; + break; + default: + throw new Error(`Unrecognized congestion control algorithm: ${congestionCtrl}`); + } + } + } start(controller: ReadableByteStreamController) { this.startAndMaybeFallback(controller, this.rpcClient, this.fallbackClient); @@ -66,15 +96,30 @@ export class TTSStreamSource implements UnderlyingByteSource { } }); this.stream.on('error', (err) => { + // if we get an error while this stream source is still retryable (i.e. we haven't started streaming data back and haven't canceled) - // then we can fallback if there is a fallback rpc client - if (this.retryable && fallbackClient) { - console.warn(`[PlayHT SDK] Falling back...`, fallbackClient.getChannel().getTarget(), err.message); - this.end(); - // start again with the fallback client and the primary client - // we won't specify a second order fallback client - so if this client fails, this stream will fail - this.startAndMaybeFallback(controller, fallbackClient, undefined); - return; + // then we can retry or fall back (if there is a fallback rpc client) + if (this.retryable) { + if (this.retries < this.maxRetries) { + this.end(); + this.retries++; + // NOTE: It's a poor customer experience to show internal details about retries -- so we don't log here. TCP has the same policy. + //console.debug(`[PlayHT SDK] Retrying in ${this.backoff} ms ... (${this.retries} attempts so far)`, err.message); + // retry with the same primary and fallback client + setTimeout(() => { + this.startAndMaybeFallback(controller, client, fallbackClient); + }, this.backoff) + + } else if (fallbackClient) { + // NOTE: We log fallbacks to give customers a signal that they should scale up their on-prem appliance (e.g. 
by paying for more GPU quota) + console.warn(`[PlayHT SDK] Falling back to ${fallbackClient.getChannel().getTarget()} ...`, err.message); + this.end(); + this.retries = 0; + // start again with the fallback client and the primary client + // we won't specify a second order fallback client - so if this client fails, this stream will fail + this.startAndMaybeFallback(controller, fallbackClient, undefined); + return; + } } // if we reach here - we couldn't fallback and therefore this stream has failed diff --git a/packages/playht/src/index.ts b/packages/playht/src/index.ts index c05cbef..0776eb2 100644 --- a/packages/playht/src/index.ts +++ b/packages/playht/src/index.ts @@ -397,8 +397,36 @@ export type APISettingsInput = { * (configured with "customAddr" above) to the standard PlayHT address. */ fallbackEnabled?: boolean; + + /** + * If specified, the client will use the specified {@link CongestionCtrl} algorithm to optimize + * the rate at which it sends text to PlayHT. + * + * If you're using PlayHT On-Prem, you should set this to {@link CongestionCtrl#StaticMar2024}. + * + * @see CongestionCtrl + */ + congestionCtrl?: CongestionCtrl; }; +/** + * Enumerates a streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT. + */ +export enum CongestionCtrl { + + /** + * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible. + */ + Off, + + /** + * The client will optimize for minimizing the number of physical resources required to handle a single stream. + * + * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. + */ + StaticMar2024 +} + /** * Initializes the library with API credentials. 
* From 00d31657d89eb4646ad7ff759efcbed83b94a411 Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Wed, 28 Feb 2024 21:15:29 -0800 Subject: [PATCH 02/13] Cosmetic --- packages/playht/src/api/apiCommon.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/playht/src/api/apiCommon.ts b/packages/playht/src/api/apiCommon.ts index a25a801..30c405b 100644 --- a/packages/playht/src/api/apiCommon.ts +++ b/packages/playht/src/api/apiCommon.ts @@ -291,7 +291,7 @@ async function audioStreamFromSentences( }); resultStream.on('end', () => { - if(!completion.gotEnd) { + if (!completion.gotEnd) { completion.gotEnd = true congestionController.audioDone() } From 369c52f4f1d67bd04b5df3543b8f7ee99d43ceb3 Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Wed, 28 Feb 2024 21:52:01 -0800 Subject: [PATCH 03/13] Grammar bug --- packages/playht/src/api/apiCommon.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/playht/src/api/apiCommon.ts b/packages/playht/src/api/apiCommon.ts index 30c405b..f27c373 100644 --- a/packages/playht/src/api/apiCommon.ts +++ b/packages/playht/src/api/apiCommon.ts @@ -340,7 +340,7 @@ class Task { * that throttles the parallelism of, and delay between, task execution. * * The primary motivation for this (as of 2024/02/28) is to protect PlayHT On-Prem appliances - * from being inundated with a burst text-to-speech requests that it can't satisfy. Prior to the introduction + * from being inundated with a burst of text-to-speech requests that it can't satisfy. Prior to the introduction * of {@link CongestionController} (and more generally {@link CongestionCtrl}), the client would split * a text stream into two text chunks (referred to as "sentences") and send them to the API client (i.e. gRPC client) * simultaneously. This would routinely overload on-prem appliances that operate without a lot of GPU capacity headroom[1]. 
From d43e098080c8608f2b31433f7eb04a85dc35d343 Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Wed, 28 Feb 2024 23:11:35 -0800 Subject: [PATCH 04/13] Comment tweak --- packages/playht/src/grpc-client/tts-stream-source.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/playht/src/grpc-client/tts-stream-source.ts b/packages/playht/src/grpc-client/tts-stream-source.ts index adc4b7d..bcf7c3e 100644 --- a/packages/playht/src/grpc-client/tts-stream-source.ts +++ b/packages/playht/src/grpc-client/tts-stream-source.ts @@ -103,7 +103,7 @@ export class TTSStreamSource implements UnderlyingByteSource { if (this.retries < this.maxRetries) { this.end(); this.retries++; - // NOTE: It's a poor customer experience to show internal details about retries -- so we don't log here. TCP has the same policy. + // NOTE: It's a poor customer experience to show internal details about retries -- so we don't log here. //console.debug(`[PlayHT SDK] Retrying in ${this.backoff} ms ... (${this.retries} attempts so far)`, err.message); // retry with the same primary and fallback client setTimeout(() => { From ad1423ff719d0e806d18f40825770f65f7fccf90 Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Thu, 29 Feb 2024 08:26:34 -0800 Subject: [PATCH 05/13] Don't retry with the fallback client --- packages/playht/src/grpc-client/tts-stream-source.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/playht/src/grpc-client/tts-stream-source.ts b/packages/playht/src/grpc-client/tts-stream-source.ts index bcf7c3e..00fb228 100644 --- a/packages/playht/src/grpc-client/tts-stream-source.ts +++ b/packages/playht/src/grpc-client/tts-stream-source.ts @@ -114,9 +114,9 @@ export class TTSStreamSource implements UnderlyingByteSource { // NOTE: We log fallbacks to give customers a signal that they should scale up their on-prem appliance (e.g. 
by paying for more GPU quota) console.warn(`[PlayHT SDK] Falling back to ${fallbackClient.getChannel().getTarget()} ...`, err.message); this.end(); - this.retries = 0; // start again with the fallback client and the primary client // we won't specify a second order fallback client - so if this client fails, this stream will fail + // we also won't reset the number of retries - so we'll try at most once with the fallback client this.startAndMaybeFallback(controller, fallbackClient, undefined); return; } From e967f1955c30f186a059da2ab1432b7d211c9cff Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Thu, 7 Mar 2024 09:50:45 -0800 Subject: [PATCH 06/13] Address cr comments. --- packages/playht/src/api/apiCommon.ts | 167 ++++-------------- packages/playht/src/grpc-client/client.ts | 2 +- .../src/grpc-client/tts-stream-source.ts | 8 +- packages/playht/src/index.ts | 3 +- 4 files changed, 39 insertions(+), 141 deletions(-) diff --git a/packages/playht/src/api/apiCommon.ts b/packages/playht/src/api/apiCommon.ts index f27c373..ff2422a 100644 --- a/packages/playht/src/api/apiCommon.ts +++ b/packages/playht/src/api/apiCommon.ts @@ -1,15 +1,16 @@ import type { - SpeechOptions, - SpeechStreamOptions, - SpeechOutput, - OutputQuality, Emotion, - VoiceEngine, + OutputFormat, + OutputQuality, PlayHT10OutputStreamFormat, PlayHT20OutputStreamFormat, - OutputFormat, + SpeechOptions, + SpeechOutput, + SpeechStreamOptions, + VoiceEngine, } from '..'; import { PassThrough, Readable, Writable } from 'node:stream'; +import { CongestionCtrl, PlayHT20EngineStreamOptions } from '..'; import { APISettingsStore } from './APISettingsStore'; import { generateV1Speech } from './generateV1Speech'; import { generateV1Stream } from './generateV1Stream'; @@ -17,7 +18,7 @@ import { generateV2Speech } from './generateV2Speech'; import { generateV2Stream } from './generateV2Stream'; import { textStreamToSentences } from './textStreamToSentences'; import { 
generateGRpcStream } from './generateGRpcStream'; -import {CongestionCtrl} from ".."; +import { CongestionController } from './CongestionController'; export type V1ApiOptions = { narrationStyle?: string; @@ -227,10 +228,12 @@ async function audioStreamFromSentences( writableStream.end(); } - let congestionController = new CongestionController(APISettingsStore.getSettings().congestionCtrl ?? CongestionCtrl.Off); + const congestionController = new CongestionController( + APISettingsStore.getSettings().congestionCtrl ?? CongestionCtrl.Off, + ); // For each sentence in the stream, add a task to the queue - let sentenceIdx = 0 + let sentenceIdx = 0; sentencesStream.on('data', async (data) => { const sentence = data.toString(); @@ -240,25 +243,27 @@ async function audioStreamFromSentences( * If the congestion control algorithm is set to {@link CongestionCtrl.Off}, * then this {@link CongestionController#enqueue} method will invoke the task immediately; * thereby generating the audio chunk for this sentence immediately. + * + * @see CongestionController + * @see CongestionCtrl */ congestionController.enqueue(() => { const nextAudioChunk = (async () => { return await internalGenerateStreamFromString(sentence, options); })(); audioChunkStream.push(nextAudioChunk); - }, `createAudioChunk#${sentenceIdx}`) + }, `createAudioChunk#${sentenceIdx}`); - sentenceIdx++ + sentenceIdx++; }); sentencesStream.on('end', async () => { - /** * NOTE: if the congestion control algorithm is set to {@link CongestionCtrl.Off}, then this enqueue method will simply invoke the task immediately. 
*/ congestionController.enqueue(() => { audioChunkStream.push(null); - }, "endAudioChunks") + }, 'endAudioChunks'); }); sentencesStream.on('error', onError); @@ -273,28 +278,32 @@ async function audioStreamFromSentences( onError(); return; } - let completion = { - gotHeaders: false, + const completion = { + headersRemaining: 0, gotAudio: false, - gotEnd: false + }; + switch ((options).outputFormat) { + case 'wav': + completion.headersRemaining = 1; + break; + case 'mp3': + completion.headersRemaining = 1; + break; + default: + break; } await new Promise((resolve) => { - resultStream.on('data', (chunk: Buffer) => { - if (completion.gotHeaders && !completion.gotAudio) { - completion.gotAudio = true + if (completion.headersRemaining > 0) { + completion.headersRemaining -= 1; + } else if (!completion.gotAudio) { + completion.gotAudio = true; congestionController.audioRecvd(); - } else if (!completion.gotHeaders) { - completion.gotHeaders = true } writableStream.write(chunk); }); resultStream.on('end', () => { - if (!completion.gotEnd) { - completion.gotEnd = true - congestionController.audioDone() - } resolve(); }); @@ -323,111 +332,3 @@ async function audioStreamFromSentences( audioChunkStream.pipe(writeAudio); } - -class Task { - fn: Function - name: string - - constructor(fn: Function, name: string) { - this.fn = fn; - this.name = name - } -} - -/** - * Responsible for optimizing the rate at which text is sent to the underlying API endpoint, according to the - * specified {@link CongestionCtrl} algorithm. {@link CongestionController} is essentially a task queue - * that throttles the parallelism of, and delay between, task execution. - * - * The primary motivation for this (as of 2024/02/28) is to protect PlayHT On-Prem appliances - * from being inundated with a burst of text-to-speech requests that it can't satisfy. 
Prior to the introduction - * of {@link CongestionController} (and more generally {@link CongestionCtrl}), the client would split - * a text stream into two text chunks (referred to as "sentences") and send them to the API client (i.e. gRPC client) - * simultaneously. This would routinely overload on-prem appliances that operate without a lot of GPU capacity headroom[1]. - * - * The result would be that most requests that clients sent would immediately result in a gRPC error 8: RESOURCE_EXHAUSTED; - * and therefore, a bad customer experience. {@link CongestionController}, if configured with {@link CongestionCtrl#StaticMar2024}, - * will now delay sending subsequent text chunks (i.e. sentences) to the gRPC client until audio for the preceding text - * chunk has started streaming. - * - * The current {@link CongestionCtrl} algorithm ({@link CongestionCtrl#StaticMar2024}) is very simple and leaves a lot to - * be desired. We should iterate on these algorithms. The {@link CongestionCtrl} enum was added so that algorithms - * can be added without requiring customers to change their code much. - * - * [1] Customers tend to be very cost sensitive regarding expensive GPU capacity, and therefore want to keep their appliances - * running near 100% utilization. - * - * --mtp@2024/02/28 - * - * This class is largely inert if the specified {@link CongestionCtrl} is {@link CongestionCtrl#Off}. 
- */ -class CongestionController { - - algo: CongestionCtrl; - taskQ: Task[] = []; - inflight: number = 0; - parallelism: number; - postChunkBackoff: number; - - constructor(algo: CongestionCtrl) { - this.algo = algo; - switch (algo) { - case CongestionCtrl.Off: - this.parallelism = Infinity; - this.postChunkBackoff = 0; - break; - case CongestionCtrl.StaticMar2024: - this.parallelism = 1; - this.postChunkBackoff = 50; - break; - default: - throw new Error(`Unrecognized congestion control algorithm: ${algo}`) - } - } - - enqueue(task: Function, name: string) { - - // if congestion control is turned off - just execute the task immediately - if (this.algo == CongestionCtrl.Off) { - task(); - return; - } - - this.taskQ.push(new Task(task, name)); - this.maybeDoMore(); - } - - maybeDoMore() { - - // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately - if (this.algo == CongestionCtrl.Off) return - - for (; ;) { - if (this.inflight >= this.parallelism) return - if (this.taskQ.length == 0) return - let task = this.taskQ.shift()! 
- this.inflight++; - task.fn(); - } - } - - audioRecvd() { - - // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately - if (this.algo == CongestionCtrl.Off) return - - this.inflight = Math.max(this.inflight - 1, 0); - setTimeout(() => { - this.maybeDoMore(); - }, this.postChunkBackoff); - } - - audioDone() { - - if (this.algo == CongestionCtrl.Off) return - - this.inflight = Math.max(this.inflight - 1, 0); - this.maybeDoMore(); - } - -} \ No newline at end of file diff --git a/packages/playht/src/grpc-client/client.ts b/packages/playht/src/grpc-client/client.ts index ae0bbcb..05129d2 100644 --- a/packages/playht/src/grpc-client/client.ts +++ b/packages/playht/src/grpc-client/client.ts @@ -1,10 +1,10 @@ import { credentials, Client as GrpcClient } from '@grpc/grpc-js'; import fetch from 'cross-fetch'; +import { CongestionCtrl } from '../index'; import apiProto from './protos/api'; import { Lease } from './lease'; import { ReadableStream } from './readable-stream'; import { TTSStreamSource } from './tts-stream-source'; -import {CongestionCtrl} from "../index"; export type TTSParams = apiProto.playht.v1.ITtsParams; export const Quality = apiProto.playht.v1.Quality; diff --git a/packages/playht/src/grpc-client/tts-stream-source.ts b/packages/playht/src/grpc-client/tts-stream-source.ts index 00fb228..b86267d 100644 --- a/packages/playht/src/grpc-client/tts-stream-source.ts +++ b/packages/playht/src/grpc-client/tts-stream-source.ts @@ -1,6 +1,6 @@ import type * as grpc from '@grpc/grpc-js'; +import { CongestionCtrl } from '../index'; import * as apiProto from './protos/api'; -import {CongestionCtrl} from "../index"; export class TTSStreamSource implements UnderlyingByteSource { private stream?: grpc.ClientReadableStream; @@ -14,7 +14,7 @@ export class TTSStreamSource implements UnderlyingByteSource { private readonly request: apiProto.playht.v1.ITtsRequest, private readonly rpcClient: grpc.Client, private readonly 
fallbackClient?: grpc.Client, - private readonly congestionCtrl?: CongestionCtrl + private readonly congestionCtrl?: CongestionCtrl, ) { if (congestionCtrl != undefined) { switch (congestionCtrl) { @@ -96,7 +96,6 @@ export class TTSStreamSource implements UnderlyingByteSource { } }); this.stream.on('error', (err) => { - // if we get an error while this stream source is still retryable (i.e. we haven't started streaming data back and haven't canceled) // then we can retry or fall back (if there is a fallback rpc client) if (this.retryable) { @@ -108,8 +107,7 @@ export class TTSStreamSource implements UnderlyingByteSource { // retry with the same primary and fallback client setTimeout(() => { this.startAndMaybeFallback(controller, client, fallbackClient); - }, this.backoff) - + }, this.backoff); } else if (fallbackClient) { // NOTE: We log fallbacks to give customers a signal that they should scale up their on-prem appliance (e.g. by paying for more GPU quota) console.warn(`[PlayHT SDK] Falling back to ${fallbackClient.getChannel().getTarget()} ...`, err.message); diff --git a/packages/playht/src/index.ts b/packages/playht/src/index.ts index 0776eb2..2f9cc72 100644 --- a/packages/playht/src/index.ts +++ b/packages/playht/src/index.ts @@ -413,7 +413,6 @@ export type APISettingsInput = { * Enumerates a streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT. */ export enum CongestionCtrl { - /** * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible. */ @@ -424,7 +423,7 @@ export enum CongestionCtrl { * * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. 
*/ - StaticMar2024 + StaticMar2024, } /** From 89bea01cdfc4d31951e36bba6067ba67431da086 Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Thu, 7 Mar 2024 09:52:29 -0800 Subject: [PATCH 07/13] Add missing file --- .../playht/src/api/CongestionController.ts | 101 ++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 packages/playht/src/api/CongestionController.ts diff --git a/packages/playht/src/api/CongestionController.ts b/packages/playht/src/api/CongestionController.ts new file mode 100644 index 0000000..cd0a5a4 --- /dev/null +++ b/packages/playht/src/api/CongestionController.ts @@ -0,0 +1,101 @@ +import { CongestionCtrl } from '../index'; + +/** + * Responsible for optimizing the rate at which text is sent to the underlying API endpoint, according to the + * specified {@link CongestionCtrl} algorithm. {@link CongestionController} is essentially a task queue + * that throttles the parallelism of, and delay between, task execution. + * + * The primary motivation for this (as of 2024/02/28) is to protect PlayHT On-Prem appliances + * from being inundated with a burst of text-to-speech requests that it can't satisfy. Prior to the introduction + * of {@link CongestionController} (and more generally {@link CongestionCtrl}), the client would split + * a text stream into two text chunks (referred to as "sentences") and send them to the API client (i.e. gRPC client) + * simultaneously. This would routinely overload on-prem appliances that operate without a lot of GPU capacity headroom[1]. + * + * The result would be that most requests that clients sent would immediately result in a gRPC error 8: RESOURCE_EXHAUSTED; + * and therefore, a bad customer experience. {@link CongestionController}, if configured with {@link CongestionCtrl#StaticMar2024}, + * will now delay sending subsequent text chunks (i.e. sentences) to the gRPC client until audio for the preceding text + * chunk has started streaming. 
+ * + * The current {@link CongestionCtrl} algorithm ({@link CongestionCtrl#StaticMar2024}) is very simple and leaves a lot to + * be desired. We should iterate on these algorithms. The {@link CongestionCtrl} enum was added so that algorithms + * can be added without requiring customers to change their code much. + * + * [1] Customers tend to be very cost sensitive regarding expensive GPU capacity, and therefore want to keep their appliances + * running near 100% utilization. + * + * --mtp@2024/02/28 + * + * This class is largely inert if the specified {@link CongestionCtrl} is {@link CongestionCtrl#Off}. + */ +export class CongestionController { + algo: CongestionCtrl; + taskQ: Array = []; + inflight = 0; + parallelism: number; + postChunkBackoff: number; + + constructor(algo: CongestionCtrl) { + this.algo = algo; + switch (algo) { + case CongestionCtrl.Off: + this.parallelism = Infinity; + this.postChunkBackoff = 0; + break; + case CongestionCtrl.StaticMar2024: + this.parallelism = 1; + this.postChunkBackoff = 50; + break; + default: + throw new Error(`Unrecognized congestion control algorithm: ${algo}`); + } + } + + enqueue(task: () => void, name: string) { + // if congestion control is turned off - just execute the task immediately + if (this.algo == CongestionCtrl.Off) { + task(); + return; + } + + this.taskQ.push(new Task(task, name)); + this.maybeDoMore(); + } + + private maybeDoMore() { + // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately + if (this.algo == CongestionCtrl.Off) return; + + while (true) { + if (this.inflight >= this.parallelism) return; + if (this.taskQ.length == 0) return; + const task = this.taskQ.shift()!; + this.inflight++; + //console.debug(`[PlayHT SDK] Started congestion control task: ${task.name}. 
inflight=${this.inflight}`); + task.fn(); + } + } + + audioRecvd() { + // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately + if (this.algo == CongestionCtrl.Off) return; + + this.inflight = Math.max(this.inflight - 1, 0); + //console.debug('[PlayHT SDK] Congestion control received audio'); + setTimeout(() => { + this.maybeDoMore(); + }, this.postChunkBackoff); + } +} + +/** + * NOTE: + * + * {@link #name} is currently unused, but exists so that we can log task names during development. + * Without {@link #name}, it's hard to understand which tasks were executed and in which order. + */ +class Task { + constructor( + public fn: () => void, + public name: string, + ) {} +} From 0870aed833832339a77585e743dcb1b899baaaf1 Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Thu, 7 Mar 2024 09:58:48 -0800 Subject: [PATCH 08/13] Fix lint warning --- packages/playht/src/api/CongestionController.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/playht/src/api/CongestionController.ts b/packages/playht/src/api/CongestionController.ts index cd0a5a4..3294b06 100644 --- a/packages/playht/src/api/CongestionController.ts +++ b/packages/playht/src/api/CongestionController.ts @@ -65,7 +65,7 @@ export class CongestionController { // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately if (this.algo == CongestionCtrl.Off) return; - while (true) { + for (;;) { if (this.inflight >= this.parallelism) return; if (this.taskQ.length == 0) return; const task = this.taskQ.shift()!; From 0638eecc1b6b69c8db3c61d34ab3c84a12d7286c Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Thu, 7 Mar 2024 10:09:30 -0800 Subject: [PATCH 09/13] Use while instead of for loop --- packages/playht/src/api/CongestionController.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git 
a/packages/playht/src/api/CongestionController.ts b/packages/playht/src/api/CongestionController.ts index 3294b06..fd32ae6 100644 --- a/packages/playht/src/api/CongestionController.ts +++ b/packages/playht/src/api/CongestionController.ts @@ -65,9 +65,7 @@ export class CongestionController { // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately if (this.algo == CongestionCtrl.Off) return; - for (;;) { - if (this.inflight >= this.parallelism) return; - if (this.taskQ.length == 0) return; + while (this.inflight < this.parallelism && this.taskQ.length > 0) { const task = this.taskQ.shift()!; this.inflight++; //console.debug(`[PlayHT SDK] Started congestion control task: ${task.name}. inflight=${this.inflight}`); From 951dd3da6429a1b42a47c49b0f1a1d2376518b87 Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Thu, 7 Mar 2024 11:24:15 -0800 Subject: [PATCH 10/13] Fix dependency cycles. --- packages/playht/src/api/apiCommon.ts | 6 +++--- ...ngestionController.ts => congestionCtrl.ts} | 17 ++++++++++++++++- packages/playht/src/grpc-client/client.ts | 5 ++--- packages/playht/src/index.ts | 18 +----------------- 4 files changed, 22 insertions(+), 24 deletions(-) rename packages/playht/src/api/{CongestionController.ts => congestionCtrl.ts} (88%) diff --git a/packages/playht/src/api/apiCommon.ts b/packages/playht/src/api/apiCommon.ts index ff2422a..bf0e23e 100644 --- a/packages/playht/src/api/apiCommon.ts +++ b/packages/playht/src/api/apiCommon.ts @@ -10,7 +10,6 @@ import type { VoiceEngine, } from '..'; import { PassThrough, Readable, Writable } from 'node:stream'; -import { CongestionCtrl, PlayHT20EngineStreamOptions } from '..'; import { APISettingsStore } from './APISettingsStore'; import { generateV1Speech } from './generateV1Speech'; import { generateV1Stream } from './generateV1Stream'; @@ -18,7 +17,7 @@ import { generateV2Speech } from './generateV2Speech'; import { 
generateV2Stream } from './generateV2Stream'; import { textStreamToSentences } from './textStreamToSentences'; import { generateGRpcStream } from './generateGRpcStream'; -import { CongestionController } from './CongestionController'; +import { CongestionController, CongestionCtrl } from './congestionCtrl'; export type V1ApiOptions = { narrationStyle?: string; @@ -282,7 +281,8 @@ async function audioStreamFromSentences( headersRemaining: 0, gotAudio: false, }; - switch ((options).outputFormat) { + // NOTE: The cast below is to avoid a cyclic dependency warning from "yarn verify" + switch ((<{ outputFormat: string }>options).outputFormat) { case 'wav': completion.headersRemaining = 1; break; diff --git a/packages/playht/src/api/CongestionController.ts b/packages/playht/src/api/congestionCtrl.ts similarity index 88% rename from packages/playht/src/api/CongestionController.ts rename to packages/playht/src/api/congestionCtrl.ts index fd32ae6..912bed4 100644 --- a/packages/playht/src/api/CongestionController.ts +++ b/packages/playht/src/api/congestionCtrl.ts @@ -1,4 +1,19 @@ -import { CongestionCtrl } from '../index'; +/** + * Enumerates a streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT. + */ +export enum CongestionCtrl { + /** + * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible. + */ + Off, + + /** + * The client will optimize for minimizing the number of physical resources required to handle a single stream. + * + * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. 
+ */ + StaticMar2024, +} /** * Responsible for optimizing the rate at which text is sent to the underlying API endpoint, according to the diff --git a/packages/playht/src/grpc-client/client.ts b/packages/playht/src/grpc-client/client.ts index 05129d2..0932a13 100644 --- a/packages/playht/src/grpc-client/client.ts +++ b/packages/playht/src/grpc-client/client.ts @@ -1,10 +1,9 @@ -import { credentials, Client as GrpcClient } from '@grpc/grpc-js'; +import { Client as GrpcClient, credentials } from '@grpc/grpc-js'; import fetch from 'cross-fetch'; -import { CongestionCtrl } from '../index'; import apiProto from './protos/api'; import { Lease } from './lease'; import { ReadableStream } from './readable-stream'; -import { TTSStreamSource } from './tts-stream-source'; +import { CongestionCtrl, TTSStreamSource } from './tts-stream-source'; export type TTSParams = apiProto.playht.v1.ITtsParams; export const Quality = apiProto.playht.v1.Quality; diff --git a/packages/playht/src/index.ts b/packages/playht/src/index.ts index 2f9cc72..8d3f59b 100644 --- a/packages/playht/src/index.ts +++ b/packages/playht/src/index.ts @@ -2,6 +2,7 @@ import { APISettingsStore } from './api/APISettingsStore'; import { commonGenerateSpeech, commonGenerateStream } from './api/apiCommon'; import { commonGetAllVoices } from './api/commonGetAllVoices'; import { commonInstantClone, internalDeleteClone } from './api/instantCloneInternal'; +import { CongestionCtrl } from './api/congestionCtrl'; /** * Type representing the various voice engines that can be used for speech synthesis. @@ -409,23 +410,6 @@ export type APISettingsInput = { congestionCtrl?: CongestionCtrl; }; -/** - * Enumerates a streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT. - */ -export enum CongestionCtrl { - /** - * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible. 
- */ - Off, - - /** - * The client will optimize for minimizing the number of physical resources required to handle a single stream. - * - * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. - */ - StaticMar2024, -} - /** * Initializes the library with API credentials. * From ddfe54a38c9a0ffc5e7115e603ed91a6d71a378f Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Thu, 7 Mar 2024 11:24:19 -0800 Subject: [PATCH 11/13] Fix dependency cycles. --- .../src/grpc-client/tts-stream-source.ts | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/packages/playht/src/grpc-client/tts-stream-source.ts b/packages/playht/src/grpc-client/tts-stream-source.ts index b86267d..9bcb787 100644 --- a/packages/playht/src/grpc-client/tts-stream-source.ts +++ b/packages/playht/src/grpc-client/tts-stream-source.ts @@ -1,5 +1,4 @@ import type * as grpc from '@grpc/grpc-js'; -import { CongestionCtrl } from '../index'; import * as apiProto from './protos/api'; export class TTSStreamSource implements UnderlyingByteSource { @@ -148,3 +147,20 @@ export class TTSStreamSource implements UnderlyingByteSource { return false; } } + +/** + * Enumerates a streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT. + */ +export enum CongestionCtrl { + /** + * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible. + */ + Off, + + /** + * The client will optimize for minimizing the number of physical resources required to handle a single stream. + * + * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. 
+ */ + StaticMar2024, +} From f537ef6f20987f9fd06fa5d242f8d1e6adf873df Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Fri, 8 Mar 2024 12:53:18 -0800 Subject: [PATCH 12/13] Make CongestionCtrl enum values explicit --- packages/playht/src/api/congestionCtrl.ts | 4 ++-- packages/playht/src/grpc-client/tts-stream-source.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/playht/src/api/congestionCtrl.ts b/packages/playht/src/api/congestionCtrl.ts index 912bed4..7ad5409 100644 --- a/packages/playht/src/api/congestionCtrl.ts +++ b/packages/playht/src/api/congestionCtrl.ts @@ -5,14 +5,14 @@ export enum CongestionCtrl { /** * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible. */ - Off, + Off = 0, /** * The client will optimize for minimizing the number of physical resources required to handle a single stream. * * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. */ - StaticMar2024, + StaticMar2024 = 1, } /** diff --git a/packages/playht/src/grpc-client/tts-stream-source.ts b/packages/playht/src/grpc-client/tts-stream-source.ts index 9bcb787..54e72e1 100644 --- a/packages/playht/src/grpc-client/tts-stream-source.ts +++ b/packages/playht/src/grpc-client/tts-stream-source.ts @@ -155,12 +155,12 @@ export enum CongestionCtrl { /** * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible. */ - Off, + Off = 0, /** * The client will optimize for minimizing the number of physical resources required to handle a single stream. * * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. */ - StaticMar2024, + StaticMar2024 = 1, } From ae5b8431efca0039a92c58eb9cf37d09b873361b Mon Sep 17 00:00:00 2001 From: mtenpow <3721+mtenpow@users.noreply.github.com> Date: Tue, 12 Mar 2024 10:42:30 -0700 Subject: [PATCH 13/13] Use strings instead of enums. 
--- packages/playht/src/api/apiCommon.ts | 10 ++++---- packages/playht/src/api/congestionCtrl.ts | 23 +++++++++---------- packages/playht/src/grpc-client/client.ts | 5 ++-- .../playht/src/grpc-client/congestion-ctrl.ts | 15 ++++++++++++ .../src/grpc-client/tts-stream-source.ts | 22 +++--------------- packages/playht/src/index.ts | 2 +- 6 files changed, 37 insertions(+), 40 deletions(-) create mode 100644 packages/playht/src/grpc-client/congestion-ctrl.ts diff --git a/packages/playht/src/api/apiCommon.ts b/packages/playht/src/api/apiCommon.ts index bf0e23e..f7d2617 100644 --- a/packages/playht/src/api/apiCommon.ts +++ b/packages/playht/src/api/apiCommon.ts @@ -17,7 +17,7 @@ import { generateV2Speech } from './generateV2Speech'; import { generateV2Stream } from './generateV2Stream'; import { textStreamToSentences } from './textStreamToSentences'; import { generateGRpcStream } from './generateGRpcStream'; -import { CongestionController, CongestionCtrl } from './congestionCtrl'; +import { CongestionController } from './congestionCtrl'; export type V1ApiOptions = { narrationStyle?: string; @@ -227,9 +227,7 @@ async function audioStreamFromSentences( writableStream.end(); } - const congestionController = new CongestionController( - APISettingsStore.getSettings().congestionCtrl ?? CongestionCtrl.Off, - ); + const congestionController = new CongestionController(APISettingsStore.getSettings().congestionCtrl ?? 'Off'); // For each sentence in the stream, add a task to the queue let sentenceIdx = 0; @@ -239,7 +237,7 @@ async function audioStreamFromSentences( /** * NOTE: * - * If the congestion control algorithm is set to {@link CongestionCtrl.Off}, + * If the congestion control algorithm is set to "Off", * then this {@link CongestionController#enqueue} method will invoke the task immediately; * thereby generating the audio chunk for this sentence immediately. 
* @@ -258,7 +256,7 @@ async function audioStreamFromSentences( sentencesStream.on('end', async () => { /** - * NOTE: if the congestion control algorithm is set to {@link CongestionCtrl.Off}, then this enqueue method will simply invoke the task immediately. + * NOTE: if the congestion control algorithm is set to "Off", then this enqueue method will simply invoke the task immediately. */ congestionController.enqueue(() => { audioChunkStream.push(null); diff --git a/packages/playht/src/api/congestionCtrl.ts b/packages/playht/src/api/congestionCtrl.ts index 7ad5409..c553e41 100644 --- a/packages/playht/src/api/congestionCtrl.ts +++ b/packages/playht/src/api/congestionCtrl.ts @@ -1,19 +1,18 @@ /** * Enumerates a streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT. */ -export enum CongestionCtrl { +export type CongestionCtrl = /** * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible. */ - Off = 0, + | 'Off' /** * The client will optimize for minimizing the number of physical resources required to handle a single stream. * * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. */ - StaticMar2024 = 1, -} + | 'StaticMar2024'; /** * Responsible for optimizing the rate at which text is sent to the underlying API endpoint, according to the @@ -27,11 +26,11 @@ export enum CongestionCtrl { * simultaneously. This would routinely overload on-prem appliances that operate without a lot of GPU capacity headroom[1]. * * The result would be that most requests that clients sent would immediately result in a gRPC error 8: RESOURCE_EXHAUSTED; - * and therefore, a bad customer experience. {@link CongestionController}, if configured with {@link CongestionCtrl#StaticMar2024}, + * and therefore, a bad customer experience. {@link CongestionController}, if configured with "StaticMar2024", * will now delay sending subsequent text chunks (i.e. 
sentences) to the gRPC client until audio for the preceding text * chunk has started streaming. * - * The current {@link CongestionCtrl} algorithm ({@link CongestionCtrl#StaticMar2024}) is very simple and leaves a lot to + * The current {@link CongestionCtrl} algorithm ("StaticMar2024") is very simple and leaves a lot to * be desired. We should iterate on these algorithms. The {@link CongestionCtrl} enum was added so that algorithms * can be added without requiring customers to change their code much. * @@ -40,7 +39,7 @@ export enum CongestionCtrl { * * --mtp@2024/02/28 * - * This class is largely inert if the specified {@link CongestionCtrl} is {@link CongestionCtrl#Off}. + * This class is largely inert if the specified {@link CongestionCtrl} is "Off". */ export class CongestionController { algo: CongestionCtrl; @@ -52,11 +51,11 @@ export class CongestionController { constructor(algo: CongestionCtrl) { this.algo = algo; switch (algo) { - case CongestionCtrl.Off: + case 'Off': this.parallelism = Infinity; this.postChunkBackoff = 0; break; - case CongestionCtrl.StaticMar2024: + case 'StaticMar2024': this.parallelism = 1; this.postChunkBackoff = 50; break; @@ -67,7 +66,7 @@ export class CongestionController { enqueue(task: () => void, name: string) { // if congestion control is turned off - just execute the task immediately - if (this.algo == CongestionCtrl.Off) { + if (this.algo == 'Off') { task(); return; } @@ -78,7 +77,7 @@ export class CongestionController { private maybeDoMore() { // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately - if (this.algo == CongestionCtrl.Off) return; + if (this.algo == 'Off') return; while (this.inflight < this.parallelism && this.taskQ.length > 0) { const task = this.taskQ.shift()!; @@ -90,7 +89,7 @@ export class CongestionController { audioRecvd() { // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately - if 
(this.algo == CongestionCtrl.Off) return; + if (this.algo == 'Off') return; this.inflight = Math.max(this.inflight - 1, 0); //console.debug('[PlayHT SDK] Congestion control received audio'); diff --git a/packages/playht/src/grpc-client/client.ts b/packages/playht/src/grpc-client/client.ts index 0932a13..584323a 100644 --- a/packages/playht/src/grpc-client/client.ts +++ b/packages/playht/src/grpc-client/client.ts @@ -3,7 +3,8 @@ import fetch from 'cross-fetch'; import apiProto from './protos/api'; import { Lease } from './lease'; import { ReadableStream } from './readable-stream'; -import { CongestionCtrl, TTSStreamSource } from './tts-stream-source'; +import { TTSStreamSource } from './tts-stream-source'; +import { CongestionCtrl } from './congestion-ctrl'; export type TTSParams = apiProto.playht.v1.ITtsParams; export const Quality = apiProto.playht.v1.Quality; @@ -264,7 +265,7 @@ export class Client { rpcClient = isPremium ? this.premiumRpc!.client : this.rpc!.client; fallbackClient = undefined; } - const congestionCtrl = this.options.congestionCtrl ?? CongestionCtrl.Off; + const congestionCtrl = this.options.congestionCtrl ?? 'Off'; const stream = new ReadableStream(new TTSStreamSource(request, rpcClient, fallbackClient, congestionCtrl)); // fix for TypeScript not DOM types not including Symbol.asyncIterator in ReadableStream return stream as unknown as AsyncIterable & ReadableStream; diff --git a/packages/playht/src/grpc-client/congestion-ctrl.ts b/packages/playht/src/grpc-client/congestion-ctrl.ts new file mode 100644 index 0000000..4f84684 --- /dev/null +++ b/packages/playht/src/grpc-client/congestion-ctrl.ts @@ -0,0 +1,15 @@ +/** + * Enumerates the streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT. + */ +export type CongestionCtrl = + /** + * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible.
+ */ + | 'Off' + + /** + * The client will optimize for minimizing the number of physical resources required to handle a single stream. + * + * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. + */ + | 'StaticMar2024'; diff --git a/packages/playht/src/grpc-client/tts-stream-source.ts b/packages/playht/src/grpc-client/tts-stream-source.ts index 54e72e1..b69c3bd 100644 --- a/packages/playht/src/grpc-client/tts-stream-source.ts +++ b/packages/playht/src/grpc-client/tts-stream-source.ts @@ -1,5 +1,6 @@ import type * as grpc from '@grpc/grpc-js'; import * as apiProto from './protos/api'; +import { CongestionCtrl } from './congestion-ctrl'; export class TTSStreamSource implements UnderlyingByteSource { private stream?: grpc.ClientReadableStream; @@ -17,11 +18,11 @@ export class TTSStreamSource implements UnderlyingByteSource { ) { if (congestionCtrl != undefined) { switch (congestionCtrl) { - case CongestionCtrl.Off: + case 'Off': this.maxRetries = 0; this.backoff = 0; break; - case CongestionCtrl.StaticMar2024: + case 'StaticMar2024': /** * NOTE: * @@ -147,20 +148,3 @@ export class TTSStreamSource implements UnderlyingByteSource { return false; } } - -/** - * Enumerates a streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT. - */ -export enum CongestionCtrl { - /** - * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible. - */ - Off = 0, - - /** - * The client will optimize for minimizing the number of physical resources required to handle a single stream. - * - * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. 
- */ - StaticMar2024 = 1, -} diff --git a/packages/playht/src/index.ts b/packages/playht/src/index.ts index 8d3f59b..98ee22a 100644 --- a/packages/playht/src/index.ts +++ b/packages/playht/src/index.ts @@ -403,7 +403,7 @@ export type APISettingsInput = { * If specified, the client will use the specified {@link CongestionCtrl} algorithm to optimize * the rate at which it sends text to PlayHT. * - * If you're using PlayHT On-Prem, you should set this to {@link CongestionCtrl#StaticMar2024}. + * If you're using PlayHT On-Prem, you should set this to "StaticMar2024". * * @see CongestionCtrl */