
Commit f7fb35a

fix(tests): fix flaky getSnapshotsSince test
1 parent 68e3f8c commit f7fb35a

2 files changed: +268 -1 lines changed

Lines changed: 257 additions & 0 deletions
@@ -0,0 +1,257 @@

# Graceful handling of oversized batch items

## Prerequisites

This plan builds on top of PR #2980 which provides:
- `TriggerFailedTaskService` at `apps/webapp/app/runEngine/services/triggerFailedTask.server.ts` - creates pre-failed TaskRuns with proper trace events, waitpoint connections, and parent run associations
- `engine.createFailedTaskRun()` on RunEngine - creates a SYSTEM_FAILURE run with associated waitpoints
- Retry support in `processItemCallback` with `attempt` and `isFinalAttempt` params
- The callback already uses `TriggerFailedTaskService` for items that fail after retries

## Problem

When the NDJSON parser in `createNdjsonParserStream` detects an oversized line, it throws inside the TransformStream's `transform()` method. This aborts the request body stream (due to `pipeThrough` coupling), causing the client's `fetch()` to see `TypeError: fetch failed` instead of the server's 400 response. The SDK treats this as a connection error and retries with exponential backoff (~25s wasted).
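
The coupling can be reproduced with plain web streams. A minimal sketch of the failure mode (not the actual parser code): when `transform()` throws, the TransformStream errors and `pipeThrough()` cancels the source stream, which on the server means the request body is torn down mid-upload.

```typescript
// Sketch only: demonstrates why a throw inside transform() aborts the piped
// source before any HTTP response can be delivered.
const body = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(new TextEncoder().encode(`{"index":0,"task":"big"}` + "\n"));
  },
  cancel(reason) {
    // On the server this is the request body being cancelled, so the client's
    // fetch() sees a connection failure instead of the 400 response.
    console.log("source cancelled:", (reason as Error).message);
  },
});

const parsed = body.pipeThrough(
  new TransformStream<Uint8Array, unknown>({
    transform() {
      throw new Error("Item exceeds maximum size"); // the old oversized-line behavior
    },
  })
);

parsed
  .getReader()
  .read()
  .catch((err) => console.log("downstream error:", (err as Error).message));
```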

## Goal

Instead of throwing, treat oversized items as per-item failures that flow through the existing batch failure pipeline. The batch seals normally, the other items process as usual, and the user sees a clear failure for the specific oversized item(s).

## Approach

The NDJSON parser emits an error marker object instead of throwing. `StreamBatchItemsService` detects these markers and enqueues the item to the FairQueue with error metadata in its options. The `processItemCallback` (already enhanced with `TriggerFailedTaskService` in PR #2980) detects the error metadata and creates a pre-failed run via `TriggerFailedTaskService`, which handles all the waitpoint/trace machinery.

## Changes

### 1. Byte-level key extractor for oversized lines

**`apps/webapp/app/runEngine/services/streamBatchItems.server.ts`** - new function

Add `extractIndexAndTask(bytes: Uint8Array): { index: number; task: string }` - a state machine that extracts the top-level `"index"` and `"task"` values from raw bytes without decoding the full line.

How it works:
- Scan the bytes while tracking JSON nesting depth (count `{`/`[` vs `}`/`]`)
- At depth 1 (inside the top-level object), look for byte sequences matching the `"index"` and `"task"` key patterns
- For `"index"`: after the `:`, parse the digit sequence as a number
- For `"task"`: after the `:`, find the opening `"`, read bytes until the closing `"`, decode just that slice
- Stop when both are found, or after scanning 512 bytes (whichever comes first)
- Fallback: `index = -1`, `task = "unknown"` if not found

This avoids decoding/allocating the full 3MB line - only the first few hundred bytes are examined.
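
A minimal sketch of what such an extractor could look like; the name, 512-byte cap, and fallback values come from the bullets above, while the exact scanning rules here are illustrative rather than the final implementation:

```typescript
function extractIndexAndTask(bytes: Uint8Array): { index: number; task: string } {
  const limit = Math.min(bytes.length, 512); // never look past the first 512 bytes
  const decoder = new TextDecoder();
  let index = -1;
  let task = "unknown";

  let depth = 0;
  let inString = false;
  let escaped = false;
  let stringStart = -1; // offset just after an opening quote

  // Skip spaces/tabs, returning the next interesting offset.
  const skipWs = (i: number): number => {
    while (i < limit && (bytes[i] === 0x20 || bytes[i] === 0x09)) i++;
    return i;
  };

  for (let i = 0; i < limit; i++) {
    const b = bytes[i];

    if (inString) {
      if (escaped) {
        escaped = false;
      } else if (b === 0x5c /* \ */) {
        escaped = true;
      } else if (b === 0x22 /* " */) {
        inString = false;
        if (depth !== 1) continue;
        // A string closing at depth 1 is a top-level key if it is followed by ':'.
        const key = decoder.decode(bytes.subarray(stringStart, i));
        let j = skipWs(i + 1);
        if (bytes[j] !== 0x3a /* : */) continue;
        j = skipWs(j + 1);
        if (key === "index") {
          let digits = "";
          while (j < limit && bytes[j] >= 0x30 && bytes[j] <= 0x39) {
            digits += String.fromCharCode(bytes[j++]);
          }
          if (digits) index = Number(digits);
        } else if (key === "task" && bytes[j] === 0x22) {
          const start = ++j;
          while (j < limit && bytes[j] !== 0x22) j++;
          if (j < limit) task = decoder.decode(bytes.subarray(start, j));
        }
        if (index >= 0 && task !== "unknown") break; // both found, stop early
      }
      continue;
    }

    if (b === 0x22 /* " */) {
      inString = true;
      stringStart = i + 1;
    } else if (b === 0x7b /* { */ || b === 0x5b /* [ */) {
      depth++;
    } else if (b === 0x7d /* } */ || b === 0x5d /* ] */) {
      depth--;
    }
  }

  return { index, task };
}
```

For example, `extractIndexAndTask(new TextEncoder().encode('{"index": 4, "task": "resize-image", "payload": "..."}'))` returns `{ index: 4, task: "resize-image" }` without ever decoding the payload.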

### 2. Modify `createNdjsonParserStream` to emit error markers

**`apps/webapp/app/runEngine/services/streamBatchItems.server.ts`**

Define a marker type:

```typescript
type OversizedItemMarker = {
  __batchItemError: "OVERSIZED";
  index: number;
  task: string;
  actualSize: number;
  maxSize: number;
};
```

**Case 1 - Complete line exceeds limit** (newline found, `newlineIndex > maxItemBytes`):
- Call `extractLine(newlineIndex)` to consume the line from the buffer
- Call `extractIndexAndTask(lineBytes)` on the extracted bytes
- `controller.enqueue(marker)` instead of throwing
- Increment `lineNumber` and continue

**Case 2 - Incomplete line exceeds limit** (no newline, `totalBytes > maxItemBytes`):
- Call `extractIndexAndTask(concatenateChunks())` on the current buffer
- `controller.enqueue(marker)`
- Clear the buffer (`chunks = []; totalBytes = 0`)
- Return from transform (don't throw)

**Case 3 - Flush with oversized remaining** (`totalBytes > maxItemBytes` in flush):
- Same as case 2 but in `flush()`. A combined sketch of the three cases follows below.
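
A self-contained sketch of the lenient parser behavior described by the three cases; it uses a single growing buffer instead of the real `chunks`/`extractLine` bookkeeping, so it is an illustration, not the actual `createNdjsonParserStream`:

```typescript
function createLenientNdjsonParser(maxItemBytes: number) {
  let buffer = new Uint8Array(0);
  const decoder = new TextDecoder();

  const append = (chunk: Uint8Array) => {
    const next = new Uint8Array(buffer.length + chunk.length);
    next.set(buffer);
    next.set(chunk, buffer.length);
    buffer = next;
  };

  // Oversized lines become markers; everything else is parsed as JSON.
  const emit = (controller: TransformStreamDefaultController<unknown>, lineBytes: Uint8Array) => {
    if (lineBytes.length === 0) return;
    if (lineBytes.length > maxItemBytes) {
      const { index, task } = extractIndexAndTask(lineBytes); // sketched above
      controller.enqueue({
        __batchItemError: "OVERSIZED",
        index,
        task,
        actualSize: lineBytes.length,
        maxSize: maxItemBytes,
      } satisfies OversizedItemMarker);
      return;
    }
    controller.enqueue(JSON.parse(decoder.decode(lineBytes)));
  };

  return new TransformStream<Uint8Array, unknown>({
    transform(chunk, controller) {
      append(chunk);
      // Case 1: complete lines, oversized or not, are emitted as they appear.
      let newlineIndex: number;
      while ((newlineIndex = buffer.indexOf(0x0a)) !== -1) {
        emit(controller, buffer.subarray(0, newlineIndex));
        buffer = buffer.subarray(newlineIndex + 1);
      }
      // Case 2: the buffered bytes already exceed the limit without a newline.
      if (buffer.length > maxItemBytes) {
        emit(controller, buffer);
        buffer = new Uint8Array(0);
      }
    },
    flush(controller) {
      // Case 3: whatever remains at end-of-stream is handled the same way.
      emit(controller, buffer);
    },
  });
}
```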

### 3. Handle markers in `StreamBatchItemsService`

**`apps/webapp/app/runEngine/services/streamBatchItems.server.ts`** - in the `for await` loop

Before the existing `BatchItemNDJSONSchema.safeParse(rawItem)`, check for the marker:

```typescript
if (rawItem && typeof rawItem === "object" && (rawItem as any).__batchItemError === "OVERSIZED") {
  const marker = rawItem as OversizedItemMarker;
  const itemIndex = marker.index >= 0 ? marker.index : lastIndex + 1;

  const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(1)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(1)} KB. Reduce the payload size or offload large data to external storage.`;

  // Enqueue the item normally but with error metadata in options.
  // The processItemCallback will detect __error and use TriggerFailedTaskService
  // to create a pre-failed run with proper waitpoint connections.
  const batchItem: BatchItem = {
    task: marker.task,
    payload: "{}",
    payloadType: "application/json",
    options: {
      __error: errorMessage,
      __errorCode: "PAYLOAD_TOO_LARGE",
    },
  };

  const result = await this._engine.enqueueBatchItem(
    batchId, environment.id, itemIndex, batchItem
  );

  if (result.enqueued) {
    itemsAccepted++;
  } else {
    itemsDeduplicated++;
  }
  lastIndex = itemIndex;
  continue;
}
```

### 4. Handle `__error` items in `processItemCallback`

**`apps/webapp/app/v3/runEngineHandlers.server.ts`** - in the `setupBatchQueueCallbacks` function

In the `processItemCallback`, before the `TriggerTaskService.call()`, check for `__error` in `item.options`:

```typescript
const itemError = item.options?.__error as string | undefined;
if (itemError) {
  const errorCode = (item.options?.__errorCode as string) ?? "ITEM_ERROR";

  // Use TriggerFailedTaskService to create a pre-failed run.
  // This creates a proper TaskRun with waitpoint connections so the
  // parent's batchTriggerAndWait resolves correctly for this item.
  const failedRunId = await triggerFailedTaskService.call({
    taskId: item.task,
    environment,
    payload: item.payload ?? "{}",
    payloadType: item.payloadType,
    errorMessage: itemError,
    errorCode: errorCode as TaskRunErrorCodes,
    parentRunId: meta.parentRunId,
    resumeParentOnCompletion: meta.resumeParentOnCompletion,
    batch: { id: batchId, index: itemIndex },
    traceContext: meta.traceContext as Record<string, unknown> | undefined,
    spanParentAsLink: meta.spanParentAsLink,
  });

  if (failedRunId) {
    span.setAttribute("batch.result.pre_failed", true);
    span.setAttribute("batch.result.run_id", failedRunId);
    span.end();
    return { success: true as const, runId: failedRunId };
  }

  // Fallback if TriggerFailedTaskService fails
  span.end();
  return { success: false as const, error: itemError, errorCode };
}
```

Note: this returns `{ success: true, runId }` because the pre-failed run IS a real run. The BatchQueue records it as a success (a run was created); the run itself is already in SYSTEM_FAILURE status, so the batch completion flow handles it correctly.

If `environment` is null (environment not found), fall through to the existing environment-not-found handling, which already uses `triggerFailedTaskService.callWithoutTraceEvents()` on `isFinalAttempt`.

### 5. Handle undefined/null payload in BatchQueue serialization

**`internal-packages/run-engine/src/batch-queue/index.ts`** - in `#handleMessage`

Both payload serialization blocks (in the `success: false` branch and the `catch` block) do:

```typescript
const str = typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
innerSpan?.setAttribute("batch.payloadSize", str.length);
```

`JSON.stringify(undefined)` returns `undefined`, so the `str.length` access throws. Fix both sites:

```typescript
const str =
  item.payload === undefined || item.payload === null
    ? "{}"
    : typeof item.payload === "string"
      ? item.payload
      : JSON.stringify(item.payload);
```

### 6. Remove stale error handling in route

**`apps/webapp/app/routes/api.v3.batches.$batchId.items.ts`**

The `error.message.includes("exceeds maximum size")` branch is no longer reachable since oversized items don't throw. Remove that condition, keep the `"Invalid JSON"` check.

### 7. Remove `BatchItemTooLargeError` and SDK pre-validation

**`packages/core/src/v3/apiClient/errors.ts`** - remove `BatchItemTooLargeError` class

**`packages/core/src/v3/apiClient/index.ts`**:
- Remove `BatchItemTooLargeError` import
- Remove `instanceof BatchItemTooLargeError` check in the retry catch block
- Remove `MAX_BATCH_ITEM_BYTES` constant
- Remove size validation from `createNdjsonStream` (revert `encodeAndValidate` to simple encode)

**`packages/trigger-sdk/src/v3/shared.ts`** - remove `BatchItemTooLargeError` import and handling in `buildBatchErrorMessage`

**`packages/trigger-sdk/src/v3/index.ts`** - remove `BatchItemTooLargeError` re-export

### 8. Update tests

**`apps/webapp/test/engine/streamBatchItems.test.ts`**:
- Update "should reject lines exceeding maxItemBytes" to assert `OversizedItemMarker` emission instead of a throw (see the sketch below)
- Update "should reject unbounded accumulation without newlines" similarly
- Update the emoji byte-size test to assert a marker instead of a throw
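
A hedged sketch of what the updated assertion could look like; the test wiring, the import path, and the `createNdjsonParserStream({ maxItemBytes })` options shape are assumptions, and the real suite may construct and consume the stream differently:

```typescript
import { expect, it } from "vitest";
// Import path and options shape are assumptions based on the plan above.
import { createNdjsonParserStream } from "~/runEngine/services/streamBatchItems.server";

it("emits an OversizedItemMarker instead of throwing", async () => {
  const maxItemBytes = 64;
  const line = JSON.stringify({ index: 0, task: "big-task", payload: "x".repeat(1024) }) + "\n";

  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(new TextEncoder().encode(line));
      controller.close();
    },
  }).pipeThrough(createNdjsonParserStream({ maxItemBytes }));

  // Drain the parser output; nothing here should throw.
  const reader = stream.getReader();
  const emitted: unknown[] = [];
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    emitted.push(value);
  }

  expect(emitted).toHaveLength(1);
  expect(emitted[0]).toMatchObject({ __batchItemError: "OVERSIZED", index: 0, task: "big-task" });
});
```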

### 9. Update reference project test task

**`references/hello-world/src/trigger/batches.ts`**:
- Remove `BatchItemTooLargeError` import
- Update the `batchSealFailureOversizedPayload` task to test the new behavior (see the sketch below):
  - Send 2 items: one normal, one oversized (~3.2MB)
  - Assert `batchTriggerAndWait` returns (doesn't throw)
  - Assert `results.runs[0].ok === true` (normal item succeeded)
  - Assert `results.runs[1].ok === false` (oversized item failed)
  - Assert the error message contains "too large"
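
A hedged sketch of the reworked reference task; the child task, payload shape, and error-checking details are illustrative, while the task id `batch-seal-failure-oversized` and the export name come from this plan:

```typescript
import { task } from "@trigger.dev/sdk/v3";

// Hypothetical child task used only to receive the batch items.
export const oversizedChild = task({
  id: "oversized-child",
  run: async (payload: { data: string }) => ({ received: payload.data.length }),
});

export const batchSealFailureOversizedPayload = task({
  id: "batch-seal-failure-oversized",
  run: async () => {
    const results = await oversizedChild.batchTriggerAndWait([
      { payload: { data: "small" } },
      { payload: { data: "x".repeat(Math.ceil(3.2 * 1024 * 1024)) } }, // ~3.2MB item
    ]);

    // batchTriggerAndWait returns instead of throwing; the oversized item
    // surfaces as a failed run with a "too large" error message.
    if (!results.runs[0].ok) throw new Error("expected the normal item to succeed");
    const oversized = results.runs[1];
    if (oversized.ok) throw new Error("expected the oversized item to fail");
    if (!JSON.stringify(oversized.error).includes("too large")) {
      throw new Error(`expected a "too large" error, got: ${JSON.stringify(oversized.error)}`);
    }

    return { checked: true };
  },
});
```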

## Data flow

```
NDJSON bytes arrive
  |
createNdjsonParserStream
  |-- Line <= limit --> parse JSON --> enqueue object
  `-- Line > limit  --> extractIndexAndTask(bytes) --> enqueue OversizedItemMarker
  |
StreamBatchItemsService for-await loop
  |-- OversizedItemMarker --> engine.enqueueBatchItem() with __error in options
  `-- Normal item --> validate --> engine.enqueueBatchItem()
  |
FairQueue consumer (#handleMessage)
  |-- __error in options --> processItemCallback detects it
  |     --> TriggerFailedTaskService.call()
  |     --> Creates pre-failed TaskRun with SYSTEM_FAILURE status
  |     --> Proper waitpoint + TaskRunWaitpoint connections created
  |     --> Returns { success: true, runId: failedRunFriendlyId }
  `-- Normal item --> TriggerTaskService.call() --> creates normal run
  |
Batch sealing: enqueuedCount === runCount (all items go through enqueueBatchItem)
Batch completion: all items have runs (real or pre-failed), waitpoints resolve normally
Parent run: batchTriggerAndWait resolves with per-item results
```

## Why this works

The key insight is that `TriggerFailedTaskService` (from PR #2980) creates a real `TaskRun` in `SYSTEM_FAILURE` status. This means:

1. A RUN waitpoint is created and connected to the parent via `TaskRunWaitpoint` with correct `batchId`/`batchIndex`
2. The run is immediately completed, which completes the waitpoint
3. The SDK's `waitForBatch` resolver for that index fires with the error result
4. The batch completion flow counts this as a processed item (it's a real run)
5. No special-casing needed in the batch completion callback

## Verification

1. Rebuild `@trigger.dev/core`, `@trigger.dev/sdk`, `@internal/run-engine`
2. Restart webapp + trigger dev
3. Trigger `batch-seal-failure-oversized` task - should complete in ~2-3s with:
   - Normal item: `ok: true`
   - Oversized item: `ok: false` with "too large" error
4. Run NDJSON parser tests: updated tests assert marker emission instead of throws
5. Run `pnpm run build --filter @internal/run-engine --filter @trigger.dev/core --filter @trigger.dev/sdk`

internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts

Lines changed: 11 additions & 1 deletion
@@ -191,7 +191,17 @@ describe("RunEngine getSnapshotsSince", () => {
       organizationId: authenticatedEnvironment.organization.id,
     });

-    // Wait for waitpoint completion
+    // Poll until the waitpoint is completed by the background worker
+    for (let i = 0; i < 50; i++) {
+      await setTimeout(100);
+      const wp = await prisma.waitpoint.findFirst({
+        where: { id: waitpoint.id },
+        select: { status: true },
+      });
+      if (wp?.status === "COMPLETED") break;
+    }
+
+    // Allow time for the snapshot to be created after waitpoint completion
     await setTimeout(200);

     // Get all snapshots