Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 103 additions & 41 deletions src/app/v1/_lib/proxy/response-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { SessionManager } from "@/lib/session-manager";
import { SessionTracker } from "@/lib/session-tracker";
import { calculateRequestCost } from "@/lib/utils/cost-calculation";
import { hasValidPriceData } from "@/lib/utils/price-data";
import { parseSSEData } from "@/lib/utils/sse";
import { isSSEText, parseSSEData } from "@/lib/utils/sse";
import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection";
import {
updateMessageRequestCost,
Expand Down Expand Up @@ -961,53 +961,95 @@ export class ProxyResponseHandler {

let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
// 保护:避免透传 stats 任务把超大响应体无界缓存在内存中(DoS/OOM 风险)
// 说明:这里用于统计/结算的内容仅保留“尾部窗口”(最近 MAX_STATS_BUFFER_BYTES),用于尽可能解析 usage/假200。
// 若响应体极大,仍会完整 drain 上游(reader.read),但不再累计完整字符串。
// 说明:用于统计/结算的内容采用“头部 + 尾部窗口”:
// - 头部保留前 MAX_STATS_HEAD_BYTES(便于解析可能前置的 metadata)
// - 尾部保留最近 MAX_STATS_TAIL_BYTES(便于解析结尾 usage/假 200 等)
// - 中间部分会被丢弃(wasTruncated=true),统计将退化为 best-effort
const MAX_STATS_BUFFER_BYTES = 10 * 1024 * 1024; // 10MB
const MAX_STATS_BUFFER_CHUNKS = 8192;
const chunks: string[] = [];
const chunkBytes: number[] = [];
let chunkHead = 0;
let bufferedBytes = 0;
const MAX_STATS_HEAD_BYTES = 1024 * 1024; // 1MB
const MAX_STATS_TAIL_BYTES = MAX_STATS_BUFFER_BYTES - MAX_STATS_HEAD_BYTES;
const MAX_STATS_TAIL_CHUNKS = 8192;

const headChunks: string[] = [];
let headBufferedBytes = 0;

const tailChunks: string[] = [];
const tailChunkBytes: number[] = [];
let tailHead = 0;
let tailBufferedBytes = 0;
let wasTruncated = false;
let inTailMode = false;

// Join the retained tail-window chunks, skipping any prefix entries that
// were already evicted (blanked) by the sliding-window logic.
const joinTailChunks = (): string => {
  const kept = tailHead > 0 ? tailChunks.slice(tailHead) : tailChunks;
  return kept.join("");
};

// Assemble the buffered stats text from the head window plus (when streaming
// overflowed into tail mode) the tail window.
//
// NOTE: the two lines of the pre-refactor implementation (referencing the
// removed `chunks`/`chunkHead` variables) were dead diff residue fused before
// the new body; they referenced undefined names and have been removed.
const joinChunks = (): string => {
  const headText = headChunks.join("");
  if (!inTailMode) {
    return headText;
  }

  const tailText = joinTailChunks();

  // 用 SSE comment 标记被截断的中间段;parseSSEData 会忽略 ":" 开头的行
  if (wasTruncated) {
    // 插入空行强制 flush event,避免“头+尾”拼接后跨 event 误拼接数据行
    return `${headText}\n\n: [cch_truncated]\n\n${tailText}`;
  }

  return `${headText}${tailText}`;
};

const pushChunk = (text: string, bytes: number) => {
if (!text) return;
chunks.push(text);
chunkBytes.push(bytes);
bufferedBytes += bytes;

// 仅保留尾部窗口,避免内存无界增长
while (bufferedBytes > MAX_STATS_BUFFER_BYTES && chunkHead < chunkBytes.length) {
bufferedBytes -= chunkBytes[chunkHead] ?? 0;
chunks[chunkHead] = "";
chunkBytes[chunkHead] = 0;
chunkHead += 1;
wasTruncated = true;
}

// 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化
if (chunkHead > 4096) {
chunks.splice(0, chunkHead);
chunkBytes.splice(0, chunkHead);
chunkHead = 0;
}
const pushToTail = () => {
tailChunks.push(text);
tailChunkBytes.push(bytes);
tailBufferedBytes += bytes;

// 仅保留尾部窗口,避免内存无界增长
while (tailBufferedBytes > MAX_STATS_TAIL_BYTES && tailHead < tailChunkBytes.length) {
tailBufferedBytes -= tailChunkBytes[tailHead] ?? 0;
tailChunks[tailHead] = "";
tailChunkBytes[tailHead] = 0;
tailHead += 1;
wasTruncated = true;
}

// 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化
if (tailHead > 4096) {
tailChunks.splice(0, tailHead);
tailChunkBytes.splice(0, tailHead);
tailHead = 0;
}

// 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限)
const keptCount = tailChunks.length - tailHead;
if (keptCount > MAX_STATS_TAIL_CHUNKS) {
const joined = joinTailChunks();
tailChunks.length = 0;
tailChunkBytes.length = 0;
tailHead = 0;
tailChunks.push(joined);
tailChunkBytes.push(tailBufferedBytes);
}
};

// 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限)
const keptCount = chunks.length - chunkHead;
if (keptCount > MAX_STATS_BUFFER_CHUNKS) {
const joined = joinChunks();
chunks.length = 0;
chunkBytes.length = 0;
chunkHead = 0;
chunks.push(joined);
chunkBytes.push(bufferedBytes);
// 优先填充 head;超过 head 上限后切到 tail(但不代表一定发生截断,只有 tail 溢出才算截断)
if (!inTailMode) {
if (headBufferedBytes + bytes <= MAX_STATS_HEAD_BYTES) {
headChunks.push(text);
headBufferedBytes += bytes;
return;
}

inTailMode = true;
}
Comment on lines +1041 to 1050
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Head window drops bytes

In passthrough stats buffering, when headBufferedBytes + bytes would exceed MAX_STATS_HEAD_BYTES, the code switches to tail mode and drops the current chunk entirely from the head, even though part of it could still fit. This creates an unintended gap between head and tail (more truncation than necessary) and can break parsing if important metadata starts near the end of the head window. Consider splitting the chunk (take remaining head capacity, then send the rest to tail) so the head window is truly “first N bytes.”

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 1070:1079

Comment:
**Head window drops bytes**

In passthrough stats buffering, when `headBufferedBytes + bytes` would exceed `MAX_STATS_HEAD_BYTES`, the code switches to tail mode and **drops the current chunk entirely from the head**, even though part of it could still fit. This creates an unintended gap between head and tail (more truncation than necessary) and can break parsing if important metadata starts near the end of the head window. Consider splitting the chunk (take remaining head capacity, then send the rest to tail) so the head window is truly “first N bytes.”

How can I resolve this? If you propose a fix, please make it concise.


pushToTail();
};
const decoder = new TextDecoder();
let isFirstChunk = true;
Expand Down Expand Up @@ -1035,8 +1077,10 @@ export class ProxyResponseHandler {
providerId: provider.id,
providerName: provider.name,
idleTimeoutMs,
chunksCollected: Math.max(0, chunks.length - chunkHead),
bufferedBytes,
chunksCollected: headChunks.length + Math.max(0, tailChunks.length - tailHead),
headBufferedBytes,
tailBufferedBytes,
bufferedBytes: headBufferedBytes + tailBufferedBytes,
wasTruncated,
});
// 终止上游连接:让透传到客户端的连接也尽快结束,避免永久悬挂占用资源
Expand Down Expand Up @@ -1109,7 +1153,25 @@ export class ProxyResponseHandler {
session.recordTtfb();
clearResponseTimeoutOnce(chunkSize);
}
pushChunk(decoder.decode(value, { stream: true }), chunkSize);

// 尽量填满 head:边界 chunk 可能跨过 head 上限,按 byte 切分以避免 head 少于 1MB
if (!inTailMode && headBufferedBytes < MAX_STATS_HEAD_BYTES) {
const remainingHeadBytes = MAX_STATS_HEAD_BYTES - headBufferedBytes;
if (remainingHeadBytes > 0 && chunkSize > remainingHeadBytes) {
const headPart = value.subarray(0, remainingHeadBytes);
const tailPart = value.subarray(remainingHeadBytes);

const headText = decoder.decode(headPart, { stream: true });
pushChunk(headText, remainingHeadBytes);
Comment on lines +1164 to +1165
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When splitting at a UTF-8 multi-byte sequence boundary, decoder.decode(headPart, { stream: true }) may withhold incomplete bytes (e.g., 2 bytes of a 3-byte character). The decoder returns text representing fewer bytes than remainingHeadBytes, but pushChunk is told the full remainingHeadBytes count. This causes headBufferedBytes to overcount by 1-3 bytes, triggering premature switch to tail mode and under-filling the head buffer.

Calculate actual decoded byte length:

Suggested change
const headText = decoder.decode(headPart, { stream: true });
pushChunk(headText, remainingHeadBytes);
const headText = decoder.decode(headPart, { stream: true });
const actualHeadBytes = new TextEncoder().encode(headText).byteLength;
pushChunk(headText, actualHeadBytes);
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 1194:1195

Comment:
When splitting at a UTF-8 multi-byte sequence boundary, `decoder.decode(headPart, { stream: true })` may withhold incomplete bytes (e.g., 2 bytes of a 3-byte character). The decoder returns text representing fewer bytes than `remainingHeadBytes`, but `pushChunk` is told the full `remainingHeadBytes` count. This causes `headBufferedBytes` to overcount by 1-3 bytes, triggering premature switch to tail mode and under-filling the head buffer.

Calculate actual decoded byte length:
```suggestion
                    const headText = decoder.decode(headPart, { stream: true });
                    const actualHeadBytes = new TextEncoder().encode(headText).byteLength;
                    pushChunk(headText, actualHeadBytes);
```

How can I resolve this? If you propose a fix, please make it concise.


const tailText = decoder.decode(tailPart, { stream: true });
pushChunk(tailText, chunkSize - remainingHeadBytes);
Comment on lines +1164 to +1168
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Splitting a chunk and calling decode() twice sequentially on the same decoder maintains UTF-8 multi-byte sequence integrity because stream: true preserves incomplete sequences between calls - this is correct. However, pushChunk(headText, remainingHeadBytes) on line 1195 passes the original byte count (remainingHeadBytes), but headText may have fewer characters if the split occurred mid-UTF-8-sequence (decoder will withhold those bytes until the next call). This means headBufferedBytes could become incorrect, potentially under-filling the head window.

Consider tracking the actual decoded byte length or adjusting the accounting to handle incomplete sequences.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 1194:1198

Comment:
Splitting a chunk and calling `decode()` twice sequentially on the same decoder maintains UTF-8 multi-byte sequence integrity because `stream: true` preserves incomplete sequences between calls - this is correct. However, `pushChunk(headText, remainingHeadBytes)` on line 1195 passes the original **byte** count (`remainingHeadBytes`), but `headText` may have **fewer characters** if the split occurred mid-UTF-8-sequence (decoder will withhold those bytes until the next call). This means `headBufferedBytes` could become incorrect, potentially under-filling the head window.

Consider tracking the actual decoded byte length or adjusting the accounting to handle incomplete sequences.

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +1167 to +1168
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same byte accounting issue: chunkSize - remainingHeadBytes represents the tail part's byte count, but if UTF-8 bytes were withheld from the head decode, those bytes are prepended internally by the decoder when decoding tailPart. The decoded tailText will contain more characters than chunkSize - remainingHeadBytes would suggest, leading to inaccurate byte tracking.

Suggested change
const tailText = decoder.decode(tailPart, { stream: true });
pushChunk(tailText, chunkSize - remainingHeadBytes);
const tailText = decoder.decode(tailPart, { stream: true });
const actualTailBytes = new TextEncoder().encode(tailText).byteLength;
pushChunk(tailText, actualTailBytes);
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 1197:1198

Comment:
Same byte accounting issue: `chunkSize - remainingHeadBytes` represents the tail part's byte count, but if UTF-8 bytes were withheld from the head decode, those bytes are prepended internally by the decoder when decoding `tailPart`. The decoded `tailText` will contain more characters than `chunkSize - remainingHeadBytes` would suggest, leading to inaccurate byte tracking.

```suggestion
                    const tailText = decoder.decode(tailPart, { stream: true });
                    const actualTailBytes = new TextEncoder().encode(tailText).byteLength;
                    pushChunk(tailText, actualTailBytes);
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +1164 to +1168
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UTF-8 byte accounting wrong

In the head/tail split path, pushChunk(headText, remainingHeadBytes) and pushChunk(tailText, chunkSize - remainingHeadBytes) assume the decoder returns text for exactly those byte counts. If headPart ends mid–multi-byte UTF-8 sequence, TextDecoder.decode(headPart, { stream: true }) will buffer 1–3 bytes internally and not emit them until the next decode call. That means headBufferedBytes/tailBufferedBytes can become inaccurate, and you can either (a) switch to tail mode too early (under-filling head) or (b) exceed the intended limits without setting wasTruncated correctly. This is observable whenever a chunk boundary/split hits a multi-byte sequence.

Also appears at src/app/v1/_lib/proxy/response-handler.ts:1165 and :1168.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 1164:1168

Comment:
**UTF-8 byte accounting wrong**

In the head/tail split path, `pushChunk(headText, remainingHeadBytes)` and `pushChunk(tailText, chunkSize - remainingHeadBytes)` assume the decoder returns text for exactly those byte counts. If `headPart` ends mid–multi-byte UTF-8 sequence, `TextDecoder.decode(headPart, { stream: true })` will buffer 1–3 bytes internally and *not* emit them until the next decode call. That means `headBufferedBytes`/`tailBufferedBytes` can become inaccurate, and you can either (a) switch to tail mode too early (under-filling head) or (b) exceed the intended limits without setting `wasTruncated` correctly. This is observable whenever a chunk boundary/split hits a multi-byte sequence.

Also appears at src/app/v1/_lib/proxy/response-handler.ts:1165 and :1168.

How can I resolve this? If you propose a fix, please make it concise.

} else {
pushChunk(decoder.decode(value, { stream: true }), chunkSize);
}
} else {
pushChunk(decoder.decode(value, { stream: true }), chunkSize);
}
}

// 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠)
Expand Down Expand Up @@ -2215,7 +2277,7 @@ export function parseUsageFromResponseText(
// SSE 解析:支持两种格式
// 1. 标准 SSE (event: + data:) - Claude/OpenAI
// 2. 纯 data: 格式 - Gemini
if (!usageMetrics && responseText.includes("data:")) {
if (!usageMetrics && isSSEText(responseText)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced naive responseText.includes("data:") with isSSEText() utility for more accurate SSE format detection. This prevents false positives from JSON bodies containing the string "data:" and correctly identifies SSE by checking line-start patterns.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 2311:2311

Comment:
Replaced naive `responseText.includes("data:")` with `isSSEText()` utility for more accurate SSE format detection. This prevents false positives from JSON bodies containing the string "data:" and correctly identifies SSE by checking line-start patterns.

How can I resolve this? If you propose a fix, please make it concise.

const events = parseSSEData(responseText);

// Claude SSE 特殊处理:
Expand Down
68 changes: 53 additions & 15 deletions src/lib/proxy-agent/agent-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ export class AgentPoolImpl implements AgentPool {
};
/** Pending agent creation promises to prevent race conditions */
private pendingCreations: Map<string, Promise<GetAgentResult>> = new Map();
/**
* Pending destroy/close promises (best-effort).
*
* 说明:
* - 驱逐/清理路径为了避免全局卡死,必须 fire-and-forget(不 await)。
* - 但在 shutdown() 中我们仍希望尽量“优雅收尾”,因此在这里追踪 pending 的关闭任务。
* - 若某些 dispatcher 永不 settle,这里会在超时后丢弃引用,避免内存泄漏。
*/
private pendingCleanups: Set<Promise<void>> = new Set();

constructor(config: Partial<AgentPoolConfig> = {}) {
this.config = { ...DEFAULT_CONFIG, ...config };
Expand Down Expand Up @@ -329,13 +338,31 @@ export class AgentPoolImpl implements AgentPool {
this.cleanupTimer = null;
}

const closePromises: Promise<void>[] = [];
// closeAgent 本身是 fire-and-forget(不 await destroy/close),这里并行触发即可。
await Promise.allSettled(
Array.from(this.cache.entries()).map(([key, cached]) => this.closeAgent(cached.agent, key))
);

for (const [key, cached] of this.cache.entries()) {
closePromises.push(this.closeAgent(cached.agent, key));
// Best-effort:等待部分 pending cleanup 完成,但永不无限等待(避免重蹈 “close() 等待 in-flight” 的覆辙)
if (this.pendingCleanups.size > 0) {
const pending = Array.from(this.pendingCleanups);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timing window: if a cleanup promise settles between this snapshot (line 348) and when Promise.race starts (line 352), its .finally() handler (line 420) will try to delete from pendingCleanups even though it's already in the pending array. When the race completes and line 363 clears the set, that promise's delete becomes a no-op on an empty set. Benign but worth noting.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/lib/proxy-agent/agent-pool.ts
Line: 348:348

Comment:
Timing window: if a cleanup promise settles between this snapshot (line 348) and when `Promise.race` starts (line 352), its `.finally()` handler (line 420) will try to delete from `pendingCleanups` even though it's already in the `pending` array. When the race completes and line 363 clears the set, that promise's delete becomes a no-op on an empty set. Benign but worth noting.

How can I resolve this? If you propose a fix, please make it concise.

const WAIT_MS = 2000;
let timeoutId: NodeJS.Timeout | null = null;
try {
await Promise.race([
Promise.allSettled(pending).then(() => {}),
new Promise<void>((resolve) => {
timeoutId = setTimeout(resolve, WAIT_MS);
timeoutId.unref();
}),
]);
Comment on lines +352 to +358
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition: if a cleanup promise settles and removes itself from pendingCleanups (line 421) after line 348 creates the array snapshot but before the Promise.race starts, that promise is still included in pending but no longer tracked. When it later settles during the race, line 421 tries to delete from an already-cleared set (line 363 clears after race completes), causing a harmless but unintended delete operation on an empty set.

Low severity - functionally safe, but the timing window could be tightened by checking this.pendingCleanups.size again after the race or accepting this benign race.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/lib/proxy-agent/agent-pool.ts
Line: 352:358

Comment:
Race condition: if a cleanup promise settles and removes itself from `pendingCleanups` (line 421) **after** line 348 creates the array snapshot but **before** the `Promise.race` starts, that promise is still included in `pending` but no longer tracked. When it later settles during the race, line 421 tries to delete from an already-cleared set (line 363 clears after race completes), causing a harmless but unintended delete operation on an empty set.

Low severity - functionally safe, but the timing window could be tightened by checking `this.pendingCleanups.size` again after the race or accepting this benign race.

How can I resolve this? If you propose a fix, please make it concise.

} finally {
if (timeoutId) clearTimeout(timeoutId);
}

this.pendingCleanups.clear();
}

await Promise.all(closePromises);
this.cache.clear();
this.unhealthyKeys.clear();

Expand Down Expand Up @@ -372,23 +399,34 @@ export class AgentPoolImpl implements AgentPool {

// 关键点:驱逐/清理路径不能等待 in-flight(否则会把 getAgent() 也阻塞住,导致全局“requesting”)
// 因此这里发起 destroy/close 后不 await,仅记录异常,确保 eviction 始终快速返回。
if (operation === "destroy") {
agent.destroy().catch((error) => {
logger.warn("AgentPool: Error closing agent", {
key,
operation,
error: error instanceof Error ? error.message : String(error),
});
});
} else if (operation === "close") {
agent.close().catch((error) => {
// 同时将 promise 纳入 pendingCleanups,便于 shutdown() 做 best-effort 的“优雅收尾”。
const cleanupPromise =
operation === "destroy" ? agent.destroy() : operation === "close" ? agent.close() : null;

if (!cleanupPromise) return;

let dropRefTimeoutId: NodeJS.Timeout | null = null;

const trackedPromise: Promise<void> = cleanupPromise
.catch((error) => {
logger.warn("AgentPool: Error closing agent", {
key,
operation,
error: error instanceof Error ? error.message : String(error),
});
})
.finally(() => {
if (dropRefTimeoutId) clearTimeout(dropRefTimeoutId);
this.pendingCleanups.delete(trackedPromise);
});
}

this.pendingCleanups.add(trackedPromise);

// 避免某些 dispatcher 永不 settle 导致 pendingCleanups 长期持有引用
dropRefTimeoutId = setTimeout(() => {
this.pendingCleanups.delete(trackedPromise);
}, 60000);
dropRefTimeoutId.unref();
} catch (error) {
logger.warn("AgentPool: Error closing agent", {
key,
Expand Down
8 changes: 8 additions & 0 deletions tests/unit/lib/proxy-agent/agent-pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,14 @@ describe("AgentPool", () => {
destroy?: () => Promise<void>;
};

// 说明:本文件顶部已 mock undici Agent/ProxyAgent,因此 destroy/close 应为 vi.fn,断言才有意义
if (typeof agent.destroy === "function") {
expect(vi.isMockFunction(agent.destroy)).toBe(true);
}
if (typeof agent.close === "function") {
expect(vi.isMockFunction(agent.close)).toBe(true);
}

// 模拟:close 可能因等待 in-flight 请求结束而长期不返回
if (typeof agent.close === "function") {
vi.mocked(agent.close).mockImplementation(() => new Promise<void>(() => {}));
Expand Down
Loading