Skip to content

Commit 1f49bc2

Browse files
Add optional onError callback for non-fatal transport issues
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent 5efba13 commit 1f49bc2

File tree

7 files changed

+149
-2
lines changed

7 files changed

+149
-2
lines changed

docs/tasks/streams.mdx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,7 @@ The default payload sent to your task is a rich, typed object that includes:
614614
- `triggerOptions` as an object or resolver function (sync or async)
615615
- `runStore` for custom reconnect-state persistence (including async stores)
616616
- `onTriggeredRun` callback (sync or async) to persist or observe run IDs
617+
- `onError` callback to observe non-fatal transport issues
617618
- headers passed through transport can be object, `Headers`, or tuple arrays
618619

619620
```ts

packages/ai/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Typed request option helper aliases are exported:
6060
- `TriggerChatSendMessagesOptions`
6161
- `TriggerChatReconnectOptions`
6262
- `TriggerChatHeadersInput`
63+
- `TriggerChatTransportError` / `TriggerChatOnError`
6364

6465
```ts
6566
import type { TriggerChatTransportPayload } from "@trigger.dev/ai";
@@ -128,6 +129,9 @@ class MemoryStore implements TriggerChatRunStore {
128129
`onTriggeredRun` can also be async, which is useful for persisting run IDs before
129130
the chat stream is consumed. Callback failures are ignored so chat streaming can continue.
130131

132+
You can optionally provide `onError` to observe non-fatal transport errors
133+
(for example callback failures or reconnect setup issues).
134+
131135
## Reconnect semantics
132136

133137
- `reconnectToStream({ chatId })` resumes only while a stream is still active.

packages/ai/src/chatTransport.test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
import type { TriggerChatStream } from "./types.js";
1010
import type { UIMessage, UIMessageChunk } from "ai";
1111
import type {
12+
TriggerChatTransportError,
1213
TriggerChatRunState,
1314
TriggerChatRunStore,
1415
} from "./types.js";
@@ -864,6 +865,7 @@ describe("TriggerChatTransport", function () {
864865

865866
it("continues streaming when onTriggeredRun callback throws", async function () {
866867
let callbackCalled = false;
868+
const errors: TriggerChatTransportError[] = [];
867869

868870
const server = await startServer(function (req, res) {
869871
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
@@ -909,6 +911,9 @@ describe("TriggerChatTransport", function () {
909911
callbackCalled = true;
910912
throw new Error("callback failed");
911913
},
914+
onError: function onError(error) {
915+
errors.push(error);
916+
},
912917
});
913918

914919
const stream = await transport.sendMessages({
@@ -922,6 +927,71 @@ describe("TriggerChatTransport", function () {
922927
const chunks = await readChunks(stream);
923928
expect(callbackCalled).toBe(true);
924929
expect(chunks).toHaveLength(2);
930+
expect(errors).toHaveLength(1);
931+
expect(errors[0]).toMatchObject({
932+
phase: "onTriggeredRun",
933+
chatId: "chat-callback-error",
934+
runId: "run_callback_error",
935+
});
936+
expect(errors[0]?.error.message).toBe("callback failed");
937+
});
938+
939+
it("ignores failures from onError callback", async function () {
940+
const server = await startServer(function (req, res) {
941+
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
942+
res.writeHead(200, {
943+
"content-type": "application/json",
944+
"x-trigger-jwt": "pk_run_onerror_fail",
945+
});
946+
res.end(JSON.stringify({ id: "run_onerror_fail" }));
947+
return;
948+
}
949+
950+
if (req.method === "GET" && req.url === "/realtime/v1/streams/run_onerror_fail/chat-stream") {
951+
res.writeHead(200, {
952+
"content-type": "text/event-stream",
953+
});
954+
writeSSE(
955+
res,
956+
"1-0",
957+
JSON.stringify({ type: "text-start", id: "onerror_fail_1" })
958+
);
959+
writeSSE(
960+
res,
961+
"2-0",
962+
JSON.stringify({ type: "text-end", id: "onerror_fail_1" })
963+
);
964+
res.end();
965+
return;
966+
}
967+
968+
res.writeHead(404);
969+
res.end();
970+
});
971+
972+
const transport = new TriggerChatTransport({
973+
task: "chat-task",
974+
stream: "chat-stream",
975+
accessToken: "pk_trigger",
976+
baseURL: server.url,
977+
onTriggeredRun: async function onTriggeredRun() {
978+
throw new Error("callback failed");
979+
},
980+
onError: async function onError() {
981+
throw new Error("onError failed");
982+
},
983+
});
984+
985+
const stream = await transport.sendMessages({
986+
trigger: "submit-message",
987+
chatId: "chat-onerror-fail",
988+
messageId: undefined,
989+
messages: [],
990+
abortSignal: undefined,
991+
});
992+
993+
const chunks = await readChunks(stream);
994+
expect(chunks).toHaveLength(2);
925995
});
926996

927997
it("cleans run store state when stream completes", async function () {

packages/ai/src/chatTransport.ts

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import type {
1515
} from "ai";
1616
import type {
1717
TriggerChatHeadersInput,
18+
TriggerChatOnError,
1819
TriggerChatReconnectOptions,
1920
TriggerChatSendMessagesOptions,
2021
TriggerChatOnTriggeredRun,
@@ -79,6 +80,7 @@ type TriggerChatTransportCommonOptions<
7980
| TriggerChatTriggerOptionsResolver<UI_MESSAGE>;
8081
runStore?: TriggerChatRunStore;
8182
onTriggeredRun?: TriggerChatOnTriggeredRun;
83+
onError?: TriggerChatOnError;
8284
};
8385

8486
type TriggerChatTransportMapperRequirement<
@@ -133,6 +135,7 @@ export class TriggerChatTransport<
133135
private readonly previewBranch: string | undefined;
134136
private readonly requestOptions: ApiRequestOptions | undefined;
135137
private readonly onTriggeredRun: TriggerChatOnTriggeredRun | undefined;
138+
private readonly onError: TriggerChatOnError | undefined;
136139

137140
constructor(options: TriggerChatTransportOptions<UI_MESSAGE, PAYLOAD>) {
138141
this.task = options.task;
@@ -151,6 +154,7 @@ export class TriggerChatTransport<
151154
this.requestOptions
152155
);
153156
this.onTriggeredRun = options.onTriggeredRun;
157+
this.onError = options.onError;
154158
}
155159

156160
public async sendMessages(
@@ -180,7 +184,13 @@ export class TriggerChatTransport<
180184
await this.onTriggeredRun({
181185
...runState,
182186
});
183-
} catch {
187+
} catch (error) {
188+
await this.reportError({
189+
phase: "onTriggeredRun",
190+
chatId: runState.chatId,
191+
runId: runState.runId,
192+
error: normalizeError(error),
193+
});
184194
// Ignore callback errors so chat streaming can continue.
185195
}
186196
}
@@ -207,10 +217,16 @@ export class TriggerChatTransport<
207217
let stream: ReadableStream<SSEStreamPart<InferUIMessageChunk<UI_MESSAGE>>>;
208218
try {
209219
stream = await this.fetchRunStream(runState, undefined, runState.lastEventId);
210-
} catch {
220+
} catch (error) {
211221
runState.isActive = false;
212222
await this.runStore.set(runState);
213223
await this.runStore.delete(options.chatId);
224+
await this.reportError({
225+
phase: "reconnect",
226+
chatId: runState.chatId,
227+
runId: runState.runId,
228+
error: normalizeError(error),
229+
});
214230
return null;
215231
}
216232

@@ -291,6 +307,12 @@ export class TriggerChatTransport<
291307
runState.isActive = false;
292308
await this.runStore.set(runState);
293309
await this.runStore.delete(chatId);
310+
await this.reportError({
311+
phase: "consumeTrackingStream",
312+
chatId: runState.chatId,
313+
runId: runState.runId,
314+
error: new Error("Stream tracking failed"),
315+
});
294316
}
295317
}
296318
}
@@ -314,6 +336,25 @@ export class TriggerChatTransport<
314336

315337
return `${normalizedBaseUrl}/realtime/v1/streams/${encodedRunId}/${encodedStreamKey}`;
316338
}
339+
340+
private async reportError(
341+
event: {
342+
phase: "onTriggeredRun" | "consumeTrackingStream" | "reconnect";
343+
chatId: string;
344+
runId: string;
345+
error: Error;
346+
}
347+
) {
348+
if (!this.onError) {
349+
return;
350+
}
351+
352+
try {
353+
await this.onError(event);
354+
} catch {
355+
// Never let error callbacks interfere with transport behavior.
356+
}
357+
}
317358
}
318359

319360
export function createTriggerChatTransport<
@@ -469,3 +510,11 @@ async function createTriggerTaskOptions(
469510
}
470511

471512
export type { TriggerChatTaskContext };
513+
514+
function normalizeError(error: unknown): Error {
515+
if (error instanceof Error) {
516+
return error;
517+
}
518+
519+
return new Error(String(error));
520+
}

packages/ai/src/chatTransport.types.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
createTriggerChatTransport,
55
TriggerChatTransport,
66
TriggerChatTransportOptions,
7+
type TriggerChatTransportError,
78
type TriggerChatHeadersInput,
89
type TriggerChatReconnectOptions,
910
type TriggerChatSendMessagesOptions,
@@ -88,6 +89,9 @@ it("accepts async payload mappers and trigger option resolvers", function () {
8889
onTriggeredRun: async function onTriggeredRun(_state) {
8990
return;
9091
},
92+
onError: async function onError(_error: TriggerChatTransportError) {
93+
return;
94+
},
9195
};
9296

9397
expectTypeOf(options).toBeObject();

packages/ai/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export {
77
} from "./chatTransport.js";
88
export type {
99
TriggerChatHeadersInput,
10+
TriggerChatOnError,
1011
TriggerChatPayloadMapper,
1112
TriggerChatOnTriggeredRun,
1213
TriggerChatReconnectOptions,
@@ -15,6 +16,8 @@ export type {
1516
TriggerChatSendMessagesOptions,
1617
TriggerChatStream,
1718
TriggerChatTaskContext,
19+
TriggerChatTransportError,
20+
TriggerChatTransportErrorPhase,
1821
TriggerChatTransportPayload,
1922
TriggerChatTransportRequest,
2023
TriggerChatTransportTrigger,

packages/ai/src/types.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,22 @@ export type TriggerChatOnTriggeredRun = (
7070
state: TriggerChatRunState
7171
) => MaybePromise<void>;
7272

73+
export type TriggerChatTransportErrorPhase =
74+
| "onTriggeredRun"
75+
| "consumeTrackingStream"
76+
| "reconnect";
77+
78+
export type TriggerChatTransportError = {
79+
phase: TriggerChatTransportErrorPhase;
80+
chatId: string;
81+
runId: string;
82+
error: Error;
83+
};
84+
85+
export type TriggerChatOnError = (
86+
error: TriggerChatTransportError
87+
) => MaybePromise<void>;
88+
7389
export type TriggerChatStream<
7490
UI_MESSAGE extends UIMessage = UIMessage,
7591
> =

0 commit comments

Comments
 (0)