Skip to content

Commit 68bd584

Browse files
test: add abort signal, multiple sessions, and body merging tests
Adds 3 additional test cases: - Abort signal gracefully closes the stream - Multiple independent chat sessions tracked correctly - ChatRequestOptions.body is merged into task payload Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent b6448fb commit 68bd584

File tree

1 file changed

+212
-0
lines changed

1 file changed

+212
-0
lines changed

packages/ai/src/transport.test.ts

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,218 @@ describe("TriggerChatTransport", () => {
479479
});
480480
});
481481

482+
describe("abort signal", () => {
483+
it("should close the stream gracefully when aborted", async () => {
484+
let streamResolve: (() => void) | undefined;
485+
const streamWait = new Promise<void>((resolve) => {
486+
streamResolve = resolve;
487+
});
488+
489+
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
490+
const urlStr = typeof url === "string" ? url : url.toString();
491+
492+
if (urlStr.includes("/trigger")) {
493+
return new Response(
494+
JSON.stringify({ id: "run_abort" }),
495+
{
496+
status: 200,
497+
headers: {
498+
"content-type": "application/json",
499+
"x-trigger-jwt": "token",
500+
},
501+
}
502+
);
503+
}
504+
505+
if (urlStr.includes("/realtime/v1/streams/")) {
506+
// Create a slow stream that waits before sending data
507+
const stream = new ReadableStream<Uint8Array>({
508+
async start(controller) {
509+
const encoder = new TextEncoder();
510+
controller.enqueue(
511+
encoder.encode(`id: 0\ndata: ${JSON.stringify({ type: "text-start", id: "p1" })}\n\n`)
512+
);
513+
// Wait for the test to signal it's done
514+
await streamWait;
515+
controller.close();
516+
},
517+
});
518+
519+
return new Response(stream, {
520+
status: 200,
521+
headers: {
522+
"content-type": "text/event-stream",
523+
"X-Stream-Version": "v1",
524+
},
525+
});
526+
}
527+
528+
throw new Error(`Unexpected fetch URL: ${urlStr}`);
529+
});
530+
531+
const abortController = new AbortController();
532+
533+
const transport = new TriggerChatTransport({
534+
taskId: "my-task",
535+
accessToken: "token",
536+
baseURL: "https://api.test.trigger.dev",
537+
});
538+
539+
const stream = await transport.sendMessages({
540+
trigger: "submit-message",
541+
chatId: "chat-abort",
542+
messageId: undefined,
543+
messages: [createUserMessage("test")],
544+
abortSignal: abortController.signal,
545+
});
546+
547+
// Read the first chunk
548+
const reader = stream.getReader();
549+
const first = await reader.read();
550+
expect(first.done).toBe(false);
551+
552+
// Abort and clean up
553+
abortController.abort();
554+
streamResolve?.();
555+
556+
// The stream should close — reading should return done
557+
const next = await reader.read();
558+
expect(next.done).toBe(true);
559+
});
560+
});
561+
562+
describe("multiple sessions", () => {
563+
it("should track multiple chat sessions independently", async () => {
564+
let callCount = 0;
565+
566+
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {
567+
const urlStr = typeof url === "string" ? url : url.toString();
568+
569+
if (urlStr.includes("/trigger")) {
570+
callCount++;
571+
return new Response(
572+
JSON.stringify({ id: `run_multi_${callCount}` }),
573+
{
574+
status: 200,
575+
headers: {
576+
"content-type": "application/json",
577+
"x-trigger-jwt": `token_${callCount}`,
578+
},
579+
}
580+
);
581+
}
582+
583+
if (urlStr.includes("/realtime/v1/streams/")) {
584+
return new Response(createSSEStream(""), {
585+
status: 200,
586+
headers: {
587+
"content-type": "text/event-stream",
588+
"X-Stream-Version": "v1",
589+
},
590+
});
591+
}
592+
593+
throw new Error(`Unexpected fetch URL: ${urlStr}`);
594+
});
595+
596+
const transport = new TriggerChatTransport({
597+
taskId: "my-task",
598+
accessToken: "token",
599+
baseURL: "https://api.test.trigger.dev",
600+
});
601+
602+
// Start two independent chat sessions
603+
await transport.sendMessages({
604+
trigger: "submit-message",
605+
chatId: "session-a",
606+
messageId: undefined,
607+
messages: [createUserMessage("Hello A")],
608+
abortSignal: undefined,
609+
});
610+
611+
await transport.sendMessages({
612+
trigger: "submit-message",
613+
chatId: "session-b",
614+
messageId: undefined,
615+
messages: [createUserMessage("Hello B")],
616+
abortSignal: undefined,
617+
});
618+
619+
// Both sessions should be independently reconnectable
620+
const streamA = await transport.reconnectToStream({ chatId: "session-a" });
621+
const streamB = await transport.reconnectToStream({ chatId: "session-b" });
622+
const streamC = await transport.reconnectToStream({ chatId: "nonexistent" });
623+
624+
expect(streamA).toBeInstanceOf(ReadableStream);
625+
expect(streamB).toBeInstanceOf(ReadableStream);
626+
expect(streamC).toBeNull();
627+
});
628+
});
629+
630+
describe("body merging", () => {
631+
it("should merge ChatRequestOptions.body into the task payload", async () => {
632+
const fetchSpy = vi.fn().mockImplementation(async (url: string | URL) => {
633+
const urlStr = typeof url === "string" ? url : url.toString();
634+
635+
if (urlStr.includes("/trigger")) {
636+
return new Response(
637+
JSON.stringify({ id: "run_body" }),
638+
{
639+
status: 200,
640+
headers: {
641+
"content-type": "application/json",
642+
"x-trigger-jwt": "token",
643+
},
644+
}
645+
);
646+
}
647+
648+
if (urlStr.includes("/realtime/v1/streams/")) {
649+
return new Response(createSSEStream(""), {
650+
status: 200,
651+
headers: {
652+
"content-type": "text/event-stream",
653+
"X-Stream-Version": "v1",
654+
},
655+
});
656+
}
657+
658+
throw new Error(`Unexpected fetch URL: ${urlStr}`);
659+
});
660+
661+
global.fetch = fetchSpy;
662+
663+
const transport = new TriggerChatTransport({
664+
taskId: "my-task",
665+
accessToken: "token",
666+
baseURL: "https://api.test.trigger.dev",
667+
});
668+
669+
await transport.sendMessages({
670+
trigger: "submit-message",
671+
chatId: "chat-body",
672+
messageId: undefined,
673+
messages: [createUserMessage("test")],
674+
abortSignal: undefined,
675+
body: { systemPrompt: "You are helpful", temperature: 0.7 },
676+
});
677+
678+
const triggerCall = fetchSpy.mock.calls.find((call: any[]) =>
679+
(typeof call[0] === "string" ? call[0] : call[0].toString()).includes("/trigger")
680+
);
681+
682+
const triggerBody = JSON.parse(triggerCall![1]?.body as string);
683+
const payload = JSON.parse(triggerBody.payload);
684+
685+
// body properties should be merged into the payload
686+
expect(payload.systemPrompt).toBe("You are helpful");
687+
expect(payload.temperature).toBe(0.7);
688+
// Standard fields should still be present
689+
expect(payload.chatId).toBe("chat-body");
690+
expect(payload.trigger).toBe("submit-message");
691+
});
692+
});
693+
482694
describe("message types", () => {
483695
it("should handle regenerate-message trigger", async () => {
484696
const fetchSpy = vi.fn().mockImplementation(async (url: string | URL) => {

0 commit comments

Comments
 (0)