Skip to content

Commit 4b4953c

Browse files
Report stream subscribe failures through onError lifecycle
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent 8e0dcd4 commit 4b4953c

File tree

7 files changed

+119
-5
lines changed

7 files changed

+119
-5
lines changed

docs/tasks/streams.mdx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -618,8 +618,9 @@ The default payload sent to your task is a rich, typed object that includes:
618618
- headers passed through transport can be object, `Headers`, or tuple arrays
619619

620620
`onError` receives phase-aware details (`payloadMapper`, `triggerOptions`, `triggerTask`,
621-
`onTriggeredRun`, `consumeTrackingStream`, `reconnect`) plus `chatId`, optional `runId`,
622-
and the underlying `error` (non-Error throws are normalized to `Error` instances).
621+
`streamSubscribe`, `onTriggeredRun`, `consumeTrackingStream`, `reconnect`) plus `chatId`,
622+
optional `runId`, and the underlying `error` (non-Error throws are normalized to `Error`
623+
instances).
623624

624625
```ts
625626
import type { TriggerChatRunState, TriggerChatRunStore } from "@trigger.dev/ai";

packages/ai/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@
1313
- Added reconnect lifecycle handling that cleans run state after completion/error and gracefully returns `null` when reconnect cannot be resumed.
1414
- Added explicit helper option types for chat send/reconnect request inputs.
1515
- Added optional `onError` callback support for observing non-fatal transport issues.
16-
- Added phase-aware `onError` reporting across send, reconnect, and stream-consumption paths.
16+
- Added phase-aware `onError` reporting across send, stream-subscribe, reconnect, and stream-consumption paths.
1717
- Added normalization of non-Error throw values into Error instances before `onError` reporting.

packages/ai/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ You can optionally provide `onError` to observe non-fatal transport errors
135135

136136
The callback receives:
137137

138-
- `phase`: `"payloadMapper" | "triggerOptions" | "triggerTask" | "onTriggeredRun" | "consumeTrackingStream" | "reconnect"`
138+
- `phase`: `"payloadMapper" | "triggerOptions" | "triggerTask" | "streamSubscribe" | "onTriggeredRun" | "consumeTrackingStream" | "reconnect"`
139139
- `chatId`
140140
- `runId` (may be `undefined` before a run is created)
141141
- `error`

packages/ai/src/chatTransport.test.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,103 @@ describe("TriggerChatTransport", function () {
10741074
).rejects.toThrowError("trigger task failed root");
10751075
});
10761076

1077+
it("reports stream subscription failures through onError", async function () {
1078+
const errors: TriggerChatTransportError[] = [];
1079+
const runStore = new InMemoryTriggerChatRunStore();
1080+
1081+
const server = await startServer(function (req, res) {
1082+
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
1083+
res.writeHead(200, {
1084+
"content-type": "application/json",
1085+
"x-trigger-jwt": "pk_stream_subscribe_error",
1086+
});
1087+
res.end(JSON.stringify({ id: "run_stream_subscribe_error" }));
1088+
return;
1089+
}
1090+
1091+
res.writeHead(404);
1092+
res.end();
1093+
});
1094+
1095+
const transport = new TriggerChatTransport({
1096+
task: "chat-task",
1097+
stream: "chat-stream",
1098+
accessToken: "pk_trigger",
1099+
baseURL: server.url,
1100+
runStore,
1101+
onError: function onError(error) {
1102+
errors.push(error);
1103+
},
1104+
});
1105+
1106+
(transport as any).fetchRunStream = async function fetchRunStream() {
1107+
throw new Error("stream subscribe failed root");
1108+
};
1109+
1110+
await expect(
1111+
transport.sendMessages({
1112+
trigger: "submit-message",
1113+
chatId: "chat-stream-subscribe-error",
1114+
messageId: undefined,
1115+
messages: [],
1116+
abortSignal: undefined,
1117+
})
1118+
).rejects.toThrowError("stream subscribe failed root");
1119+
1120+
expect(errors).toHaveLength(1);
1121+
expect(errors[0]).toMatchObject({
1122+
phase: "streamSubscribe",
1123+
chatId: "chat-stream-subscribe-error",
1124+
runId: "run_stream_subscribe_error",
1125+
});
1126+
expect(runStore.get("chat-stream-subscribe-error")).toBeUndefined();
1127+
});
1128+
1129+
it("keeps original stream subscription failure when onError callback also fails", async function () {
1130+
const runStore = new InMemoryTriggerChatRunStore();
1131+
1132+
const server = await startServer(function (req, res) {
1133+
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
1134+
res.writeHead(200, {
1135+
"content-type": "application/json",
1136+
"x-trigger-jwt": "pk_stream_subscribe_onerror_failure",
1137+
});
1138+
res.end(JSON.stringify({ id: "run_stream_subscribe_onerror_failure" }));
1139+
return;
1140+
}
1141+
1142+
res.writeHead(404);
1143+
res.end();
1144+
});
1145+
1146+
const transport = new TriggerChatTransport({
1147+
task: "chat-task",
1148+
stream: "chat-stream",
1149+
accessToken: "pk_trigger",
1150+
baseURL: server.url,
1151+
runStore,
1152+
onError: async function onError() {
1153+
throw new Error("onError failed");
1154+
},
1155+
});
1156+
1157+
(transport as any).fetchRunStream = async function fetchRunStream() {
1158+
throw new Error("stream subscribe failed root");
1159+
};
1160+
1161+
await expect(
1162+
transport.sendMessages({
1163+
trigger: "submit-message",
1164+
chatId: "chat-stream-subscribe-onerror-failure",
1165+
messageId: undefined,
1166+
messages: [],
1167+
abortSignal: undefined,
1168+
})
1169+
).rejects.toThrowError("stream subscribe failed root");
1170+
1171+
expect(runStore.get("chat-stream-subscribe-onerror-failure")).toBeUndefined();
1172+
});
1173+
10771174
it("supports creating transport with factory function", async function () {
10781175
let observedRunId: string | undefined;
10791176
let callbackCompleted = false;

packages/ai/src/chatTransport.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,21 @@ export class TriggerChatTransport<
231231
}
232232
}
233233

234-
const stream = await this.fetchRunStream(runState, options.abortSignal);
234+
let stream: ReadableStream<SSEStreamPart<InferUIMessageChunk<UI_MESSAGE>>>;
235+
try {
236+
stream = await this.fetchRunStream(runState, options.abortSignal);
237+
} catch (error) {
238+
runState.isActive = false;
239+
await this.runStore.set(runState);
240+
await this.runStore.delete(runState.chatId);
241+
await this.reportError({
242+
phase: "streamSubscribe",
243+
chatId: runState.chatId,
244+
runId: runState.runId,
245+
error: normalizeError(error),
246+
});
247+
throw error;
248+
}
235249

236250
return this.createTrackedStream(runState.chatId, stream);
237251
}

packages/ai/src/chatTransport.types.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ function createTypedOnErrorCallback(): TriggerChatOnError {
112112
| "payloadMapper"
113113
| "triggerOptions"
114114
| "triggerTask"
115+
| "streamSubscribe"
115116
| "onTriggeredRun"
116117
| "consumeTrackingStream"
117118
| "reconnect"

packages/ai/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ export type TriggerChatTransportErrorPhase =
7474
| "payloadMapper"
7575
| "triggerOptions"
7676
| "triggerTask"
77+
| "streamSubscribe"
7778
| "onTriggeredRun"
7879
| "consumeTrackingStream"
7980
| "reconnect";

0 commit comments

Comments
 (0)