Skip to content

Commit cc2be18

Browse files
Cover async run store implementations in chat transport tests
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent d706534 commit cc2be18

File tree

1 file changed

+98
-0
lines changed

1 file changed

+98
-0
lines changed

packages/ai/src/chatTransport.test.ts

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ import {
77
TriggerChatTransport,
88
} from "./chatTransport.js";
99
import type { UIMessage, UIMessageChunk } from "ai";
10+
import type {
11+
TriggerChatRunState,
12+
TriggerChatRunStore,
13+
} from "./types.js";
1014

1115
type TestServer = {
1216
url: string;
@@ -459,6 +463,69 @@ describe("TriggerChatTransport", function () {
459463
});
460464
});
461465

466+
it("supports async run store implementations", async function () {
467+
const runStore = new AsyncTrackedRunStore();
468+
469+
const server = await startServer(function (req, res) {
470+
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
471+
res.writeHead(200, {
472+
"content-type": "application/json",
473+
"x-trigger-jwt": "pk_run_async",
474+
});
475+
res.end(JSON.stringify({ id: "run_async" }));
476+
return;
477+
}
478+
479+
if (req.method === "GET" && req.url === "/realtime/v1/streams/run_async/chat-stream") {
480+
res.writeHead(200, {
481+
"content-type": "text/event-stream",
482+
});
483+
writeSSE(
484+
res,
485+
"1-0",
486+
JSON.stringify({ type: "text-start", id: "async_1" })
487+
);
488+
writeSSE(
489+
res,
490+
"2-0",
491+
JSON.stringify({ type: "text-end", id: "async_1" })
492+
);
493+
res.end();
494+
return;
495+
}
496+
497+
res.writeHead(404);
498+
res.end();
499+
});
500+
501+
const transport = new TriggerChatTransport({
502+
task: "chat-task",
503+
stream: "chat-stream",
504+
accessToken: "pk_trigger",
505+
baseURL: server.url,
506+
runStore,
507+
});
508+
509+
const stream = await transport.sendMessages({
510+
trigger: "submit-message",
511+
chatId: "chat-async",
512+
messageId: undefined,
513+
messages: [],
514+
abortSignal: undefined,
515+
});
516+
517+
const chunks = await readChunks(stream);
518+
expect(chunks).toHaveLength(2);
519+
520+
await waitForCondition(function () {
521+
return runStore.deleteCalls.includes("chat-async");
522+
});
523+
524+
expect(runStore.setCalls).toContain("chat-async");
525+
expect(runStore.getCalls).toContain("chat-async");
526+
await expect(runStore.get("chat-async")).resolves.toBeUndefined();
527+
});
528+
462529
it("reconnects active streams using tracked lastEventId", async function () {
463530
let reconnectLastEventId: string | undefined;
464531
let firstStreamResponse: ServerResponse<IncomingMessage> | undefined;
@@ -661,3 +728,34 @@ class TrackedRunStore extends InMemoryTriggerChatRunStore {
661728
super.delete(chatId);
662729
}
663730
}
731+
732+
class AsyncTrackedRunStore implements TriggerChatRunStore {
733+
private readonly runs = new Map<string, TriggerChatRunState>();
734+
public readonly getCalls: string[] = [];
735+
public readonly setCalls: string[] = [];
736+
public readonly deleteCalls: string[] = [];
737+
738+
public async get(chatId: string): Promise<TriggerChatRunState | undefined> {
739+
this.getCalls.push(chatId);
740+
await sleep(1);
741+
return this.runs.get(chatId);
742+
}
743+
744+
public async set(state: TriggerChatRunState): Promise<void> {
745+
this.setCalls.push(state.chatId);
746+
await sleep(1);
747+
this.runs.set(state.chatId, state);
748+
}
749+
750+
public async delete(chatId: string): Promise<void> {
751+
this.deleteCalls.push(chatId);
752+
await sleep(1);
753+
this.runs.delete(chatId);
754+
}
755+
}
756+
757+
async function sleep(timeoutInMs: number) {
758+
await new Promise<void>(function (resolve) {
759+
setTimeout(resolve, timeoutInMs);
760+
});
761+
}

0 commit comments

Comments
 (0)