Skip to content

Commit 58b43ec

Browse files
Track exact SSE event IDs for chat reconnect
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent b30a66b commit 58b43ec

File tree

2 files changed

+36
-25
lines changed

2 files changed

+36
-25
lines changed

packages/ai/src/chatTransport.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ describe("TriggerChatTransport", function () {
419419
}
420420

421421
const state = runStore.get("chat-2");
422-
return Boolean(state && state.lastEventId === "0");
422+
return Boolean(state && state.lastEventId === "1-0");
423423
});
424424

425425
const reconnectStream = await transport.reconnectToStream({
@@ -429,7 +429,7 @@ describe("TriggerChatTransport", function () {
429429
expect(reconnectStream).not.toBeNull();
430430

431431
const reconnectChunks = await readChunks(reconnectStream!);
432-
expect(reconnectLastEventId).toBe("0");
432+
expect(reconnectLastEventId).toBe("1-0");
433433
expect(reconnectChunks).toHaveLength(2);
434434
expect(reconnectChunks[0]).toMatchObject({
435435
chunk: { type: "text-delta", id: "msg_2", delta: "world" },

packages/ai/src/chatTransport.ts

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import {
22
ApiClient,
33
ApiRequestOptions,
44
makeIdempotencyKey,
5+
SSEStreamPart,
6+
SSEStreamSubscription,
57
stringifyIO,
68
TriggerOptions,
79
} from "@trigger.dev/core/v3";
@@ -205,47 +207,61 @@ export class TriggerChatTransport<
205207
runState: TriggerChatRunState,
206208
abortSignal: AbortSignal | undefined,
207209
lastEventId?: string
208-
): Promise<ReadableStream<UIMessageChunk>> {
210+
): Promise<ReadableStream<SSEStreamPart<InferUIMessageChunk<UI_MESSAGE>>>> {
209211
const streamClient = new ApiClient(
210212
this.baseURL,
211213
runState.publicAccessToken,
212214
this.previewBranch,
213215
this.requestOptions
214216
);
215217

216-
const stream = await streamClient.fetchStream<InferUIMessageChunk<UI_MESSAGE>>(
217-
runState.runId,
218-
runState.streamKey,
218+
const subscription = new SSEStreamSubscription(
219+
this.createStreamUrl(runState.runId, runState.streamKey),
219220
{
221+
headers: streamClient.getHeaders(),
220222
signal: abortSignal,
221223
timeoutInSeconds: this.timeoutInSeconds,
222224
lastEventId,
223225
}
224226
);
225227

226-
return stream as unknown as ReadableStream<UIMessageChunk>;
228+
return (await subscription.subscribe()) as ReadableStream<
229+
SSEStreamPart<InferUIMessageChunk<UI_MESSAGE>>
230+
>;
227231
}
228232

229-
private createTrackedStream(chatId: string, stream: ReadableStream<UIMessageChunk>) {
233+
private createTrackedStream(
234+
chatId: string,
235+
stream: ReadableStream<SSEStreamPart<InferUIMessageChunk<UI_MESSAGE>>>
236+
) {
230237
const teeStreams = stream.tee();
231238
const trackingStream = teeStreams[0];
232239
const consumerStream = teeStreams[1];
233240

234241
this.consumeTrackingStream(chatId, trackingStream);
235242

236-
return consumerStream;
243+
return consumerStream.pipeThrough(
244+
new TransformStream<SSEStreamPart<InferUIMessageChunk<UI_MESSAGE>>, UIMessageChunk>({
245+
transform(part, controller) {
246+
controller.enqueue(part.chunk as UIMessageChunk);
247+
},
248+
})
249+
);
237250
}
238251

239-
private async consumeTrackingStream(chatId: string, stream: ReadableStream<UIMessageChunk>) {
252+
private async consumeTrackingStream(
253+
chatId: string,
254+
stream: ReadableStream<SSEStreamPart<InferUIMessageChunk<UI_MESSAGE>>>
255+
) {
240256
try {
241-
for await (const _chunk of stream) {
257+
for await (const part of stream) {
242258
const runState = await this.runStore.get(chatId);
243259

244260
if (!runState) {
245261
return;
246262
}
247263

248-
runState.lastEventId = incrementLastEventId(runState.lastEventId);
264+
runState.lastEventId = part.id;
249265
await this.runStore.set(runState);
250266
}
251267

@@ -274,6 +290,14 @@ export class TriggerChatTransport<
274290

275291
return handle as TriggerTaskResponse;
276292
}
293+
294+
private createStreamUrl(runId: string, streamKey: string): string {
295+
const normalizedBaseUrl = this.baseURL.replace(/\/$/, "");
296+
const encodedRunId = encodeURIComponent(runId);
297+
const encodedStreamKey = encodeURIComponent(streamKey);
298+
299+
return `${normalizedBaseUrl}/realtime/v1/streams/${encodedRunId}/${encodedStreamKey}`;
300+
}
277301
}
278302

279303
export function createTriggerChatTransport<
@@ -426,17 +450,4 @@ async function createTriggerTaskOptions(
426450
};
427451
}
428452

429-
function incrementLastEventId(lastEventId: string | undefined): string {
430-
if (!lastEventId) {
431-
return "0";
432-
}
433-
434-
const numberValue = Number.parseInt(lastEventId, 10);
435-
if (Number.isNaN(numberValue)) {
436-
return "0";
437-
}
438-
439-
return String(numberValue + 1);
440-
}
441-
442453
export type { TriggerChatTaskContext };

0 commit comments

Comments
 (0)