From 88e485e905ffebbcea98298ee401926f413c9b6c Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 12:44:03 +0800 Subject: [PATCH 1/5] =?UTF-8?q?fix(proxy):=20=E5=8A=A0=E5=9B=BA=20AgentPoo?= =?UTF-8?q?l=20=E6=B8=85=E7=90=86=E4=B8=8E=E9=80=8F=E4=BC=A0=20stats=20?= =?UTF-8?q?=E9=98=B2=E5=BE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AgentPool: close/destroy 走 fire-and-forget,并追踪 pending,shutdown best-effort - Gemini passthrough stats: 头+尾缓冲、截断 flush 边界、idle watchdog 兜底 - Usage SSE 检测改用 isSSEText,避免误判干扰 JSON 修复链路 - 补强 AgentPool 单测对 undici mock 的断言 --- src/app/v1/_lib/proxy/response-handler.ts | 128 ++++++++++++------ src/lib/proxy-agent/agent-pool.ts | 68 ++++++++-- tests/unit/lib/proxy-agent/agent-pool.test.ts | 8 ++ 3 files changed, 148 insertions(+), 56 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index fa7f9bfcb..9f340a9fb 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -10,7 +10,7 @@ import { SessionManager } from "@/lib/session-manager"; import { SessionTracker } from "@/lib/session-tracker"; import { calculateRequestCost } from "@/lib/utils/cost-calculation"; import { hasValidPriceData } from "@/lib/utils/price-data"; -import { parseSSEData } from "@/lib/utils/sse"; +import { isSSEText, parseSSEData } from "@/lib/utils/sse"; import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection"; import { updateMessageRequestCost, @@ -961,53 +961,95 @@ export class ProxyResponseHandler { let reader: ReadableStreamDefaultReader | null = null; // 保护:避免透传 stats 任务把超大响应体无界缓存在内存中(DoS/OOM 风险) - // 说明:这里用于统计/结算的内容仅保留“尾部窗口”(最近 MAX_STATS_BUFFER_BYTES),用于尽可能解析 usage/假200。 - // 若响应体极大,仍会完整 drain 上游(reader.read),但不再累计完整字符串。 + // 说明:用于统计/结算的内容采用“头部 + 尾部窗口”: + // - 头部保留前 MAX_STATS_HEAD_BYTES(便于解析可能前置的 metadata) + // - 尾部保留最近 MAX_STATS_TAIL_BYTES(便于解析结尾 usage/假 200 等) + // - 中间部分会被丢弃(wasTruncated=true),统计将退化为 best-effort const MAX_STATS_BUFFER_BYTES = 10 * 1024 * 1024; // 10MB - const MAX_STATS_BUFFER_CHUNKS = 8192; - const chunks: string[] = []; - const chunkBytes: number[] = []; - let chunkHead = 0; - let bufferedBytes = 0; + const MAX_STATS_HEAD_BYTES = 1024 * 1024; // 1MB + const MAX_STATS_TAIL_BYTES = MAX_STATS_BUFFER_BYTES - MAX_STATS_HEAD_BYTES; + const MAX_STATS_TAIL_CHUNKS = 8192; + + const headChunks: string[] = []; + let headBufferedBytes = 0; + + const tailChunks: string[] = []; + const tailChunkBytes: number[] = []; + let tailHead = 0; + let tailBufferedBytes = 0; let wasTruncated = false; + let inTailMode = false; + + const joinTailChunks = (): string => { + if (tailHead <= 0) return tailChunks.join(""); + return tailChunks.slice(tailHead).join(""); + }; const joinChunks = (): string => { - if (chunkHead <= 0) return chunks.join(""); - return chunks.slice(chunkHead).join(""); + const headText = headChunks.join(""); + if (!inTailMode) { + return headText; + } + + const tailText = joinTailChunks(); + + // 用 SSE comment 标记被截断的中间段;parseSSEData 会忽略 ":" 开头的行 + if (wasTruncated) { + // 插入空行强制 flush event,避免“头+尾”拼接后跨 event 误拼接数据行 + return `${headText}\n\n: [cch_truncated]\n\n${tailText}`; + } + + return `${headText}${tailText}`; }; const pushChunk = (text: string, bytes: number) => { if (!text) return; - chunks.push(text); - chunkBytes.push(bytes); - bufferedBytes += bytes; - - // 仅保留尾部窗口,避免内存无界增长 - while (bufferedBytes > MAX_STATS_BUFFER_BYTES && chunkHead < chunkBytes.length) { - bufferedBytes -= chunkBytes[chunkHead] ?? 0; - chunks[chunkHead] = ""; - chunkBytes[chunkHead] = 0; - chunkHead += 1; - wasTruncated = true; - } - // 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化 - if (chunkHead > 4096) { - chunks.splice(0, chunkHead); - chunkBytes.splice(0, chunkHead); - chunkHead = 0; - } + const pushToTail = () => { + tailChunks.push(text); + tailChunkBytes.push(bytes); + tailBufferedBytes += bytes; + + // 仅保留尾部窗口,避免内存无界增长 + while (tailBufferedBytes > MAX_STATS_TAIL_BYTES && tailHead < tailChunkBytes.length) { + tailBufferedBytes -= tailChunkBytes[tailHead] ?? 0; + tailChunks[tailHead] = ""; + tailChunkBytes[tailHead] = 0; + tailHead += 1; + wasTruncated = true; + } + + // 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化 + if (tailHead > 4096) { + tailChunks.splice(0, tailHead); + tailChunkBytes.splice(0, tailHead); + tailHead = 0; + } + + // 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限) + const keptCount = tailChunks.length - tailHead; + if (keptCount > MAX_STATS_TAIL_CHUNKS) { + const joined = joinTailChunks(); + tailChunks.length = 0; + tailChunkBytes.length = 0; + tailHead = 0; + tailChunks.push(joined); + tailChunkBytes.push(tailBufferedBytes); + } + }; - // 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限) - const keptCount = chunks.length - chunkHead; - if (keptCount > MAX_STATS_BUFFER_CHUNKS) { - const joined = joinChunks(); - chunks.length = 0; - chunkBytes.length = 0; - chunkHead = 0; - chunks.push(joined); - chunkBytes.push(bufferedBytes); + // 优先填充 head;超过 head 上限后切到 tail(但不代表一定发生截断,只有 tail 溢出才算截断) + if (!inTailMode) { + if (headBufferedBytes + bytes <= MAX_STATS_HEAD_BYTES) { + headChunks.push(text); + headBufferedBytes += bytes; + return; + } + + inTailMode = true; } + + pushToTail(); }; const decoder = new TextDecoder(); let isFirstChunk = true; @@ -1016,6 +1058,7 @@ export class ProxyResponseHandler { let abortReason: string | undefined; // 静默期 Watchdog:透传也需要支持中途卡住(无新数据推送) + const hasFirstByteTimeout = (provider.firstByteTimeoutStreamingMs ?? 0) > 0; const idleTimeoutMs = provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; let idleTimeoutId: NodeJS.Timeout | null = null; @@ -1035,8 +1078,10 @@ export class ProxyResponseHandler { providerId: provider.id, providerName: provider.name, idleTimeoutMs, - chunksCollected: Math.max(0, chunks.length - chunkHead), - bufferedBytes, + chunksCollected: Math.max(0, tailChunks.length - tailHead), + headBufferedBytes, + tailBufferedBytes, + bufferedBytes: headBufferedBytes + tailBufferedBytes, wasTruncated, }); // 终止上游连接:让透传到客户端的连接也尽快结束,避免永久悬挂占用资源 @@ -1113,7 +1158,8 @@ export class ProxyResponseHandler { } // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) - if (!isFirstChunk) { + // 若未配置首字节超时,则 done=false 后也启动 idle timer 作为兜底(包含 0-byte chunk/value=undefined) + if (!isFirstChunk || !hasFirstByteTimeout) { startIdleTimer(); } } @@ -2215,7 +2261,7 @@ export function parseUsageFromResponseText( // SSE 解析:支持两种格式 // 1. 标准 SSE (event: + data:) - Claude/OpenAI // 2. 纯 data: 格式 - Gemini - if (!usageMetrics && responseText.includes("data:")) { + if (!usageMetrics && isSSEText(responseText)) { const events = parseSSEData(responseText); // Claude SSE 特殊处理: diff --git a/src/lib/proxy-agent/agent-pool.ts b/src/lib/proxy-agent/agent-pool.ts index 61aba4587..4adcd6850 100644 --- a/src/lib/proxy-agent/agent-pool.ts +++ b/src/lib/proxy-agent/agent-pool.ts @@ -165,6 +165,15 @@ export class AgentPoolImpl implements AgentPool { }; /** Pending agent creation promises to prevent race conditions */ private pendingCreations: Map> = new Map(); + /** + * Pending destroy/close promises (best-effort). + * + * 说明: + * - 驱逐/清理路径为了避免全局卡死,必须 fire-and-forget(不 await)。 + * - 但在 shutdown() 中我们仍希望尽量“优雅收尾”,因此在这里追踪 pending 的关闭任务。 + * - 若某些 dispatcher 永不 settle,这里会在超时后丢弃引用,避免内存泄漏。 + */ + private pendingCleanups: Set> = new Set(); constructor(config: Partial = {}) { this.config = { ...DEFAULT_CONFIG, ...config }; @@ -329,13 +338,30 @@ export class AgentPoolImpl implements AgentPool { this.cleanupTimer = null; } - const closePromises: Promise[] = []; - for (const [key, cached] of this.cache.entries()) { - closePromises.push(this.closeAgent(cached.agent, key)); + await this.closeAgent(cached.agent, key); + } + + // Best-effort:等待部分 pending cleanup 完成,但永不无限等待(避免重蹈 “close() 等待 in-flight” 的覆辙) + if (this.pendingCleanups.size > 0) { + const pending = Array.from(this.pendingCleanups); + const WAIT_MS = 2000; + let timeoutId: NodeJS.Timeout | null = null; + try { + await Promise.race([ + Promise.allSettled(pending).then(() => {}), + new Promise((resolve) => { + timeoutId = setTimeout(resolve, WAIT_MS); + timeoutId.unref(); + }), + ]); + } finally { + if (timeoutId) clearTimeout(timeoutId); + } + + this.pendingCleanups.clear(); } - await Promise.all(closePromises); this.cache.clear(); this.unhealthyKeys.clear(); @@ -372,23 +398,35 @@ export class AgentPoolImpl implements AgentPool { // 关键点:驱逐/清理路径不能等待 in-flight(否则会把 getAgent() 也阻塞住,导致全局“requesting”) // 因此这里发起 destroy/close 后不 await,仅记录异常,确保 eviction 始终快速返回。 - if (operation === "destroy") { - agent.destroy().catch((error) => { - logger.warn("AgentPool: Error closing agent", { - key, - operation, - error: error instanceof Error ? error.message : String(error), - }); - }); - } else if (operation === "close") { - agent.close().catch((error) => { + // 同时将 promise 纳入 pendingCleanups,便于 shutdown() 做 best-effort 的“优雅收尾”。 + const cleanupPromise = + operation === "destroy" ? agent.destroy() : operation === "close" ? agent.close() : null; + + if (!cleanupPromise) return; + + let trackedPromise: Promise; + let dropRefTimeoutId: NodeJS.Timeout | null = null; + + trackedPromise = cleanupPromise + .catch((error) => { logger.warn("AgentPool: Error closing agent", { key, operation, error: error instanceof Error ? error.message : String(error), }); + }) + .finally(() => { + if (dropRefTimeoutId) clearTimeout(dropRefTimeoutId); + this.pendingCleanups.delete(trackedPromise); }); - } + + this.pendingCleanups.add(trackedPromise); + + // 避免某些 dispatcher 永不 settle 导致 pendingCleanups 长期持有引用 + dropRefTimeoutId = setTimeout(() => { + this.pendingCleanups.delete(trackedPromise); + }, 60000); + dropRefTimeoutId.unref(); } catch (error) { logger.warn("AgentPool: Error closing agent", { key, diff --git a/tests/unit/lib/proxy-agent/agent-pool.test.ts b/tests/unit/lib/proxy-agent/agent-pool.test.ts index 3f664e331..33c40da26 100644 --- a/tests/unit/lib/proxy-agent/agent-pool.test.ts +++ b/tests/unit/lib/proxy-agent/agent-pool.test.ts @@ -514,6 +514,14 @@ describe("AgentPool", () => { destroy?: () => Promise; }; + // 说明:本文件顶部已 mock undici Agent/ProxyAgent,因此 destroy/close 应为 vi.fn,断言才有意义 + if (typeof agent.destroy === "function") { + expect(vi.isMockFunction(agent.destroy)).toBe(true); + } + if (typeof agent.close === "function") { + expect(vi.isMockFunction(agent.close)).toBe(true); + } + // 模拟:close 可能因等待 in-flight 请求结束而长期不返回 if (typeof agent.close === "function") { vi.mocked(agent.close).mockImplementation(() => new Promise(() => {})); From dbcd4b625bdc9cccd70be96a803835676e3a4d32 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 13:01:55 +0800 Subject: [PATCH 2/5] =?UTF-8?q?test:=20=E8=A1=A5=E9=BD=90=20probe=20?= =?UTF-8?q?=E7=94=A8=E4=BE=8B=E7=9A=84=20circuit=20breaker=20mock=20?= =?UTF-8?q?=E5=AF=BC=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 增加 getEndpointCircuitStateSync/resetEndpointCircuit,避免新增导出导致 vitest mock 访问时报错 --- .../unit/lib/provider-endpoints/probe.test.ts | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/unit/lib/provider-endpoints/probe.test.ts b/tests/unit/lib/provider-endpoints/probe.test.ts index c77b04845..875d3101e 100644 --- a/tests/unit/lib/provider-endpoints/probe.test.ts +++ b/tests/unit/lib/provider-endpoints/probe.test.ts @@ -50,7 +50,9 @@ describe("provider-endpoints: probe", () => { updateProviderEndpointProbeSnapshot: vi.fn(), })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: vi.fn(async () => {}), + resetEndpointCircuit: vi.fn(async () => {}), })); const fetchMock = vi.fn(async (_url: string, init?: RequestInit) => { @@ -90,7 +92,9 @@ describe("provider-endpoints: probe", () => { updateProviderEndpointProbeSnapshot: vi.fn(), })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: vi.fn(async () => {}), + resetEndpointCircuit: vi.fn(async () => {}), })); const fetchMock = vi.fn(async (_url: string, init?: RequestInit) => { @@ -133,7 +137,9 @@ describe("provider-endpoints: probe", () => { updateProviderEndpointProbeSnapshot: vi.fn(), })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: vi.fn(async () => {}), + resetEndpointCircuit: vi.fn(async () => {}), })); vi.stubGlobal( @@ -171,7 +177,9 @@ describe("provider-endpoints: probe", () => { updateProviderEndpointProbeSnapshot: vi.fn(), })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: vi.fn(async () => {}), + resetEndpointCircuit: vi.fn(async () => {}), })); vi.stubGlobal( @@ -207,7 +215,9 @@ describe("provider-endpoints: probe", () => { updateProviderEndpointProbeSnapshot: vi.fn(), })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: vi.fn(async () => {}), + resetEndpointCircuit: vi.fn(async () => {}), })); const fetchMock = vi.fn(async () => { @@ -252,7 +262,9 @@ describe("provider-endpoints: probe", () => { updateProviderEndpointProbeSnapshot: snapshotMock, })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: recordFailureMock, + resetEndpointCircuit: vi.fn(async () => {}), })); vi.stubGlobal( @@ -298,7 +310,9 @@ describe("provider-endpoints: probe", () => { updateProviderEndpointProbeSnapshot: snapshotMock, })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: recordFailureMock, + resetEndpointCircuit: vi.fn(async () => {}), })); vi.stubGlobal( @@ -368,7 +382,9 @@ describe("provider-endpoints: probe", () => { recordProviderEndpointProbeResult: recordMock, })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: recordFailureMock, + resetEndpointCircuit: vi.fn(async () => {}), })); vi.stubGlobal( @@ -408,7 +424,9 @@ describe("provider-endpoints: probe", () => { recordProviderEndpointProbeResult: recordMock, })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: recordFailureMock, + resetEndpointCircuit: vi.fn(async () => {}), })); vi.stubGlobal( @@ -444,7 +462,9 @@ describe("provider-endpoints: probe", () => { recordProviderEndpointProbeResult: vi.fn(), })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: vi.fn(async () => {}), + resetEndpointCircuit: vi.fn(async () => {}), })); // Mock net.createConnection to simulate successful TCP connection @@ -497,7 +517,9 @@ describe("provider-endpoints: probe", () => { recordProviderEndpointProbeResult: vi.fn(), })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: vi.fn(async () => {}), + resetEndpointCircuit: vi.fn(async () => {}), })); const mockSocket = { @@ -542,7 +564,9 @@ describe("provider-endpoints: probe", () => { recordProviderEndpointProbeResult: vi.fn(), })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: vi.fn(async () => {}), + resetEndpointCircuit: vi.fn(async () => {}), })); const { probeEndpointUrl } = await import("@/lib/provider-endpoints/probe"); @@ -572,7 +596,9 @@ describe("provider-endpoints: probe", () => { recordProviderEndpointProbeResult: vi.fn(), })); vi.doMock("@/lib/endpoint-circuit-breaker", () => ({ + getEndpointCircuitStateSync: vi.fn(() => "closed"), recordEndpointFailure: vi.fn(async () => {}), + resetEndpointCircuit: vi.fn(async () => {}), })); const mockSocket = { From ee08a4ab058dcc53aef61e162587a7aa82795bb3 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 13:37:16 +0800 Subject: [PATCH 3/5] =?UTF-8?q?fix(proxy):=20=E4=BF=AE=E6=AD=A3=20stats=20?= =?UTF-8?q?head=20=E8=BE=B9=E7=95=8C=E4=B8=8E=20shutdown=20=E5=B9=B6?= =?UTF-8?q?=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 20 +++++++++++++++++++- src/lib/proxy-agent/agent-pool.ts | 7 ++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 9f340a9fb..de56b3498 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -1154,7 +1154,25 @@ export class ProxyResponseHandler { session.recordTtfb(); clearResponseTimeoutOnce(chunkSize); } - pushChunk(decoder.decode(value, { stream: true }), chunkSize); + + // 尽量填满 head:边界 chunk 可能跨过 head 上限,按 byte 切分以避免 head 少于 1MB + if (!inTailMode && headBufferedBytes < MAX_STATS_HEAD_BYTES) { + const remainingHeadBytes = MAX_STATS_HEAD_BYTES - headBufferedBytes; + if (remainingHeadBytes > 0 && chunkSize > remainingHeadBytes) { + const headPart = value.subarray(0, remainingHeadBytes); + const tailPart = value.subarray(remainingHeadBytes); + + const headText = decoder.decode(headPart, { stream: true }); + pushChunk(headText, remainingHeadBytes); + + const tailText = decoder.decode(tailPart, { stream: true }); + pushChunk(tailText, chunkSize - remainingHeadBytes); + } else { + pushChunk(decoder.decode(value, { stream: true }), chunkSize); + } + } else { + pushChunk(decoder.decode(value, { stream: true }), chunkSize); + } } // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) diff --git a/src/lib/proxy-agent/agent-pool.ts b/src/lib/proxy-agent/agent-pool.ts index 4adcd6850..78387ebb2 100644 --- a/src/lib/proxy-agent/agent-pool.ts +++ b/src/lib/proxy-agent/agent-pool.ts @@ -338,9 +338,10 @@ export class AgentPoolImpl implements AgentPool { this.cleanupTimer = null; } - for (const [key, cached] of this.cache.entries()) { - await this.closeAgent(cached.agent, key); - } + // closeAgent 本身是 fire-and-forget(不 await destroy/close),这里并行触发即可。 + await Promise.allSettled( + Array.from(this.cache.entries()).map(([key, cached]) => this.closeAgent(cached.agent, key)) + ); // Best-effort:等待部分 pending cleanup 完成,但永不无限等待(避免重蹈 “close() 等待 in-flight” 的覆辙) if (this.pendingCleanups.size > 0) { From a172ad85808385a6230865ffc6b8a840ba6541da Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 14:03:42 +0800 Subject: [PATCH 4/5] =?UTF-8?q?fix(proxy):=20=E8=A1=A5=E9=BD=90=20passthro?= =?UTF-8?q?ugh=20chunksCollected=20=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 2 +- src/lib/proxy-agent/agent-pool.ts | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index de56b3498..7433afbec 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -1078,7 +1078,7 @@ export class ProxyResponseHandler { providerId: provider.id, providerName: provider.name, idleTimeoutMs, - chunksCollected: Math.max(0, tailChunks.length - tailHead), + chunksCollected: headChunks.length + Math.max(0, tailChunks.length - tailHead), headBufferedBytes, tailBufferedBytes, bufferedBytes: headBufferedBytes + tailBufferedBytes, diff --git a/src/lib/proxy-agent/agent-pool.ts b/src/lib/proxy-agent/agent-pool.ts index 78387ebb2..037183794 100644 --- a/src/lib/proxy-agent/agent-pool.ts +++ b/src/lib/proxy-agent/agent-pool.ts @@ -405,10 +405,9 @@ export class AgentPoolImpl implements AgentPool { if (!cleanupPromise) return; - let trackedPromise: Promise; let dropRefTimeoutId: NodeJS.Timeout | null = null; - trackedPromise = cleanupPromise + const trackedPromise: Promise = cleanupPromise .catch((error) => { logger.warn("AgentPool: Error closing agent", { key, From 0152bab9d4989e743f1fafe27b581d4196f478d6 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 15:23:36 +0800 Subject: [PATCH 5/5] =?UTF-8?q?fix(proxy):=20idle=20watchdog=20=E4=BB=85?= =?UTF-8?q?=E5=9C=A8=E9=A6=96=E5=9D=97=E5=90=8E=E5=90=AF=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 7433afbec..c2afe9d90 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -1058,7 +1058,6 @@ export class ProxyResponseHandler { let abortReason: string | undefined; // 静默期 Watchdog:透传也需要支持中途卡住(无新数据推送) - const hasFirstByteTimeout = (provider.firstByteTimeoutStreamingMs ?? 0) > 0; const idleTimeoutMs = provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; let idleTimeoutId: NodeJS.Timeout | null = null; @@ -1176,8 +1175,7 @@ export class ProxyResponseHandler { } // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) - // 若未配置首字节超时,则 done=false 后也启动 idle timer 作为兜底(包含 0-byte chunk/value=undefined) - if (!isFirstChunk || !hasFirstByteTimeout) { + if (!isFirstChunk) { startIdleTimer(); } }