-
Notifications
You must be signed in to change notification settings - Fork 16
Introduce configurable congestion control. #31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3718449
00d3165
369c52f
d43e098
ad1423f
e967f19
89bea01
0870aed
0638eec
951dd3d
ddfe54a
f537ef6
ae5b843
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,13 +1,13 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||
| import type { | ||||||||||||||||||||||||||||||||||||||||||||||||
| SpeechOptions, | ||||||||||||||||||||||||||||||||||||||||||||||||
| SpeechStreamOptions, | ||||||||||||||||||||||||||||||||||||||||||||||||
| SpeechOutput, | ||||||||||||||||||||||||||||||||||||||||||||||||
| OutputQuality, | ||||||||||||||||||||||||||||||||||||||||||||||||
| Emotion, | ||||||||||||||||||||||||||||||||||||||||||||||||
| VoiceEngine, | ||||||||||||||||||||||||||||||||||||||||||||||||
| OutputFormat, | ||||||||||||||||||||||||||||||||||||||||||||||||
| OutputQuality, | ||||||||||||||||||||||||||||||||||||||||||||||||
| PlayHT10OutputStreamFormat, | ||||||||||||||||||||||||||||||||||||||||||||||||
| PlayHT20OutputStreamFormat, | ||||||||||||||||||||||||||||||||||||||||||||||||
| OutputFormat, | ||||||||||||||||||||||||||||||||||||||||||||||||
| SpeechOptions, | ||||||||||||||||||||||||||||||||||||||||||||||||
| SpeechOutput, | ||||||||||||||||||||||||||||||||||||||||||||||||
| SpeechStreamOptions, | ||||||||||||||||||||||||||||||||||||||||||||||||
| VoiceEngine, | ||||||||||||||||||||||||||||||||||||||||||||||||
| } from '..'; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import { PassThrough, Readable, Writable } from 'node:stream'; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import { APISettingsStore } from './APISettingsStore'; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -17,6 +17,7 @@ import { generateV2Speech } from './generateV2Speech'; | |||||||||||||||||||||||||||||||||||||||||||||||
| import { generateV2Stream } from './generateV2Stream'; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import { textStreamToSentences } from './textStreamToSentences'; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import { generateGRpcStream } from './generateGRpcStream'; | ||||||||||||||||||||||||||||||||||||||||||||||||
| import { CongestionController } from './congestionCtrl'; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| export type V1ApiOptions = { | ||||||||||||||||||||||||||||||||||||||||||||||||
| narrationStyle?: string; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -198,8 +199,8 @@ async function audioStreamFromSentences( | |||||||||||||||||||||||||||||||||||||||||||||||
| writableStream: NodeJS.WritableStream, | ||||||||||||||||||||||||||||||||||||||||||||||||
| options?: SpeechStreamOptions, | ||||||||||||||||||||||||||||||||||||||||||||||||
| ) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| // Create a stream for promises | ||||||||||||||||||||||||||||||||||||||||||||||||
| const promiseStream = new Readable({ | ||||||||||||||||||||||||||||||||||||||||||||||||
| // Create a stream of audio chunk promises -- each corresponding to a sentence | ||||||||||||||||||||||||||||||||||||||||||||||||
| const audioChunkStream = new Readable({ | ||||||||||||||||||||||||||||||||||||||||||||||||
| objectMode: true, | ||||||||||||||||||||||||||||||||||||||||||||||||
| read() {}, | ||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -226,23 +227,45 @@ async function audioStreamFromSentences( | |||||||||||||||||||||||||||||||||||||||||||||||
| writableStream.end(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| const congestionController = new CongestionController(APISettingsStore.getSettings().congestionCtrl ?? 'Off'); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| // For each sentence in the stream, add a task to the queue | ||||||||||||||||||||||||||||||||||||||||||||||||
| let sentenceIdx = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||
| sentencesStream.on('data', async (data) => { | ||||||||||||||||||||||||||||||||||||||||||||||||
| const sentence = data.toString(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| const generatePromise = (async () => { | ||||||||||||||||||||||||||||||||||||||||||||||||
| return await internalGenerateStreamFromString(sentence, options); | ||||||||||||||||||||||||||||||||||||||||||||||||
| })(); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| promiseStream.push(generatePromise); | ||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||
| * NOTE: | ||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||
| * If the congestion control algorithm is set to "Off", | ||||||||||||||||||||||||||||||||||||||||||||||||
| * then this {@link CongestionController#enqueue} method will invoke the task immediately; | ||||||||||||||||||||||||||||||||||||||||||||||||
| * thereby generating the audio chunk for this sentence immediately. | ||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||
| * @see CongestionController | ||||||||||||||||||||||||||||||||||||||||||||||||
| * @see CongestionCtrl | ||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||
| congestionController.enqueue(() => { | ||||||||||||||||||||||||||||||||||||||||||||||||
| const nextAudioChunk = (async () => { | ||||||||||||||||||||||||||||||||||||||||||||||||
| return await internalGenerateStreamFromString(sentence, options); | ||||||||||||||||||||||||||||||||||||||||||||||||
| })(); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+248
to
+250
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Edit:
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||
| audioChunkStream.push(nextAudioChunk); | ||||||||||||||||||||||||||||||||||||||||||||||||
| }, `createAudioChunk#${sentenceIdx}`); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| sentenceIdx++; | ||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| sentencesStream.on('end', async () => { | ||||||||||||||||||||||||||||||||||||||||||||||||
| promiseStream.push(null); | ||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||
| * NOTE: if the congestion control algorithm is set to "Off", then this enqueue method will simply invoke the task immediately. | ||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||
| congestionController.enqueue(() => { | ||||||||||||||||||||||||||||||||||||||||||||||||
| audioChunkStream.push(null); | ||||||||||||||||||||||||||||||||||||||||||||||||
| }, 'endAudioChunks'); | ||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| sentencesStream.on('error', onError); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| // Read from the promiseStream and await for each promise in order | ||||||||||||||||||||||||||||||||||||||||||||||||
| // Await each audio chunk in order, and write the raw audio to the output audio stream | ||||||||||||||||||||||||||||||||||||||||||||||||
| const writeAudio = new Writable({ | ||||||||||||||||||||||||||||||||||||||||||||||||
| objectMode: true, | ||||||||||||||||||||||||||||||||||||||||||||||||
| write: async (generatePromise, _, callback) => { | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -252,8 +275,29 @@ async function audioStreamFromSentences( | |||||||||||||||||||||||||||||||||||||||||||||||
| onError(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| const completion = { | ||||||||||||||||||||||||||||||||||||||||||||||||
| headersRemaining: 0, | ||||||||||||||||||||||||||||||||||||||||||||||||
| gotAudio: false, | ||||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||||
| // NOTE: The cast below is to avoid a cyclic dependency warning from "yarn verify" | ||||||||||||||||||||||||||||||||||||||||||||||||
| switch ((<{ outputFormat: string }>options).outputFormat) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| case 'wav': | ||||||||||||||||||||||||||||||||||||||||||||||||
| completion.headersRemaining = 1; | ||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||
| case 'mp3': | ||||||||||||||||||||||||||||||||||||||||||||||||
| completion.headersRemaining = 1; | ||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+282
to
+292
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't get the warning you mention, but this cast did introduce a bug for me. I made some tests using just Something like this fixed it for me:
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||
| await new Promise<void>((resolve) => { | ||||||||||||||||||||||||||||||||||||||||||||||||
| resultStream.on('data', (chunk: Buffer) => { | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (completion.headersRemaining > 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| completion.headersRemaining -= 1; | ||||||||||||||||||||||||||||||||||||||||||||||||
| } else if (!completion.gotAudio) { | ||||||||||||||||||||||||||||||||||||||||||||||||
| completion.gotAudio = true; | ||||||||||||||||||||||||||||||||||||||||||||||||
| congestionController.audioRecvd(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| writableStream.write(chunk); | ||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -272,9 +316,9 @@ async function audioStreamFromSentences( | |||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| writeAudio.on('error', onError); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| promiseStream.on('error', onError); | ||||||||||||||||||||||||||||||||||||||||||||||||
| audioChunkStream.on('error', onError); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| promiseStream.on('end', () => { | ||||||||||||||||||||||||||||||||||||||||||||||||
| audioChunkStream.on('end', () => { | ||||||||||||||||||||||||||||||||||||||||||||||||
| setTimeout( | ||||||||||||||||||||||||||||||||||||||||||||||||
| () => | ||||||||||||||||||||||||||||||||||||||||||||||||
| writeAudio.on('finish', () => { | ||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
322
to
324
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So what is going on here:
The "new" bit is the Anyway, please let me know if any of that makes sense -- believe it or not, this is me trying to be as succinct as possible. I have tested these fixes before suggesting them here. PS.: I know 50ms is a very, very short window, but the race condition is certainly there. On-prem responses could theoretically be that fast (e.g. empty sentences), but, more than that, this number could change in the future (with some kind of exponential backoff implementation or whatnot) and by that time this kind of issue would be a lot of fun to debug :P
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. EXCELLENT find. I really appreciate the thoroughness! I kind of felt something was off here - but couldn't find it. Thank you! I'll circle back to this PR in a bit after I'm done with the training cluster work. Thanks again! |
||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -284,5 +328,5 @@ async function audioStreamFromSentences( | |||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| promiseStream.pipe(writeAudio); | ||||||||||||||||||||||||||||||||||||||||||||||||
| audioChunkStream.pipe(writeAudio); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| /** | ||
| * Enumerates a streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT. | ||
| */ | ||
| export type CongestionCtrl = | ||
| /** | ||
| * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible. | ||
| */ | ||
| | 'Off' | ||
|
|
||
| /** | ||
| * The client will optimize for minimizing the number of physical resources required to handle a single stream. | ||
| * | ||
| * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. | ||
| */ | ||
| | 'StaticMar2024'; | ||
|
Comment on lines
+1
to
+15
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Edit: I had a better look and changed my mind: I think Feel free to leave it like this now, though. This is something we can refactor after this PR. |
||
|
|
||
| /** | ||
| * Responsible for optimizing the rate at which text is sent to the underlying API endpoint, according to the | ||
| * specified {@link CongestionCtrl} algorithm. {@link CongestionController} is essentially a task queue | ||
| * that throttles the parallelism of, and delay between, task execution. | ||
| * | ||
| * The primary motivation for this (as of 2024/02/28) is to protect PlayHT On-Prem appliances | ||
| * from being inundated with a burst of text-to-speech requests that it can't satisfy. Prior to the introduction | ||
| * of {@link CongestionController} (and more generally {@link CongestionCtrl}), the client would split | ||
| * a text stream into two text chunks (referred to as "sentences") and send them to the API client (i.e. gRPC client) | ||
| * simultaneously. This would routinely overload on-prem appliances that operate without a lot of GPU capacity headroom[1]. | ||
| * | ||
| * The result would be that most requests that clients sent would immediately result in a gRPC error 8: RESOURCE_EXHAUSTED; | ||
| * and therefore, a bad customer experience. {@link CongestionController}, if configured with "StaticMar2024", | ||
| * will now delay sending subsequent text chunks (i.e. sentences) to the gRPC client until audio for the preceding text | ||
| * chunk has started streaming. | ||
| * | ||
| * The current {@link CongestionCtrl} algorithm ("StaticMar2024") is very simple and leaves a lot to | ||
| * be desired. We should iterate on these algorithms. The {@link CongestionCtrl} enum was added so that algorithms | ||
| * can be added without requiring customers to change their code much. | ||
| * | ||
| * [1] Customers tend to be very cost sensitive regarding expensive GPU capacity, and therefore want to keep their appliances | ||
| * running near 100% utilization. | ||
| * | ||
| * --mtp@2024/02/28 | ||
| * | ||
| * This class is largely inert if the specified {@link CongestionCtrl} is "Off". | ||
| */ | ||
| export class CongestionController { | ||
| algo: CongestionCtrl; | ||
| taskQ: Array<Task> = []; | ||
| inflight = 0; | ||
| parallelism: number; | ||
| postChunkBackoff: number; | ||
|
|
||
| constructor(algo: CongestionCtrl) { | ||
| this.algo = algo; | ||
| switch (algo) { | ||
| case 'Off': | ||
| this.parallelism = Infinity; | ||
| this.postChunkBackoff = 0; | ||
| break; | ||
| case 'StaticMar2024': | ||
| this.parallelism = 1; | ||
| this.postChunkBackoff = 50; | ||
| break; | ||
| default: | ||
| throw new Error(`Unrecognized congestion control algorithm: ${algo}`); | ||
| } | ||
| } | ||
|
|
||
| enqueue(task: () => void, name: string) { | ||
| // if congestion control is turned off - just execute the task immediately | ||
| if (this.algo == 'Off') { | ||
| task(); | ||
| return; | ||
| } | ||
|
|
||
| this.taskQ.push(new Task(task, name)); | ||
| this.maybeDoMore(); | ||
| } | ||
|
|
||
| private maybeDoMore() { | ||
| // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately | ||
| if (this.algo == 'Off') return; | ||
|
|
||
| while (this.inflight < this.parallelism && this.taskQ.length > 0) { | ||
| const task = this.taskQ.shift()!; | ||
| this.inflight++; | ||
| //console.debug(`[PlayHT SDK] Started congestion control task: ${task.name}. inflight=${this.inflight}`); | ||
| task.fn(); | ||
| } | ||
| } | ||
|
|
||
| audioRecvd() { | ||
| // if congestion control is turned off - there's nothing to do here because all tasks were executed immediately | ||
| if (this.algo == 'Off') return; | ||
|
|
||
| this.inflight = Math.max(this.inflight - 1, 0); | ||
| //console.debug('[PlayHT SDK] Congestion control received audio'); | ||
| setTimeout(() => { | ||
| this.maybeDoMore(); | ||
| }, this.postChunkBackoff); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * NOTE: | ||
| * | ||
| * {@link #name} is currently unused, but exists so that we can log task names during development. | ||
| * Without {@link #name}, it's hard to understand which tasks were executed and in which order. | ||
| */ | ||
| class Task { | ||
| constructor( | ||
| public fn: () => void, | ||
| public name: string, | ||
| ) {} | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| /** | ||
| * Enumerates a streaming congestion control algorithms, used to optimize the rate at which text is sent to PlayHT. | ||
| */ | ||
| export type CongestionCtrl = | ||
| /** | ||
| * The client will not do any congestion control. Text will be sent to PlayHT as fast as possible. | ||
| */ | ||
| | 'Off' | ||
|
|
||
| /** | ||
| * The client will optimize for minimizing the number of physical resources required to handle a single stream. | ||
| * | ||
| * If you're using PlayHT On-Prem, you should use this {@link CongestionCtrl} algorithm. | ||
| */ | ||
| | 'StaticMar2024'; |

Uh oh!
There was an error while loading. Please reload this page.