Skip to content

Commit b511566

Browse files
Cleanup run-store entries when chat streams finish
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent cf1b5c7 commit b511566

File tree

2 files changed

+72
-0
lines changed

2 files changed

+72
-0
lines changed

packages/ai/src/chatTransport.test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,67 @@ describe("TriggerChatTransport", function () {
338338
expect(observedRunId).toBe("run_factory");
339339
});
340340

341+
it("cleans run store state when stream completes", async function () {
342+
const trackedRunStore = new TrackedRunStore();
343+
344+
const server = await startServer(function (req, res) {
345+
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
346+
res.writeHead(200, {
347+
"content-type": "application/json",
348+
"x-trigger-jwt": "pk_run_cleanup",
349+
});
350+
res.end(JSON.stringify({ id: "run_cleanup" }));
351+
return;
352+
}
353+
354+
if (req.method === "GET" && req.url === "/realtime/v1/streams/run_cleanup/chat-stream") {
355+
res.writeHead(200, {
356+
"content-type": "text/event-stream",
357+
});
358+
writeSSE(
359+
res,
360+
"1-0",
361+
JSON.stringify({ type: "text-start", id: "cleanup_1" })
362+
);
363+
writeSSE(
364+
res,
365+
"2-0",
366+
JSON.stringify({ type: "text-end", id: "cleanup_1" })
367+
);
368+
res.end();
369+
return;
370+
}
371+
372+
res.writeHead(404);
373+
res.end();
374+
});
375+
376+
const transport = new TriggerChatTransport({
377+
task: "chat-task",
378+
stream: "chat-stream",
379+
accessToken: "pk_trigger",
380+
baseURL: server.url,
381+
runStore: trackedRunStore,
382+
});
383+
384+
const stream = await transport.sendMessages({
385+
trigger: "submit-message",
386+
chatId: "chat-cleanup",
387+
messageId: undefined,
388+
messages: [],
389+
abortSignal: undefined,
390+
});
391+
392+
const chunks = await readChunks(stream);
393+
expect(chunks).toHaveLength(2);
394+
395+
await waitForCondition(function () {
396+
return trackedRunStore.deleteCalls.includes("chat-cleanup");
397+
});
398+
399+
expect(trackedRunStore.get("chat-cleanup")).toBeUndefined();
400+
});
401+
341402
it("reconnects active streams using tracked lastEventId", async function () {
342403
let reconnectLastEventId: string | undefined;
343404
let firstStreamResponse: ServerResponse<IncomingMessage> | undefined;
@@ -528,3 +589,12 @@ async function waitForCondition(condition: () => boolean, timeoutInMs = 5000) {
528589

529590
throw new Error(`Condition was not met within ${timeoutInMs}ms`);
530591
}
592+
593+
class TrackedRunStore extends InMemoryTriggerChatRunStore {
594+
public readonly deleteCalls: string[] = [];
595+
596+
public delete(chatId: string): void {
597+
this.deleteCalls.push(chatId);
598+
super.delete(chatId);
599+
}
600+
}

packages/ai/src/chatTransport.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,12 +269,14 @@ export class TriggerChatTransport<
269269
if (runState) {
270270
runState.isActive = false;
271271
await this.runStore.set(runState);
272+
await this.runStore.delete(chatId);
272273
}
273274
} catch {
274275
const runState = await this.runStore.get(chatId);
275276
if (runState) {
276277
runState.isActive = false;
277278
await this.runStore.set(runState);
279+
await this.runStore.delete(chatId);
278280
}
279281
}
280282
}

0 commit comments

Comments
 (0)