-
-
Notifications
You must be signed in to change notification settings - Fork 199
fix(proxy): 加固 AgentPool 清理与透传 stats #762
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
88e485e
dbcd4b6
ee08a4a
a172ad8
0152bab
fcc1716
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -10,7 +10,7 @@ import { SessionManager } from "@/lib/session-manager"; | |||||||||||
| import { SessionTracker } from "@/lib/session-tracker"; | ||||||||||||
| import { calculateRequestCost } from "@/lib/utils/cost-calculation"; | ||||||||||||
| import { hasValidPriceData } from "@/lib/utils/price-data"; | ||||||||||||
| import { parseSSEData } from "@/lib/utils/sse"; | ||||||||||||
| import { isSSEText, parseSSEData } from "@/lib/utils/sse"; | ||||||||||||
| import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection"; | ||||||||||||
| import { | ||||||||||||
| updateMessageRequestCost, | ||||||||||||
|
|
@@ -961,53 +961,95 @@ export class ProxyResponseHandler { | |||||||||||
|
|
||||||||||||
| let reader: ReadableStreamDefaultReader<Uint8Array> | null = null; | ||||||||||||
| // 保护:避免透传 stats 任务把超大响应体无界缓存在内存中(DoS/OOM 风险) | ||||||||||||
| // 说明:这里用于统计/结算的内容仅保留“尾部窗口”(最近 MAX_STATS_BUFFER_BYTES),用于尽可能解析 usage/假200。 | ||||||||||||
| // 若响应体极大,仍会完整 drain 上游(reader.read),但不再累计完整字符串。 | ||||||||||||
| // 说明:用于统计/结算的内容采用“头部 + 尾部窗口”: | ||||||||||||
| // - 头部保留前 MAX_STATS_HEAD_BYTES(便于解析可能前置的 metadata) | ||||||||||||
| // - 尾部保留最近 MAX_STATS_TAIL_BYTES(便于解析结尾 usage/假 200 等) | ||||||||||||
| // - 中间部分会被丢弃(wasTruncated=true),统计将退化为 best-effort | ||||||||||||
| const MAX_STATS_BUFFER_BYTES = 10 * 1024 * 1024; // 10MB | ||||||||||||
| const MAX_STATS_BUFFER_CHUNKS = 8192; | ||||||||||||
| const chunks: string[] = []; | ||||||||||||
| const chunkBytes: number[] = []; | ||||||||||||
| let chunkHead = 0; | ||||||||||||
| let bufferedBytes = 0; | ||||||||||||
| const MAX_STATS_HEAD_BYTES = 1024 * 1024; // 1MB | ||||||||||||
| const MAX_STATS_TAIL_BYTES = MAX_STATS_BUFFER_BYTES - MAX_STATS_HEAD_BYTES; | ||||||||||||
| const MAX_STATS_TAIL_CHUNKS = 8192; | ||||||||||||
|
|
||||||||||||
| const headChunks: string[] = []; | ||||||||||||
| let headBufferedBytes = 0; | ||||||||||||
|
|
||||||||||||
| const tailChunks: string[] = []; | ||||||||||||
| const tailChunkBytes: number[] = []; | ||||||||||||
| let tailHead = 0; | ||||||||||||
| let tailBufferedBytes = 0; | ||||||||||||
| let wasTruncated = false; | ||||||||||||
| let inTailMode = false; | ||||||||||||
|
|
||||||||||||
| const joinTailChunks = (): string => { | ||||||||||||
| if (tailHead <= 0) return tailChunks.join(""); | ||||||||||||
| return tailChunks.slice(tailHead).join(""); | ||||||||||||
| }; | ||||||||||||
|
|
||||||||||||
| const joinChunks = (): string => { | ||||||||||||
| if (chunkHead <= 0) return chunks.join(""); | ||||||||||||
| return chunks.slice(chunkHead).join(""); | ||||||||||||
| const headText = headChunks.join(""); | ||||||||||||
| if (!inTailMode) { | ||||||||||||
| return headText; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| const tailText = joinTailChunks(); | ||||||||||||
|
|
||||||||||||
| // 用 SSE comment 标记被截断的中间段;parseSSEData 会忽略 ":" 开头的行 | ||||||||||||
| if (wasTruncated) { | ||||||||||||
| // 插入空行强制 flush event,避免“头+尾”拼接后跨 event 误拼接数据行 | ||||||||||||
| return `${headText}\n\n: [cch_truncated]\n\n${tailText}`; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| return `${headText}${tailText}`; | ||||||||||||
| }; | ||||||||||||
|
|
||||||||||||
| const pushChunk = (text: string, bytes: number) => { | ||||||||||||
| if (!text) return; | ||||||||||||
| chunks.push(text); | ||||||||||||
| chunkBytes.push(bytes); | ||||||||||||
| bufferedBytes += bytes; | ||||||||||||
|
|
||||||||||||
| // 仅保留尾部窗口,避免内存无界增长 | ||||||||||||
| while (bufferedBytes > MAX_STATS_BUFFER_BYTES && chunkHead < chunkBytes.length) { | ||||||||||||
| bufferedBytes -= chunkBytes[chunkHead] ?? 0; | ||||||||||||
| chunks[chunkHead] = ""; | ||||||||||||
| chunkBytes[chunkHead] = 0; | ||||||||||||
| chunkHead += 1; | ||||||||||||
| wasTruncated = true; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化 | ||||||||||||
| if (chunkHead > 4096) { | ||||||||||||
| chunks.splice(0, chunkHead); | ||||||||||||
| chunkBytes.splice(0, chunkHead); | ||||||||||||
| chunkHead = 0; | ||||||||||||
| } | ||||||||||||
| const pushToTail = () => { | ||||||||||||
| tailChunks.push(text); | ||||||||||||
| tailChunkBytes.push(bytes); | ||||||||||||
| tailBufferedBytes += bytes; | ||||||||||||
|
|
||||||||||||
| // 仅保留尾部窗口,避免内存无界增长 | ||||||||||||
| while (tailBufferedBytes > MAX_STATS_TAIL_BYTES && tailHead < tailChunkBytes.length) { | ||||||||||||
| tailBufferedBytes -= tailChunkBytes[tailHead] ?? 0; | ||||||||||||
| tailChunks[tailHead] = ""; | ||||||||||||
| tailChunkBytes[tailHead] = 0; | ||||||||||||
| tailHead += 1; | ||||||||||||
| wasTruncated = true; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化 | ||||||||||||
| if (tailHead > 4096) { | ||||||||||||
| tailChunks.splice(0, tailHead); | ||||||||||||
| tailChunkBytes.splice(0, tailHead); | ||||||||||||
| tailHead = 0; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限) | ||||||||||||
| const keptCount = tailChunks.length - tailHead; | ||||||||||||
| if (keptCount > MAX_STATS_TAIL_CHUNKS) { | ||||||||||||
| const joined = joinTailChunks(); | ||||||||||||
| tailChunks.length = 0; | ||||||||||||
| tailChunkBytes.length = 0; | ||||||||||||
| tailHead = 0; | ||||||||||||
| tailChunks.push(joined); | ||||||||||||
| tailChunkBytes.push(tailBufferedBytes); | ||||||||||||
| } | ||||||||||||
| }; | ||||||||||||
|
|
||||||||||||
| // 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限) | ||||||||||||
| const keptCount = chunks.length - chunkHead; | ||||||||||||
| if (keptCount > MAX_STATS_BUFFER_CHUNKS) { | ||||||||||||
| const joined = joinChunks(); | ||||||||||||
| chunks.length = 0; | ||||||||||||
| chunkBytes.length = 0; | ||||||||||||
| chunkHead = 0; | ||||||||||||
| chunks.push(joined); | ||||||||||||
| chunkBytes.push(bufferedBytes); | ||||||||||||
| // 优先填充 head;超过 head 上限后切到 tail(但不代表一定发生截断,只有 tail 溢出才算截断) | ||||||||||||
| if (!inTailMode) { | ||||||||||||
| if (headBufferedBytes + bytes <= MAX_STATS_HEAD_BYTES) { | ||||||||||||
| headChunks.push(text); | ||||||||||||
| headBufferedBytes += bytes; | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| inTailMode = true; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| pushToTail(); | ||||||||||||
| }; | ||||||||||||
| const decoder = new TextDecoder(); | ||||||||||||
| let isFirstChunk = true; | ||||||||||||
|
|
@@ -1035,8 +1077,10 @@ export class ProxyResponseHandler { | |||||||||||
| providerId: provider.id, | ||||||||||||
| providerName: provider.name, | ||||||||||||
| idleTimeoutMs, | ||||||||||||
| chunksCollected: Math.max(0, chunks.length - chunkHead), | ||||||||||||
| bufferedBytes, | ||||||||||||
| chunksCollected: headChunks.length + Math.max(0, tailChunks.length - tailHead), | ||||||||||||
| headBufferedBytes, | ||||||||||||
| tailBufferedBytes, | ||||||||||||
| bufferedBytes: headBufferedBytes + tailBufferedBytes, | ||||||||||||
| wasTruncated, | ||||||||||||
| }); | ||||||||||||
| // 终止上游连接:让透传到客户端的连接也尽快结束,避免永久悬挂占用资源 | ||||||||||||
|
|
@@ -1109,7 +1153,25 @@ export class ProxyResponseHandler { | |||||||||||
| session.recordTtfb(); | ||||||||||||
| clearResponseTimeoutOnce(chunkSize); | ||||||||||||
| } | ||||||||||||
| pushChunk(decoder.decode(value, { stream: true }), chunkSize); | ||||||||||||
|
|
||||||||||||
| // 尽量填满 head:边界 chunk 可能跨过 head 上限,按 byte 切分以避免 head 少于 1MB | ||||||||||||
| if (!inTailMode && headBufferedBytes < MAX_STATS_HEAD_BYTES) { | ||||||||||||
| const remainingHeadBytes = MAX_STATS_HEAD_BYTES - headBufferedBytes; | ||||||||||||
| if (remainingHeadBytes > 0 && chunkSize > remainingHeadBytes) { | ||||||||||||
| const headPart = value.subarray(0, remainingHeadBytes); | ||||||||||||
| const tailPart = value.subarray(remainingHeadBytes); | ||||||||||||
|
|
||||||||||||
| const headText = decoder.decode(headPart, { stream: true }); | ||||||||||||
| pushChunk(headText, remainingHeadBytes); | ||||||||||||
|
Comment on lines
+1164
to
+1165
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When splitting at a UTF-8 multi-byte sequence boundary, `decoder.decode(headPart, { stream: true })` may withhold incomplete bytes (e.g., 2 bytes of a 3-byte character). The decoder returns text representing fewer bytes than `remainingHeadBytes`, but `pushChunk` is told the full `remainingHeadBytes` count. This causes `headBufferedBytes` to overcount by 1–3 bytes, triggering a premature switch to tail mode and under-filling the head buffer. Calculate the actual decoded byte length:
Suggested change
Prompt To Fix With AIThis is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 1194:1195
Comment:
When splitting at a UTF-8 multi-byte sequence boundary, `decoder.decode(headPart, { stream: true })` may withhold incomplete bytes (e.g., 2 bytes of a 3-byte character). The decoder returns text representing fewer bytes than `remainingHeadBytes`, but `pushChunk` is told the full `remainingHeadBytes` count. This causes `headBufferedBytes` to overcount by 1-3 bytes, triggering premature switch to tail mode and under-filling the head buffer.
Calculate actual decoded byte length:
```suggestion
const headText = decoder.decode(headPart, { stream: true });
const actualHeadBytes = new TextEncoder().encode(headText).byteLength;
pushChunk(headText, actualHeadBytes);
```
How can I resolve this? If you propose a fix, please make it concise. |
||||||||||||
|
|
||||||||||||
| const tailText = decoder.decode(tailPart, { stream: true }); | ||||||||||||
| pushChunk(tailText, chunkSize - remainingHeadBytes); | ||||||||||||
|
Comment on lines
+1164
to
+1168
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Splitting a chunk and calling `decode()` twice sequentially on the same decoder maintains UTF-8 multi-byte sequence integrity because `stream: true` preserves incomplete sequences between calls — this is correct. However, `pushChunk(headText, remainingHeadBytes)` passes the original byte count (`remainingHeadBytes`), but `headText` may have fewer characters if the split occurred mid-UTF-8-sequence (the decoder will withhold those bytes until the next call). This means `headBufferedBytes` could become incorrect, potentially under-filling the head window. Consider tracking the actual decoded byte length or adjusting the accounting to handle incomplete sequences. Prompt To Fix With AI: This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 1194:1198
Comment:
Splitting a chunk and calling `decode()` twice sequentially on the same decoder maintains UTF-8 multi-byte sequence integrity because `stream: true` preserves incomplete sequences between calls - this is correct. However, `pushChunk(headText, remainingHeadBytes)` on line 1195 passes the original **byte** count (`remainingHeadBytes`), but `headText` may have **fewer characters** if the split occurred mid-UTF-8-sequence (decoder will withhold those bytes until the next call). This means `headBufferedBytes` could become incorrect, potentially under-filling the head window.
Consider tracking the actual decoded byte length or adjusting the accounting to handle incomplete sequences.
How can I resolve this? If you propose a fix, please make it concise.
Comment on lines
+1167
to
+1168
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same byte accounting issue: `chunkSize - remainingHeadBytes` represents the tail part's byte count, but if UTF-8 bytes were withheld from the head decode, those bytes are prepended internally by the decoder when decoding `tailPart`. The decoded `tailText` will contain more characters than `chunkSize - remainingHeadBytes` would suggest, leading to inaccurate byte tracking.
Suggested change
Prompt To Fix With AIThis is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 1197:1198
Comment:
Same byte accounting issue: `chunkSize - remainingHeadBytes` represents the tail part's byte count, but if UTF-8 bytes were withheld from the head decode, those bytes are prepended internally by the decoder when decoding `tailPart`. The decoded `tailText` will contain more characters than `chunkSize - remainingHeadBytes` would suggest, leading to inaccurate byte tracking.
```suggestion
const tailText = decoder.decode(tailPart, { stream: true });
const actualTailBytes = new TextEncoder().encode(tailText).byteLength;
pushChunk(tailText, actualTailBytes);
```
How can I resolve this? If you propose a fix, please make it concise.
Comment on lines
+1164
to
+1168
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. UTF-8 byte accounting wrong: in the head/tail split path, `pushChunk(headText, remainingHeadBytes)` and `pushChunk(tailText, chunkSize - remainingHeadBytes)` assume the decoder returns text for exactly those byte counts. If `headPart` ends mid–multi-byte UTF-8 sequence, `TextDecoder.decode(headPart, { stream: true })` will buffer 1–3 bytes internally and *not* emit them until the next decode call. That means `headBufferedBytes`/`tailBufferedBytes` can become inaccurate, and you can either (a) switch to tail mode too early (under-filling head) or (b) exceed the intended limits without setting `wasTruncated` correctly. This is observable whenever a chunk boundary/split hits a multi-byte sequence. Also appears at src/app/v1/_lib/proxy/response-handler.ts:1165 and :1168. Prompt To Fix With AI: This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 1164:1168
Comment:
**UTF-8 byte accounting wrong**
In the head/tail split path, `pushChunk(headText, remainingHeadBytes)` and `pushChunk(tailText, chunkSize - remainingHeadBytes)` assume the decoder returns text for exactly those byte counts. If `headPart` ends mid–multi-byte UTF-8 sequence, `TextDecoder.decode(headPart, { stream: true })` will buffer 1–3 bytes internally and *not* emit them until the next decode call. That means `headBufferedBytes`/`tailBufferedBytes` can become inaccurate, and you can either (a) switch to tail mode too early (under-filling head) or (b) exceed the intended limits without setting `wasTruncated` correctly. This is observable whenever a chunk boundary/split hits a multi-byte sequence.
Also appears at src/app/v1/_lib/proxy/response-handler.ts:1165 and :1168.
How can I resolve this? If you propose a fix, please make it concise. |
||||||||||||
| } else { | ||||||||||||
| pushChunk(decoder.decode(value, { stream: true }), chunkSize); | ||||||||||||
| } | ||||||||||||
| } else { | ||||||||||||
| pushChunk(decoder.decode(value, { stream: true }), chunkSize); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) | ||||||||||||
|
|
@@ -2215,7 +2277,7 @@ export function parseUsageFromResponseText( | |||||||||||
| // SSE 解析:支持两种格式 | ||||||||||||
| // 1. 标准 SSE (event: + data:) - Claude/OpenAI | ||||||||||||
| // 2. 纯 data: 格式 - Gemini | ||||||||||||
| if (!usageMetrics && responseText.includes("data:")) { | ||||||||||||
| if (!usageMetrics && isSSEText(responseText)) { | ||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Replaced naive `responseText.includes("data:")` with the `isSSEText()` utility for more accurate SSE format detection. This prevents false positives from JSON bodies containing the string "data:" and correctly identifies SSE by checking line-start patterns. Prompt To Fix With AI: This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 2311:2311
Comment:
Replaced naive `responseText.includes("data:")` with `isSSEText()` utility for more accurate SSE format detection. This prevents false positives from JSON bodies containing the string "data:" and correctly identifies SSE by checking line-start patterns.
How can I resolve this? If you propose a fix, please make it concise. |
||||||||||||
| const events = parseSSEData(responseText); | ||||||||||||
|
|
||||||||||||
| // Claude SSE 特殊处理: | ||||||||||||
|
|
||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -165,6 +165,15 @@ export class AgentPoolImpl implements AgentPool { | |
| }; | ||
| /** Pending agent creation promises to prevent race conditions */ | ||
| private pendingCreations: Map<string, Promise<GetAgentResult>> = new Map(); | ||
| /** | ||
| * Pending destroy/close promises (best-effort). | ||
| * | ||
| * 说明: | ||
| * - 驱逐/清理路径为了避免全局卡死,必须 fire-and-forget(不 await)。 | ||
| * - 但在 shutdown() 中我们仍希望尽量“优雅收尾”,因此在这里追踪 pending 的关闭任务。 | ||
| * - 若某些 dispatcher 永不 settle,这里会在超时后丢弃引用,避免内存泄漏。 | ||
| */ | ||
| private pendingCleanups: Set<Promise<void>> = new Set(); | ||
|
|
||
| constructor(config: Partial<AgentPoolConfig> = {}) { | ||
| this.config = { ...DEFAULT_CONFIG, ...config }; | ||
|
|
@@ -329,13 +338,31 @@ export class AgentPoolImpl implements AgentPool { | |
| this.cleanupTimer = null; | ||
| } | ||
|
|
||
| const closePromises: Promise<void>[] = []; | ||
| // closeAgent 本身是 fire-and-forget(不 await destroy/close),这里并行触发即可。 | ||
| await Promise.allSettled( | ||
| Array.from(this.cache.entries()).map(([key, cached]) => this.closeAgent(cached.agent, key)) | ||
| ); | ||
|
|
||
| for (const [key, cached] of this.cache.entries()) { | ||
| closePromises.push(this.closeAgent(cached.agent, key)); | ||
| // Best-effort:等待部分 pending cleanup 完成,但永不无限等待(避免重蹈 “close() 等待 in-flight” 的覆辙) | ||
| if (this.pendingCleanups.size > 0) { | ||
| const pending = Array.from(this.pendingCleanups); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Timing window: if a cleanup promise settles between this snapshot (line 348) and when `Promise.race` starts (line 352), its `.finally()` handler (line 420) will try to delete from `pendingCleanups` even though it's already in the `pending` array. When the race completes and line 363 clears the set, that promise's delete becomes a no-op on an empty set. Benign but worth noting. Prompt To Fix With AI: This is a comment left during a code review.
Path: src/lib/proxy-agent/agent-pool.ts
Line: 348:348
Comment:
Timing window: if a cleanup promise settles between this snapshot (line 348) and when `Promise.race` starts (line 352), its `.finally()` handler (line 420) will try to delete from `pendingCleanups` even though it's already in the `pending` array. When the race completes and line 363 clears the set, that promise's delete becomes a no-op on an empty set. Benign but worth noting.
How can I resolve this? If you propose a fix, please make it concise. |
||
| const WAIT_MS = 2000; | ||
| let timeoutId: NodeJS.Timeout | null = null; | ||
| try { | ||
| await Promise.race([ | ||
| Promise.allSettled(pending).then(() => {}), | ||
| new Promise<void>((resolve) => { | ||
| timeoutId = setTimeout(resolve, WAIT_MS); | ||
| timeoutId.unref(); | ||
| }), | ||
| ]); | ||
|
Comment on lines
+352
to
+358
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Race condition: if a cleanup promise settles and removes itself from `pendingCleanups` (line 421) **after** line 348 creates the array snapshot but **before** the `Promise.race` starts, that promise is still included in `pending` but no longer tracked. When it later settles during the race, line 421 tries to delete from an already-cleared set (line 363 clears after the race completes), causing a harmless but unintended delete operation on an empty set. Low severity — functionally safe, but the timing window could be tightened by checking `this.pendingCleanups.size` again after the race, or by accepting this benign race. Prompt To Fix With AI: This is a comment left during a code review.
Path: src/lib/proxy-agent/agent-pool.ts
Line: 352:358
Comment:
Race condition: if a cleanup promise settles and removes itself from `pendingCleanups` (line 421) **after** line 348 creates the array snapshot but **before** the `Promise.race` starts, that promise is still included in `pending` but no longer tracked. When it later settles during the race, line 421 tries to delete from an already-cleared set (line 363 clears after race completes), causing a harmless but unintended delete operation on an empty set.
Low severity - functionally safe, but the timing window could be tightened by checking `this.pendingCleanups.size` again after the race or accepting this benign race.
How can I resolve this? If you propose a fix, please make it concise. |
||
| } finally { | ||
| if (timeoutId) clearTimeout(timeoutId); | ||
| } | ||
|
|
||
| this.pendingCleanups.clear(); | ||
| } | ||
|
|
||
| await Promise.all(closePromises); | ||
| this.cache.clear(); | ||
| this.unhealthyKeys.clear(); | ||
|
|
||
|
|
@@ -372,23 +399,34 @@ export class AgentPoolImpl implements AgentPool { | |
|
|
||
| // 关键点:驱逐/清理路径不能等待 in-flight(否则会把 getAgent() 也阻塞住,导致全局“requesting”) | ||
| // 因此这里发起 destroy/close 后不 await,仅记录异常,确保 eviction 始终快速返回。 | ||
| if (operation === "destroy") { | ||
| agent.destroy().catch((error) => { | ||
| logger.warn("AgentPool: Error closing agent", { | ||
| key, | ||
| operation, | ||
| error: error instanceof Error ? error.message : String(error), | ||
| }); | ||
| }); | ||
| } else if (operation === "close") { | ||
| agent.close().catch((error) => { | ||
| // 同时将 promise 纳入 pendingCleanups,便于 shutdown() 做 best-effort 的“优雅收尾”。 | ||
| const cleanupPromise = | ||
| operation === "destroy" ? agent.destroy() : operation === "close" ? agent.close() : null; | ||
|
|
||
| if (!cleanupPromise) return; | ||
|
|
||
| let dropRefTimeoutId: NodeJS.Timeout | null = null; | ||
|
|
||
| const trackedPromise: Promise<void> = cleanupPromise | ||
| .catch((error) => { | ||
| logger.warn("AgentPool: Error closing agent", { | ||
| key, | ||
| operation, | ||
| error: error instanceof Error ? error.message : String(error), | ||
| }); | ||
| }) | ||
| .finally(() => { | ||
| if (dropRefTimeoutId) clearTimeout(dropRefTimeoutId); | ||
| this.pendingCleanups.delete(trackedPromise); | ||
| }); | ||
| } | ||
|
|
||
| this.pendingCleanups.add(trackedPromise); | ||
|
|
||
| // 避免某些 dispatcher 永不 settle 导致 pendingCleanups 长期持有引用 | ||
| dropRefTimeoutId = setTimeout(() => { | ||
| this.pendingCleanups.delete(trackedPromise); | ||
| }, 60000); | ||
| dropRefTimeoutId.unref(); | ||
| } catch (error) { | ||
| logger.warn("AgentPool: Error closing agent", { | ||
| key, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Head window drops bytes
In passthrough stats buffering, when
`headBufferedBytes + bytes` would exceed `MAX_STATS_HEAD_BYTES`, the code switches to tail mode and drops the current chunk entirely from the head, even though part of it could still fit. This creates an unintended gap between head and tail (more truncation than necessary) and can break parsing if important metadata starts near the end of the head window. Consider splitting the chunk (take the remaining head capacity, then send the rest to tail) so the head window is truly "first N bytes."