Skip to content

Commit 741c983

Browse files
feat: add @trigger.dev/ai package with TriggerChatTransport
New package that provides a custom AI SDK ChatTransport implementation bridging Vercel AI SDK's useChat hook with Trigger.dev's durable task execution and realtime streams. Key exports: - TriggerChatTransport class implementing ChatTransport<UIMessage> - createChatTransport() factory function - ChatTaskPayload type for task-side typing - TriggerChatTransportOptions type The transport triggers a Trigger.dev task with chat messages as payload, then subscribes to the task's realtime stream to receive UIMessageChunk data, which useChat processes natively. Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent b4e08bd commit 741c983

File tree

7 files changed

+633
-31
lines changed

7 files changed

+633
-31
lines changed

packages/ai/package.json

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
{
2+
"name": "@trigger.dev/ai",
3+
"version": "4.3.3",
4+
"description": "AI SDK integration for Trigger.dev - Custom ChatTransport for running AI chat as durable tasks",
5+
"license": "MIT",
6+
"publishConfig": {
7+
"access": "public"
8+
},
9+
"repository": {
10+
"type": "git",
11+
"url": "https://github.com/triggerdotdev/trigger.dev",
12+
"directory": "packages/ai"
13+
},
14+
"type": "module",
15+
"files": [
16+
"dist"
17+
],
18+
"tshy": {
19+
"selfLink": false,
20+
"main": true,
21+
"module": true,
22+
"project": "./tsconfig.json",
23+
"exports": {
24+
"./package.json": "./package.json",
25+
".": "./src/index.ts"
26+
},
27+
"sourceDialects": [
28+
"@triggerdotdev/source"
29+
]
30+
},
31+
"scripts": {
32+
"clean": "rimraf dist .tshy .tshy-build .turbo",
33+
"build": "tshy && pnpm run update-version",
34+
"dev": "tshy --watch",
35+
"typecheck": "tsc --noEmit",
36+
"test": "vitest",
37+
"update-version": "tsx ../../scripts/updateVersion.ts",
38+
"check-exports": "attw --pack ."
39+
},
40+
"dependencies": {
41+
"@trigger.dev/core": "workspace:4.3.3"
42+
},
43+
"peerDependencies": {
44+
"ai": "^5.0.0 || ^6.0.0"
45+
},
46+
"devDependencies": {
47+
"@arethetypeswrong/cli": "^0.15.4",
48+
"ai": "^6.0.0",
49+
"rimraf": "^3.0.2",
50+
"tshy": "^3.0.2",
51+
"tsx": "4.17.0",
52+
"vitest": "^2.1.0"
53+
},
54+
"engines": {
55+
"node": ">=18.20.0"
56+
},
57+
"exports": {
58+
"./package.json": "./package.json",
59+
".": {
60+
"import": {
61+
"@triggerdotdev/source": "./src/index.ts",
62+
"types": "./dist/esm/index.d.ts",
63+
"default": "./dist/esm/index.js"
64+
},
65+
"require": {
66+
"types": "./dist/commonjs/index.d.ts",
67+
"default": "./dist/commonjs/index.js"
68+
}
69+
}
70+
},
71+
"main": "./dist/commonjs/index.js",
72+
"types": "./dist/commonjs/index.d.ts",
73+
"module": "./dist/esm/index.js"
74+
}

packages/ai/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export { TriggerChatTransport, createChatTransport } from "./transport.js";
2+
export type { TriggerChatTransportOptions, ChatTaskPayload, ChatSessionState } from "./types.js";
3+
export { VERSION } from "./version.js";

packages/ai/src/transport.ts

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
import type { ChatTransport, UIMessage, UIMessageChunk, ChatRequestOptions } from "ai";
2+
import {
3+
ApiClient,
4+
SSEStreamSubscription,
5+
type SSEStreamPart,
6+
} from "@trigger.dev/core/v3";
7+
import type { TriggerChatTransportOptions, ChatSessionState } from "./types.js";
8+
9+
const DEFAULT_STREAM_KEY = "chat";
10+
const DEFAULT_BASE_URL = "https://api.trigger.dev";
11+
const DEFAULT_STREAM_TIMEOUT_SECONDS = 120;
12+
13+
/**
14+
* A custom AI SDK `ChatTransport` implementation that bridges the Vercel AI SDK's
15+
* `useChat` hook with Trigger.dev's durable task execution and realtime streams.
16+
*
17+
* When `sendMessages` is called, the transport:
18+
* 1. Triggers a Trigger.dev task with the chat messages as payload
19+
* 2. Subscribes to the task's realtime stream to receive `UIMessageChunk` data
20+
* 3. Returns a `ReadableStream<UIMessageChunk>` that the AI SDK processes natively
21+
*
22+
* @example
23+
* ```tsx
24+
* import { useChat } from "@ai-sdk/react";
25+
* import { TriggerChatTransport } from "@trigger.dev/ai";
26+
*
27+
* function Chat({ accessToken }: { accessToken: string }) {
28+
* const { messages, sendMessage, status } = useChat({
29+
* transport: new TriggerChatTransport({
30+
* accessToken,
31+
* taskId: "my-chat-task",
32+
* }),
33+
* });
34+
*
35+
* // ... render messages
36+
* }
37+
* ```
38+
*
39+
* On the backend, the task should pipe UIMessageChunks to the `"chat"` stream:
40+
*
41+
* @example
42+
* ```ts
43+
* import { task, streams } from "@trigger.dev/sdk";
44+
* import { streamText, convertToModelMessages } from "ai";
45+
*
46+
* export const myChatTask = task({
47+
* id: "my-chat-task",
48+
* run: async (payload: ChatTaskPayload) => {
49+
* const result = streamText({
50+
* model: openai("gpt-4o"),
51+
* messages: convertToModelMessages(payload.messages),
52+
* });
53+
*
54+
* const { waitUntilComplete } = streams.pipe("chat", result.toUIMessageStream());
55+
* await waitUntilComplete();
56+
* },
57+
* });
58+
* ```
59+
*/
60+
export class TriggerChatTransport implements ChatTransport<UIMessage> {
61+
private readonly taskId: string;
62+
private readonly accessToken: string;
63+
private readonly baseURL: string;
64+
private readonly streamKey: string;
65+
private readonly extraHeaders: Record<string, string>;
66+
67+
/**
68+
* Tracks active chat sessions for reconnection support.
69+
* Maps chatId → session state (runId, publicAccessToken).
70+
*/
71+
private sessions: Map<string, ChatSessionState> = new Map();
72+
73+
constructor(options: TriggerChatTransportOptions) {
74+
this.taskId = options.taskId;
75+
this.accessToken = options.accessToken;
76+
this.baseURL = options.baseURL ?? DEFAULT_BASE_URL;
77+
this.streamKey = options.streamKey ?? DEFAULT_STREAM_KEY;
78+
this.extraHeaders = options.headers ?? {};
79+
}
80+
81+
/**
82+
* Sends messages to a Trigger.dev task and returns a streaming response.
83+
*
84+
* This method:
85+
* 1. Triggers the configured task with the chat messages as payload
86+
* 2. Subscribes to the task's realtime stream for UIMessageChunk events
87+
* 3. Returns a ReadableStream that the AI SDK's useChat hook processes
88+
*/
89+
sendMessages = async (
90+
options: {
91+
trigger: "submit-message" | "regenerate-message";
92+
chatId: string;
93+
messageId: string | undefined;
94+
messages: UIMessage[];
95+
abortSignal: AbortSignal | undefined;
96+
} & ChatRequestOptions
97+
): Promise<ReadableStream<UIMessageChunk>> => {
98+
const { trigger, chatId, messageId, messages, abortSignal, headers, body, metadata } = options;
99+
100+
// Build the payload for the task
101+
const payload = {
102+
messages,
103+
chatId,
104+
trigger,
105+
messageId,
106+
metadata,
107+
...(body ?? {}),
108+
};
109+
110+
// Create API client for triggering
111+
const apiClient = new ApiClient(this.baseURL, this.accessToken);
112+
113+
// Trigger the task
114+
const triggerResponse = await apiClient.triggerTask(this.taskId, {
115+
payload: JSON.stringify(payload),
116+
options: {
117+
payloadType: "application/json",
118+
},
119+
});
120+
121+
const runId = triggerResponse.id;
122+
const publicAccessToken = "publicAccessToken" in triggerResponse
123+
? (triggerResponse as { publicAccessToken?: string }).publicAccessToken
124+
: undefined;
125+
126+
// Store session state for reconnection
127+
this.sessions.set(chatId, {
128+
runId,
129+
publicAccessToken: publicAccessToken ?? this.accessToken,
130+
});
131+
132+
// Subscribe to the realtime stream for this run
133+
return this.subscribeToStream(runId, publicAccessToken ?? this.accessToken, abortSignal);
134+
};
135+
136+
/**
137+
* Reconnects to an existing streaming response for the specified chat session.
138+
*
139+
* Returns a ReadableStream if an active session exists, or null if no session is found.
140+
*/
141+
reconnectToStream = async (
142+
options: {
143+
chatId: string;
144+
} & ChatRequestOptions
145+
): Promise<ReadableStream<UIMessageChunk> | null> => {
146+
const { chatId } = options;
147+
148+
const session = this.sessions.get(chatId);
149+
if (!session) {
150+
return null;
151+
}
152+
153+
return this.subscribeToStream(session.runId, session.publicAccessToken, undefined);
154+
};
155+
156+
/**
157+
* Creates a ReadableStream<UIMessageChunk> by subscribing to the realtime SSE stream
158+
* for a given run.
159+
*/
160+
private subscribeToStream(
161+
runId: string,
162+
accessToken: string,
163+
abortSignal: AbortSignal | undefined
164+
): ReadableStream<UIMessageChunk> {
165+
const streamKey = this.streamKey;
166+
const baseURL = this.baseURL;
167+
const extraHeaders = this.extraHeaders;
168+
169+
// Build the authorization header
170+
const headers: Record<string, string> = {
171+
Authorization: `Bearer ${accessToken}`,
172+
...extraHeaders,
173+
};
174+
175+
const subscription = new SSEStreamSubscription(
176+
`${baseURL}/realtime/v1/streams/${runId}/${streamKey}`,
177+
{
178+
headers,
179+
signal: abortSignal,
180+
timeoutInSeconds: DEFAULT_STREAM_TIMEOUT_SECONDS,
181+
}
182+
);
183+
184+
// We need to convert the SSEStreamPart stream to a UIMessageChunk stream
185+
// SSEStreamPart has { id, chunk, timestamp } where chunk is the deserialized UIMessageChunk
186+
let sseStreamPromise: Promise<ReadableStream<SSEStreamPart>> | null = null;
187+
188+
return new ReadableStream<UIMessageChunk>({
189+
start: async (controller) => {
190+
try {
191+
sseStreamPromise = subscription.subscribe();
192+
const sseStream = await sseStreamPromise;
193+
const reader = sseStream.getReader();
194+
195+
try {
196+
while (true) {
197+
const { done, value } = await reader.read();
198+
199+
if (done) {
200+
controller.close();
201+
return;
202+
}
203+
204+
if (abortSignal?.aborted) {
205+
reader.cancel();
206+
reader.releaseLock();
207+
controller.close();
208+
return;
209+
}
210+
211+
// Each SSE part's chunk is a UIMessageChunk
212+
controller.enqueue(value.chunk as UIMessageChunk);
213+
}
214+
} catch (readError) {
215+
reader.releaseLock();
216+
throw readError;
217+
}
218+
} catch (error) {
219+
// Don't error the stream for abort errors
220+
if (error instanceof Error && error.name === "AbortError") {
221+
controller.close();
222+
return;
223+
}
224+
225+
controller.error(error);
226+
}
227+
},
228+
cancel: () => {
229+
// Cancellation is handled by the abort signal
230+
},
231+
});
232+
}
233+
}
234+
235+
/**
236+
* Creates a new `TriggerChatTransport` instance.
237+
*
238+
* This is a convenience factory function equivalent to `new TriggerChatTransport(options)`.
239+
*
240+
* @example
241+
* ```tsx
242+
* import { useChat } from "@ai-sdk/react";
243+
* import { createChatTransport } from "@trigger.dev/ai";
244+
*
245+
* const transport = createChatTransport({
246+
* taskId: "my-chat-task",
247+
* accessToken: publicAccessToken,
248+
* });
249+
*
250+
* function Chat() {
251+
* const { messages, sendMessage } = useChat({ transport });
252+
* // ...
253+
* }
254+
* ```
255+
*/
256+
export function createChatTransport(options: TriggerChatTransportOptions): TriggerChatTransport {
257+
return new TriggerChatTransport(options);
258+
}

0 commit comments

Comments
 (0)