Skip to content

Commit b6448fb

Browse files
refactor: polish TriggerChatTransport implementation
- Cache ApiClient instance instead of creating per-call - Add streamTimeoutSeconds option for customizable stream timeout - Clean up subscribeToStream method (remove unused variable) - Improve JSDoc with backend task example - Minor code cleanup Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent 8354e2a commit b6448fb

File tree

2 files changed

+35
-32
lines changed

2 files changed

+35
-32
lines changed

packages/ai/src/transport.ts

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,15 @@ const DEFAULT_STREAM_TIMEOUT_SECONDS = 120;
1919
* 2. Subscribes to the task's realtime stream to receive `UIMessageChunk` data
2020
* 3. Returns a `ReadableStream<UIMessageChunk>` that the AI SDK processes natively
2121
*
22+
* The task receives a `ChatTaskPayload` containing the conversation messages,
23+
* chat session ID, trigger type, and any custom metadata. Your task should use
24+
* the AI SDK's `streamText` (or similar) to generate a response, then pipe
25+
* the resulting `UIMessageStream` to the `"chat"` realtime stream key
26+
* (or a custom key matching the `streamKey` option).
27+
*
2228
* @example
2329
* ```tsx
30+
* // Frontend — use with AI SDK's useChat hook
2431
* import { useChat } from "@ai-sdk/react";
2532
* import { TriggerChatTransport } from "@trigger.dev/ai";
2633
*
@@ -36,12 +43,12 @@ const DEFAULT_STREAM_TIMEOUT_SECONDS = 120;
3643
* }
3744
* ```
3845
*
39-
* On the backend, the task should pipe UIMessageChunks to the `"chat"` stream:
40-
*
4146
* @example
4247
* ```ts
48+
* // Backend — Trigger.dev task that handles chat
4349
* import { task, streams } from "@trigger.dev/sdk";
4450
* import { streamText, convertToModelMessages } from "ai";
51+
* import type { ChatTaskPayload } from "@trigger.dev/ai";
4552
*
4653
* export const myChatTask = task({
4754
* id: "my-chat-task",
@@ -63,6 +70,8 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
6370
private readonly baseURL: string;
6471
private readonly streamKey: string;
6572
private readonly extraHeaders: Record<string, string>;
73+
private readonly streamTimeoutSeconds: number;
74+
private readonly apiClient: ApiClient;
6675

6776
/**
6877
* Tracks active chat sessions for reconnection support.
@@ -76,6 +85,8 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
7685
this.baseURL = options.baseURL ?? DEFAULT_BASE_URL;
7786
this.streamKey = options.streamKey ?? DEFAULT_STREAM_KEY;
7887
this.extraHeaders = options.headers ?? {};
88+
this.streamTimeoutSeconds = options.streamTimeoutSeconds ?? DEFAULT_STREAM_TIMEOUT_SECONDS;
89+
this.apiClient = new ApiClient(this.baseURL, this.accessToken);
7990
}
8091

8192
/**
@@ -95,9 +106,9 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
95106
abortSignal: AbortSignal | undefined;
96107
} & ChatRequestOptions
97108
): Promise<ReadableStream<UIMessageChunk>> => {
98-
const { trigger, chatId, messageId, messages, abortSignal, headers, body, metadata } = options;
109+
const { trigger, chatId, messageId, messages, abortSignal, body, metadata } = options;
99110

100-
// Build the payload for the task
111+
// Build the payload for the task — this becomes the ChatTaskPayload
101112
const payload = {
102113
messages,
103114
chatId,
@@ -107,21 +118,19 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
107118
...(body ?? {}),
108119
};
109120

110-
// Create API client for triggering
111-
const apiClient = new ApiClient(this.baseURL, this.accessToken);
112-
113121
// Trigger the task
114-
const triggerResponse = await apiClient.triggerTask(this.taskId, {
122+
const triggerResponse = await this.apiClient.triggerTask(this.taskId, {
115123
payload: JSON.stringify(payload),
116124
options: {
117125
payloadType: "application/json",
118126
},
119127
});
120128

121129
const runId = triggerResponse.id;
122-
const publicAccessToken = "publicAccessToken" in triggerResponse
123-
? (triggerResponse as { publicAccessToken?: string }).publicAccessToken
124-
: undefined;
130+
const publicAccessToken =
131+
"publicAccessToken" in triggerResponse
132+
? (triggerResponse as { publicAccessToken?: string }).publicAccessToken
133+
: undefined;
125134

126135
// Store session state for reconnection
127136
this.sessions.set(chatId, {
@@ -143,9 +152,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
143152
chatId: string;
144153
} & ChatRequestOptions
145154
): Promise<ReadableStream<UIMessageChunk> | null> => {
146-
const { chatId } = options;
147-
148-
const session = this.sessions.get(chatId);
155+
const session = this.sessions.get(options.chatId);
149156
if (!session) {
150157
return null;
151158
}
@@ -162,34 +169,24 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
162169
accessToken: string,
163170
abortSignal: AbortSignal | undefined
164171
): ReadableStream<UIMessageChunk> {
165-
const streamKey = this.streamKey;
166-
const baseURL = this.baseURL;
167-
const extraHeaders = this.extraHeaders;
168-
169-
// Build the authorization header
170172
const headers: Record<string, string> = {
171173
Authorization: `Bearer ${accessToken}`,
172-
...extraHeaders,
174+
...this.extraHeaders,
173175
};
174176

175177
const subscription = new SSEStreamSubscription(
176-
`${baseURL}/realtime/v1/streams/${runId}/${streamKey}`,
178+
`${this.baseURL}/realtime/v1/streams/${runId}/${this.streamKey}`,
177179
{
178180
headers,
179181
signal: abortSignal,
180-
timeoutInSeconds: DEFAULT_STREAM_TIMEOUT_SECONDS,
182+
timeoutInSeconds: this.streamTimeoutSeconds,
181183
}
182184
);
183185

184-
// We need to convert the SSEStreamPart stream to a UIMessageChunk stream
185-
// SSEStreamPart has { id, chunk, timestamp } where chunk is the deserialized UIMessageChunk
186-
let sseStreamPromise: Promise<ReadableStream<SSEStreamPart>> | null = null;
187-
188186
return new ReadableStream<UIMessageChunk>({
189187
start: async (controller) => {
190188
try {
191-
sseStreamPromise = subscription.subscribe();
192-
const sseStream = await sseStreamPromise;
189+
const sseStream = await subscription.subscribe();
193190
const reader = sseStream.getReader();
194191

195192
try {
@@ -216,7 +213,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
216213
throw readError;
217214
}
218215
} catch (error) {
219-
// Don't error the stream for abort errors
216+
// Don't error the stream for abort errors — just close gracefully
220217
if (error instanceof Error && error.name === "AbortError") {
221218
controller.close();
222219
return;
@@ -225,9 +222,6 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
225222
controller.error(error);
226223
}
227224
},
228-
cancel: () => {
229-
// Cancellation is handled by the abort signal
230-
},
231225
});
232226
}
233227
}

packages/ai/src/types.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,15 @@ export type TriggerChatTransportOptions = {
3939
* Additional headers to include in API requests to Trigger.dev.
4040
*/
4141
headers?: Record<string, string>;
42+
43+
/**
44+
* The number of seconds to wait for the realtime stream to produce data
45+
* before timing out. If no data arrives within this period, the stream
46+
* will be closed.
47+
*
48+
* @default 120
49+
*/
50+
streamTimeoutSeconds?: number;
4251
};
4352

4453
/**

0 commit comments

Comments
 (0)