diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index fa7f9bfcb..c2afe9d90 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -10,7 +10,7 @@ import { SessionManager } from "@/lib/session-manager"; import { SessionTracker } from "@/lib/session-tracker"; import { calculateRequestCost } from "@/lib/utils/cost-calculation"; import { hasValidPriceData } from "@/lib/utils/price-data"; -import { parseSSEData } from "@/lib/utils/sse"; +import { isSSEText, parseSSEData } from "@/lib/utils/sse"; import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection"; import { updateMessageRequestCost, @@ -961,53 +961,95 @@ export class ProxyResponseHandler { let reader: ReadableStreamDefaultReader | null = null; // 保护:避免透传 stats 任务把超大响应体无界缓存在内存中(DoS/OOM 风险) - // 说明:这里用于统计/结算的内容仅保留“尾部窗口”(最近 MAX_STATS_BUFFER_BYTES),用于尽可能解析 usage/假200。 - // 若响应体极大,仍会完整 drain 上游(reader.read),但不再累计完整字符串。 + // 说明:用于统计/结算的内容采用“头部 + 尾部窗口”: + // - 头部保留前 MAX_STATS_HEAD_BYTES(便于解析可能前置的 metadata) + // - 尾部保留最近 MAX_STATS_TAIL_BYTES(便于解析结尾 usage/假 200 等) + // - 中间部分会被丢弃(wasTruncated=true),统计将退化为 best-effort const MAX_STATS_BUFFER_BYTES = 10 * 1024 * 1024; // 10MB - const MAX_STATS_BUFFER_CHUNKS = 8192; - const chunks: string[] = []; - const chunkBytes: number[] = []; - let chunkHead = 0; - let bufferedBytes = 0; + const MAX_STATS_HEAD_BYTES = 1024 * 1024; // 1MB + const MAX_STATS_TAIL_BYTES = MAX_STATS_BUFFER_BYTES - MAX_STATS_HEAD_BYTES; + const MAX_STATS_TAIL_CHUNKS = 8192; + + const headChunks: string[] = []; + let headBufferedBytes = 0; + + const tailChunks: string[] = []; + const tailChunkBytes: number[] = []; + let tailHead = 0; + let tailBufferedBytes = 0; let wasTruncated = false; + let inTailMode = false; + + const joinTailChunks = (): string => { + if (tailHead <= 0) return tailChunks.join(""); + return tailChunks.slice(tailHead).join(""); + }; const joinChunks = (): string => { - if (chunkHead <= 0) return chunks.join(""); - return chunks.slice(chunkHead).join(""); + const headText = headChunks.join(""); + if (!inTailMode) { + return headText; + } + + const tailText = joinTailChunks(); + + // 用 SSE comment 标记被截断的中间段;parseSSEData 会忽略 ":" 开头的行 + if (wasTruncated) { + // 插入空行强制 flush event,避免“头+尾”拼接后跨 event 误拼接数据行 + return `${headText}\n\n: [cch_truncated]\n\n${tailText}`; + } + + return `${headText}${tailText}`; }; const pushChunk = (text: string, bytes: number) => { if (!text) return; - chunks.push(text); - chunkBytes.push(bytes); - bufferedBytes += bytes; - - // 仅保留尾部窗口,避免内存无界增长 - while (bufferedBytes > MAX_STATS_BUFFER_BYTES && chunkHead < chunkBytes.length) { - bufferedBytes -= chunkBytes[chunkHead] ?? 0; - chunks[chunkHead] = ""; - chunkBytes[chunkHead] = 0; - chunkHead += 1; - wasTruncated = true; - } - // 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化 - if (chunkHead > 4096) { - chunks.splice(0, chunkHead); - chunkBytes.splice(0, chunkHead); - chunkHead = 0; - } + const pushToTail = () => { + tailChunks.push(text); + tailChunkBytes.push(bytes); + tailBufferedBytes += bytes; + + // 仅保留尾部窗口,避免内存无界增长 + while (tailBufferedBytes > MAX_STATS_TAIL_BYTES && tailHead < tailChunkBytes.length) { + tailBufferedBytes -= tailChunkBytes[tailHead] ?? 0; + tailChunks[tailHead] = ""; + tailChunkBytes[tailHead] = 0; + tailHead += 1; + wasTruncated = true; + } + + // 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化 + if (tailHead > 4096) { + tailChunks.splice(0, tailHead); + tailChunkBytes.splice(0, tailHead); + tailHead = 0; + } + + // 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限) + const keptCount = tailChunks.length - tailHead; + if (keptCount > MAX_STATS_TAIL_CHUNKS) { + const joined = joinTailChunks(); + tailChunks.length = 0; + tailChunkBytes.length = 0; + tailHead = 0; + tailChunks.push(joined); + tailChunkBytes.push(tailBufferedBytes); + } + }; - // 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限) - const keptCount = chunks.length - chunkHead; - if (keptCount > MAX_STATS_BUFFER_CHUNKS) { - const joined = joinChunks(); - chunks.length = 0; - chunkBytes.length = 0; - chunkHead = 0; - chunks.push(joined); - chunkBytes.push(bufferedBytes); + // 优先填充 head;超过 head 上限后切到 tail(但不代表一定发生截断,只有 tail 溢出才算截断) + if (!inTailMode) { + if (headBufferedBytes + bytes <= MAX_STATS_HEAD_BYTES) { + headChunks.push(text); + headBufferedBytes += bytes; + return; + } + + inTailMode = true; } + + pushToTail(); }; const decoder = new TextDecoder(); let isFirstChunk = true; @@ -1035,8 +1077,10 @@ export class ProxyResponseHandler { providerId: provider.id, providerName: provider.name, idleTimeoutMs, - chunksCollected: Math.max(0, chunks.length - chunkHead), - bufferedBytes, + chunksCollected: headChunks.length + Math.max(0, tailChunks.length - tailHead), + headBufferedBytes, + tailBufferedBytes, + bufferedBytes: headBufferedBytes + tailBufferedBytes, wasTruncated, }); // 终止上游连接:让透传到客户端的连接也尽快结束,避免永久悬挂占用资源 @@ -1109,7 +1153,25 @@ export class ProxyResponseHandler { session.recordTtfb(); clearResponseTimeoutOnce(chunkSize); } - pushChunk(decoder.decode(value, { stream: true }), chunkSize); + + // 尽量填满 head:边界 chunk 可能跨过 head 上限,按 byte 切分以避免 head 少于 1MB + if (!inTailMode && headBufferedBytes < MAX_STATS_HEAD_BYTES) { + const remainingHeadBytes = MAX_STATS_HEAD_BYTES - headBufferedBytes; + if (remainingHeadBytes > 0 && chunkSize > remainingHeadBytes) { + const headPart = value.subarray(0, remainingHeadBytes); + const tailPart = value.subarray(remainingHeadBytes); + + const headText = decoder.decode(headPart, { stream: true }); + pushChunk(headText, remainingHeadBytes); + + const tailText = decoder.decode(tailPart, { stream: true }); + pushChunk(tailText, chunkSize - remainingHeadBytes); + } else { + pushChunk(decoder.decode(value, { stream: true }), chunkSize); + } + } else { + pushChunk(decoder.decode(value, { stream: true }), chunkSize); + } } // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) @@ -2215,7 +2277,7 @@ export function parseUsageFromResponseText( // SSE 解析:支持两种格式 // 1. 标准 SSE (event: + data:) - Claude/OpenAI // 2. 纯 data: 格式 - Gemini - if (!usageMetrics && responseText.includes("data:")) { + if (!usageMetrics && isSSEText(responseText)) { const events = parseSSEData(responseText); // Claude SSE 特殊处理: diff --git a/src/lib/proxy-agent/agent-pool.ts b/src/lib/proxy-agent/agent-pool.ts index 61aba4587..037183794 100644 --- a/src/lib/proxy-agent/agent-pool.ts +++ b/src/lib/proxy-agent/agent-pool.ts @@ -165,6 +165,15 @@ export class AgentPoolImpl implements AgentPool { }; /** Pending agent creation promises to prevent race conditions */ private pendingCreations: Map> = new Map(); + /** + * Pending destroy/close promises (best-effort). + * + * 说明: + * - 驱逐/清理路径为了避免全局卡死,必须 fire-and-forget(不 await)。 + * - 但在 shutdown() 中我们仍希望尽量“优雅收尾”,因此在这里追踪 pending 的关闭任务。 + * - 若某些 dispatcher 永不 settle,这里会在超时后丢弃引用,避免内存泄漏。 + */ + private pendingCleanups: Set> = new Set(); constructor(config: Partial = {}) { this.config = { ...DEFAULT_CONFIG, ...config }; @@ -329,13 +338,31 @@ export class AgentPoolImpl implements AgentPool { this.cleanupTimer = null; } - const closePromises: Promise[] = []; + // closeAgent 本身是 fire-and-forget(不 await destroy/close),这里并行触发即可。 + await Promise.allSettled( + Array.from(this.cache.entries()).map(([key, cached]) => this.closeAgent(cached.agent, key)) + ); - for (const [key, cached] of this.cache.entries()) { - closePromises.push(this.closeAgent(cached.agent, key)); + // Best-effort:等待部分 pending cleanup 完成,但永不无限等待(避免重蹈 “close() 等待 in-flight” 的覆辙) + if (this.pendingCleanups.size > 0) { + const pending = Array.from(this.pendingCleanups); + const WAIT_MS = 2000; + let timeoutId: NodeJS.Timeout | null = null; + try { + await Promise.race([ + Promise.allSettled(pending).then(() => {}), + new Promise((resolve) => { + timeoutId = setTimeout(resolve, WAIT_MS); + timeoutId.unref(); + }), + ]); + } finally { + if (timeoutId) clearTimeout(timeoutId); + } + + this.pendingCleanups.clear(); } - await Promise.all(closePromises); this.cache.clear(); this.unhealthyKeys.clear(); @@ -372,23 +399,34 @@ export class AgentPoolImpl implements AgentPool { // 关键点:驱逐/清理路径不能等待 in-flight(否则会把 getAgent() 也阻塞住,导致全局“requesting”) // 因此这里发起 destroy/close 后不 await,仅记录异常,确保 eviction 始终快速返回。 - if (operation === "destroy") { - agent.destroy().catch((error) => { - logger.warn("AgentPool: Error closing agent", { - key, - operation, - error: error instanceof Error ? error.message : String(error), - }); - }); - } else if (operation === "close") { - agent.close().catch((error) => { + // 同时将 promise 纳入 pendingCleanups,便于 shutdown() 做 best-effort 的“优雅收尾”。 + const cleanupPromise = + operation === "destroy" ? agent.destroy() : operation === "close" ? agent.close() : null; + + if (!cleanupPromise) return; + + let dropRefTimeoutId: NodeJS.Timeout | null = null; + + const trackedPromise: Promise = cleanupPromise + .catch((error) => { logger.warn("AgentPool: Error closing agent", { key, operation, error: error instanceof Error ? error.message : String(error), }); + }) + .finally(() => { + if (dropRefTimeoutId) clearTimeout(dropRefTimeoutId); + this.pendingCleanups.delete(trackedPromise); }); - } + + this.pendingCleanups.add(trackedPromise); + + // 避免某些 dispatcher 永不 settle 导致 pendingCleanups 长期持有引用 + dropRefTimeoutId = setTimeout(() => { + this.pendingCleanups.delete(trackedPromise); + }, 60000); + dropRefTimeoutId.unref(); } catch (error) { logger.warn("AgentPool: Error closing agent", { key, diff --git a/tests/unit/lib/proxy-agent/agent-pool.test.ts b/tests/unit/lib/proxy-agent/agent-pool.test.ts index 3f664e331..33c40da26 100644 --- a/tests/unit/lib/proxy-agent/agent-pool.test.ts +++ b/tests/unit/lib/proxy-agent/agent-pool.test.ts @@ -514,6 +514,14 @@ describe("AgentPool", () => { destroy?: () => Promise; }; + // 说明:本文件顶部已 mock undici Agent/ProxyAgent,因此 destroy/close 应为 vi.fn,断言才有意义 + if (typeof agent.destroy === "function") { + expect(vi.isMockFunction(agent.destroy)).toBe(true); + } + if (typeof agent.close === "function") { + expect(vi.isMockFunction(agent.close)).toBe(true); + } + // 模拟:close 可能因等待 in-flight 请求结束而长期不返回 if (typeof agent.close === "function") { vi.mocked(agent.close).mockImplementation(() => new Promise(() => {}));