From 90588852469db354782fb0addff0cc073683bce6 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Tue, 10 Feb 2026 22:34:00 +0800 Subject: [PATCH 01/11] =?UTF-8?q?fix(proxy):=20=E4=BF=AE=E5=A4=8D=20Gemini?= =?UTF-8?q?=20=E6=B5=81=E5=BC=8F=E9=80=8F=E4=BC=A0=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E9=93=BE=E8=B7=AF=E5=8D=A1=E6=AD=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Gemini SSE 透传不再在仅收到 headers 时清除首字节超时 - 首块数据到达后清除首字节超时,并支持 streamingIdleTimeoutMs 静默超时中断 - stats 任务失败时尽量落库/结束追踪,避免请求长期停留在 requesting - 添加回归测试覆盖无首块/首块后延迟/中途静默三种场景 --- src/app/v1/_lib/proxy/response-handler.ts | 209 +++++++-- ...gemini-stream-passthrough-timeouts.test.ts | 408 ++++++++++++++++++ 2 files changed, 587 insertions(+), 30 deletions(-) create mode 100644 tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 2842907a7..a636824e4 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -853,60 +853,131 @@ export class ProxyResponseHandler { } ); - // ⭐ gemini 透传立即清除首字节超时:透传路径收到响应即视为首字节到达 - const sessionWithCleanup = session as typeof session & { - clearResponseTimeout?: () => void; - }; - if (sessionWithCleanup.clearResponseTimeout) { - sessionWithCleanup.clearResponseTimeout(); - // ⭐ 同步记录 TTFB,与首字节超时口径一致 - session.recordTtfb(); - logger.debug( - "[ResponseHandler] Gemini passthrough: First byte timeout cleared on response received", - { - providerId: provider.id, - providerName: provider.name, - } - ); - } + // ⚠️ 注意:不要在“仅收到响应头”时清除首字节超时。 + // 背景:部分上游可能会快速返回 200 + SSE headers,但随后长时间不发送任何 body 数据。 + // 若在 headers 阶段就 clearResponseTimeout,会导致首字节超时失效,客户端与服务端都会表现为一直“请求中”。 + // 透传场景下,我们在后台 stats 读取到第一块数据时再清除超时(与非透传路径口径一致)。 const responseForStats = response.clone(); const statusCode = response.status; const taskId = `stream-passthrough-${messageContext.id}`; const statsPromise = (async () => { + const sessionWithCleanup = session as typeof session & { + clearResponseTimeout?: () => void; + }; + const sessionWithController = session as typeof session & { + responseController?: AbortController; + }; + + let reader: ReadableStreamDefaultReader | null = null; + const chunks: string[] = []; + const decoder = new TextDecoder(); + let isFirstChunk = true; + let streamEndedNormally = false; + let responseTimeoutCleared = false; + let abortReason: string | undefined; + + // ⭐ 静默期 Watchdog:透传也需要支持中途卡住(无新数据推送) + const idleTimeoutMs = + provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; + let idleTimeoutId: NodeJS.Timeout | null = null; + const clearIdleTimer = () => { + if (idleTimeoutId) { + clearTimeout(idleTimeoutId); + idleTimeoutId = null; + } + }; + const startIdleTimer = () => { + if (idleTimeoutMs === Infinity) return; + clearIdleTimer(); + idleTimeoutId = setTimeout(() => { + abortReason = "STREAM_IDLE_TIMEOUT"; + logger.warn("[ResponseHandler] Gemini passthrough streaming idle timeout triggered", { + taskId, + providerId: provider.id, + providerName: provider.name, + idleTimeoutMs, + chunksCollected: chunks.length, + }); + // 终止上游连接:让透传到客户端的连接也尽快结束,避免永久悬挂占用资源 + try { + sessionWithController.responseController?.abort(new Error("streaming_idle")); + } catch { + // ignore + } + }, idleTimeoutMs); + }; + + const clearResponseTimeoutOnce = (firstChunkSize?: number) => { + if (responseTimeoutCleared) return; + if (!sessionWithCleanup.clearResponseTimeout) return; + sessionWithCleanup.clearResponseTimeout(); + responseTimeoutCleared = true; + if (firstChunkSize != null) { + logger.debug( + "[ResponseHandler] Gemini passthrough: First chunk received, response timeout cleared", + { + taskId, + providerId: provider.id, + providerName: provider.name, + firstChunkSize, + } + ); + } + }; + + const flushAndJoin = (): string => { + const flushed = decoder.decode(); + if (flushed) chunks.push(flushed); + return chunks.join(""); + }; + try { - const reader = responseForStats.body?.getReader(); - if (!reader) return; + const body = responseForStats.body; + if (!body) return; + reader = body.getReader(); // 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容: // - 用于解析 usage/cost 与内部结算(例如“假 200”检测) // 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。 - const chunks: string[] = []; - const decoder = new TextDecoder(); - let isFirstChunk = true; - let streamEndedNormally = false; - while (true) { if (session.clientAbortSignal?.aborted) break; const { done, value } = await reader.read(); if (done) { - streamEndedNormally = true; + const wasResponseControllerAborted = + sessionWithController.responseController?.signal.aborted ?? false; + const clientAborted = session.clientAbortSignal?.aborted ?? false; + + // abort -> nodeStreamToWebStreamSafe 可能会把错误吞掉并 close(),导致 done=true; + // 这里必须结合 abort signal 判断是否为“自然结束”。 + if (wasResponseControllerAborted || clientAborted) { + streamEndedNormally = false; + abortReason = abortReason ?? "STREAM_RESPONSE_TIMEOUT"; + } else { + streamEndedNormally = true; + } break; } + if (value) { if (isFirstChunk) { isFirstChunk = false; session.recordTtfb(); + clearResponseTimeoutOnce(value.length); } chunks.push(decoder.decode(value, { stream: true })); + + // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) + if (!isFirstChunk) { + startIdleTimer(); + } } } - const flushed = decoder.decode(); - if (flushed) chunks.push(flushed); - const allContent = chunks.join(""); + clearIdleTimer(); + const allContent = flushAndJoin(); const clientAborted = session.clientAbortSignal?.aborted ?? false; // 存储响应体到 Redis(5分钟过期) @@ -927,7 +998,8 @@ export class ProxyResponseHandler { allContent, statusCode, streamEndedNormally, - clientAborted + clientAborted, + abortReason ); await finalizeRequestStats( session, @@ -938,10 +1010,87 @@ export class ProxyResponseHandler { finalized.providerIdForPersistence ?? undefined ); } catch (error) { - if (!isClientAbortError(error as Error)) { - logger.error("[ResponseHandler] Gemini passthrough stats task failed:", error); + const err = error as Error; + const clientAborted = session.clientAbortSignal?.aborted ?? false; + const isResponseControllerAborted = + sessionWithController.responseController?.signal.aborted ?? false; + const isIdleTimeout = err.message?.includes("streaming_idle"); + + abortReason = + abortReason ?? + (clientAborted + ? "CLIENT_ABORTED" + : isIdleTimeout + ? "STREAM_IDLE_TIMEOUT" + : isResponseControllerAborted + ? "STREAM_RESPONSE_TIMEOUT" + : "STREAM_PROCESSING_ERROR"); + + // 透传的 stats 任务失败时,必须尽量落库并结束追踪,避免请求长期停留在“requesting” + logger.error("[ResponseHandler] Gemini passthrough stats task failed", { + taskId, + providerId: provider.id, + providerName: provider.name, + messageId: messageContext.id, + clientAborted, + isResponseControllerAborted, + isIdleTimeout, + abortReason, + errorName: err.name, + errorMessage: err.message || "(empty message)", + }); + + try { + clearIdleTimer(); + const allContent = flushAndJoin(); + const duration = Date.now() - session.startTime; + + const finalized = await finalizeDeferredStreamingFinalizationIfNeeded( + session, + allContent, + statusCode, + false, + clientAborted, + abortReason + ); + + await finalizeRequestStats( + session, + allContent, + finalized.effectiveStatusCode, + duration, + finalized.errorMessage ?? abortReason, + finalized.providerIdForPersistence ?? undefined + ); + } catch (finalizeError) { + await persistRequestFailure({ + session, + messageContext, + statusCode: statusCode && statusCode >= 400 ? statusCode : 502, + error: finalizeError, + taskId, + phase: "stream", + }); } } finally { + clearIdleTimer(); + // 兜底:首块数据未到达时也应清理首字节超时,避免误触发/日志噪音 + try { + clearResponseTimeoutOnce(); + } catch { + // ignore + } + try { + // 取消 tee 分支,避免 stats 任务提前退出时 backpressure 影响客户端透传 + void reader?.cancel(); + } catch { + // ignore + } + try { + reader?.releaseLock(); + } catch { + // ignore + } AsyncTaskManager.cleanup(taskId); } })(); diff --git a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts new file mode 100644 index 000000000..a6cc2f66f --- /dev/null +++ b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts @@ -0,0 +1,408 @@ +import { createServer } from "node:http"; +import type { Socket } from "node:net"; +import { describe, expect, test, vi } from "vitest"; +import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder"; +import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; +import { ProxySession } from "@/app/v1/_lib/proxy/session"; +import type { Provider } from "@/types/provider"; + +const asyncTasks: Promise[] = []; + +const mocks = vi.hoisted(() => { + return { + isHttp2Enabled: vi.fn(async () => false), + }; +}); + +vi.mock("@/lib/config", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + isHttp2Enabled: mocks.isHttp2Enabled, + }; +}); + +vi.mock("@/app/v1/_lib/proxy/response-fixer", () => ({ + ResponseFixer: { + process: async (_session: unknown, response: Response) => response, + }, +})); + +vi.mock("@/lib/async-task-manager", () => ({ + AsyncTaskManager: { + register: (_taskId: string, promise: Promise) => { + asyncTasks.push(promise); + return new AbortController(); + }, + cleanup: () => {}, + cancel: () => {}, + }, +})); + +vi.mock("@/lib/logger", () => ({ + logger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + trace: vi.fn(), + error: vi.fn(), + }, +})); + +vi.mock("@/repository/message", () => ({ + updateMessageRequestCost: vi.fn(), + updateMessageRequestDetails: vi.fn(), + updateMessageRequestDuration: vi.fn(), +})); + +vi.mock("@/lib/session-manager", () => ({ + SessionManager: { + storeSessionResponse: vi.fn(), + updateSessionUsage: vi.fn(), + }, +})); + +vi.mock("@/lib/proxy-status-tracker", () => ({ + ProxyStatusTracker: { + getInstance: () => ({ + endRequest: () => {}, + }), + }, +})); + +function createProvider(overrides: Partial = {}): Provider { + return { + id: 1, + name: "p1", + url: "http://127.0.0.1:1", + key: "k", + providerVendorId: null, + isEnabled: true, + weight: 1, + priority: 0, + groupPriorities: null, + costMultiplier: 1, + groupTag: null, + providerType: "gemini", + preserveClientIp: false, + modelRedirects: null, + allowedModels: null, + mcpPassthroughType: "none", + mcpPassthroughUrl: null, + limit5hUsd: null, + limitDailyUsd: null, + dailyResetMode: "fixed", + dailyResetTime: "00:00", + limitWeeklyUsd: null, + limitMonthlyUsd: null, + limitTotalUsd: null, + totalCostResetAt: null, + limitConcurrentSessions: 0, + maxRetryAttempts: null, + circuitBreakerFailureThreshold: 5, + circuitBreakerOpenDuration: 1_800_000, + circuitBreakerHalfOpenSuccessThreshold: 2, + proxyUrl: null, + proxyFallbackToDirect: false, + firstByteTimeoutStreamingMs: 100, + streamingIdleTimeoutMs: 0, + requestTimeoutNonStreamingMs: 0, + websiteUrl: null, + faviconUrl: null, + cacheTtlPreference: null, + context1mPreference: null, + codexReasoningEffortPreference: null, + codexReasoningSummaryPreference: null, + codexTextVerbosityPreference: null, + codexParallelToolCallsPreference: null, + anthropicMaxTokensPreference: null, + anthropicThinkingBudgetPreference: null, + geminiGoogleSearchPreference: null, + tpm: 0, + rpm: 0, + rpd: 0, + cc: 0, + createdAt: new Date(), + updatedAt: new Date(), + deletedAt: null, + ...overrides, + }; +} + +function createSession(params: { + clientAbortSignal: AbortSignal; + messageId: number; + userId: number; +}): ProxySession { + const headers = new Headers(); + const session = Object.create(ProxySession.prototype); + + Object.assign(session, { + startTime: Date.now(), + method: "POST", + requestUrl: new URL("https://example.com/v1/chat/completions"), + headers, + originalHeaders: new Headers(headers), + headerLog: JSON.stringify(Object.fromEntries(headers.entries())), + request: { + model: "gemini-2.0-flash", + log: "(test)", + message: { + model: "gemini-2.0-flash", + stream: true, + messages: [{ role: "user", content: "hi" }], + }, + }, + userAgent: null, + context: null, + clientAbortSignal: params.clientAbortSignal, + userName: "test-user", + authState: { success: true, user: null, key: null, apiKey: null }, + provider: null, + messageContext: { + id: params.messageId, + createdAt: new Date(), + user: { id: params.userId, name: "u1" }, + }, + sessionId: null, + requestSequence: 1, + originalFormat: "gemini", + providerType: null, + originalModelName: null, + originalUrlPathname: null, + providerChain: [], + cacheTtlResolved: null, + context1mApplied: false, + specialSettings: [], + cachedPriceData: undefined, + cachedBillingModelSource: undefined, + isHeaderModified: () => false, + }); + + return session as ProxySession; +} + +async function startSseServer(handler: Parameters[0]): Promise<{ + baseUrl: string; + close: () => Promise; +}> { + const sockets = new Set(); + const server = createServer(handler); + + server.on("connection", (socket) => { + sockets.add(socket); + socket.on("close", () => sockets.delete(socket)); + }); + + const baseUrl = await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + const addr = server.address(); + if (!addr || typeof addr === "string") { + reject(new Error("Failed to get server address")); + return; + } + resolve(`http://127.0.0.1:${addr.port}`); + }); + }); + + const close = async () => { + for (const socket of sockets) { + try { + socket.destroy(); + } catch { + // ignore + } + } + sockets.clear(); + await new Promise((resolve) => server.close(() => resolve())); + }; + + return { baseUrl, close }; +} + +async function readWithTimeout( + reader: ReadableStreamDefaultReader, + timeoutMs: number +): Promise<{ ok: true; value: ReadableStreamReadResult } | { ok: false; reason: "timeout" }> { + const result = await Promise.race([ + reader.read().then((value) => ({ ok: true as const, value })), + new Promise<{ ok: false; reason: "timeout" }>((resolve) => + setTimeout(() => resolve({ ok: false as const, reason: "timeout" }), timeoutMs) + ), + ]); + return result; +} + +describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { + test("不应在仅收到 headers 时清除首字节超时:无首块数据时应在窗口内中断避免悬挂", async () => { + asyncTasks.length = 0; + const { baseUrl, close } = await startSseServer((_req, res) => { + res.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + }); + res.flushHeaders(); + // 不发送任何 body,保持连接不结束 + }); + + const clientAbortController = new AbortController(); + try { + const provider = createProvider({ + url: baseUrl, + firstByteTimeoutStreamingMs: 200, + }); + const session = createSession({ clientAbortSignal: clientAbortController.signal, messageId: 1, userId: 1 }); + session.setProvider(provider); + + const doForward = ( + ProxyForwarder as unknown as { + doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown; + } + ).doForward; + + const upstreamResponse = (await doForward.call( + ProxyForwarder, + session, + provider, + baseUrl + )) as Response; + + const clientResponse = await ProxyResponseHandler.dispatch(session, upstreamResponse); + const reader = clientResponse.body?.getReader(); + expect(reader).toBeTruthy(); + if (!reader) throw new Error("Missing body reader"); + + const firstRead = await readWithTimeout(reader, 1500); + if (!firstRead.ok) { + clientAbortController.abort(new Error("test_timeout")); + throw new Error("首字节超时未生效:读首块数据在 1.5s 内仍未返回(可能仍会卡死)"); + } + } finally { + clientAbortController.abort(new Error("test_cleanup")); + await close(); + await Promise.allSettled(asyncTasks); + } + }); + + test("收到首块数据后应清除首字节超时:后续 chunk 即使晚于 firstByteTimeout 也不应被误中断", async () => { + asyncTasks.length = 0; + const { baseUrl, close } = await startSseServer((_req, res) => { + res.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + }); + res.flushHeaders(); + res.write("data: {\"x\":1}\n\n"); + setTimeout(() => { + try { + res.write("data: {\"x\":2}\n\n"); + res.end(); + } catch { + // ignore + } + }, 150); + }); + + const clientAbortController = new AbortController(); + try { + const provider = createProvider({ + url: baseUrl, + firstByteTimeoutStreamingMs: 100, + streamingIdleTimeoutMs: 0, + }); + const session = createSession({ clientAbortSignal: clientAbortController.signal, messageId: 2, userId: 1 }); + session.setProvider(provider); + + const doForward = ( + ProxyForwarder as unknown as { + doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown; + } + ).doForward; + + const upstreamResponse = (await doForward.call( + ProxyForwarder, + session, + provider, + baseUrl + )) as Response; + + const clientResponse = await ProxyResponseHandler.dispatch(session, upstreamResponse); + const fullText = await Promise.race([ + clientResponse.text(), + new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 1500)), + ]); + if (fullText === "timeout") { + clientAbortController.abort(new Error("test_timeout")); + throw new Error("读取透传响应超时(可能仍会卡死)"); + } + + // 第二块数据在 150ms 发送,若首字节超时未被清除,则 100ms 左右就会被中断拿不到第二块 + expect(fullText).toContain("\"x\":2"); + } finally { + clientAbortController.abort(new Error("test_cleanup")); + await close(); + await Promise.allSettled(asyncTasks); + } + }); + + test("中途静默超过 streamingIdleTimeoutMs 时应中断,避免 200 跑到一半卡死", async () => { + asyncTasks.length = 0; + const { baseUrl, close } = await startSseServer((_req, res) => { + res.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + }); + res.flushHeaders(); + res.write("data: {\"x\":1}\n\n"); + // 不再发送数据,也不结束连接 + }); + + const clientAbortController = new AbortController(); + try { + const provider = createProvider({ + url: baseUrl, + firstByteTimeoutStreamingMs: 1000, + streamingIdleTimeoutMs: 120, + }); + const session = createSession({ clientAbortSignal: clientAbortController.signal, messageId: 3, userId: 1 }); + session.setProvider(provider); + + const doForward = ( + ProxyForwarder as unknown as { + doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown; + } + ).doForward; + + const upstreamResponse = (await doForward.call( + ProxyForwarder, + session, + provider, + baseUrl + )) as Response; + + const clientResponse = await ProxyResponseHandler.dispatch(session, upstreamResponse); + const reader = clientResponse.body?.getReader(); + expect(reader).toBeTruthy(); + if (!reader) throw new Error("Missing body reader"); + + const first = await readWithTimeout(reader, 1000); + expect(first.ok).toBe(true); + expect(first.ok && first.value.done).toBe(false); + + // 静默超时触发后,后续 read 应该在合理时间内结束(done=true 或抛错均可) + const second = await readWithTimeout(reader, 1500); + if (!second.ok) { + clientAbortController.abort(new Error("test_timeout")); + throw new Error("流式静默超时未生效:读后续数据在 1.5s 内仍未返回(可能仍会卡死)"); + } + } finally { + clientAbortController.abort(new Error("test_cleanup")); + await close(); + await Promise.allSettled(asyncTasks); + } + }); +}); From 236deea064e96b349a50f2c7594b02d6e159b253 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 10 Feb 2026 14:34:45 +0000 Subject: [PATCH 02/11] chore: format code (fix-hang-stuck-requesting-v2-9058885) --- ...gemini-stream-passthrough-timeouts.test.ts | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts index a6cc2f66f..2538974a2 100644 --- a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts +++ b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts @@ -224,7 +224,9 @@ async function startSseServer(handler: Parameters[0]): Prom async function readWithTimeout( reader: ReadableStreamDefaultReader, timeoutMs: number -): Promise<{ ok: true; value: ReadableStreamReadResult } | { ok: false; reason: "timeout" }> { +): Promise< + { ok: true; value: ReadableStreamReadResult } | { ok: false; reason: "timeout" } +> { const result = await Promise.race([ reader.read().then((value) => ({ ok: true as const, value })), new Promise<{ ok: false; reason: "timeout" }>((resolve) => @@ -253,7 +255,11 @@ describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { url: baseUrl, firstByteTimeoutStreamingMs: 200, }); - const session = createSession({ clientAbortSignal: clientAbortController.signal, messageId: 1, userId: 1 }); + const session = createSession({ + clientAbortSignal: clientAbortController.signal, + messageId: 1, + userId: 1, + }); session.setProvider(provider); const doForward = ( @@ -295,10 +301,10 @@ describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { connection: "keep-alive", }); res.flushHeaders(); - res.write("data: {\"x\":1}\n\n"); + res.write('data: {"x":1}\n\n'); setTimeout(() => { try { - res.write("data: {\"x\":2}\n\n"); + res.write('data: {"x":2}\n\n'); res.end(); } catch { // ignore @@ -313,7 +319,11 @@ describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { firstByteTimeoutStreamingMs: 100, streamingIdleTimeoutMs: 0, }); - const session = createSession({ clientAbortSignal: clientAbortController.signal, messageId: 2, userId: 1 }); + const session = createSession({ + clientAbortSignal: clientAbortController.signal, + messageId: 2, + userId: 1, + }); session.setProvider(provider); const doForward = ( @@ -340,7 +350,7 @@ describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { } // 第二块数据在 150ms 发送,若首字节超时未被清除,则 100ms 左右就会被中断拿不到第二块 - expect(fullText).toContain("\"x\":2"); + expect(fullText).toContain('"x":2'); } finally { clientAbortController.abort(new Error("test_cleanup")); await close(); @@ -357,7 +367,7 @@ describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { connection: "keep-alive", }); res.flushHeaders(); - res.write("data: {\"x\":1}\n\n"); + res.write('data: {"x":1}\n\n'); // 不再发送数据,也不结束连接 }); @@ -368,7 +378,11 @@ describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { firstByteTimeoutStreamingMs: 1000, streamingIdleTimeoutMs: 120, }); - const session = createSession({ clientAbortSignal: clientAbortController.signal, messageId: 3, userId: 1 }); + const session = createSession({ + clientAbortSignal: clientAbortController.signal, + messageId: 3, + userId: 1, + }); session.setProvider(provider); const doForward = ( From 549a8665c41ae01bb749d7028c4a469c3c4f030e Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Tue, 10 Feb 2026 23:49:29 +0800 Subject: [PATCH 03/11] =?UTF-8?q?fix(proxy):=20=E4=BF=AE=E5=A4=8D=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=E5=8D=A1=E6=AD=BB=EF=BC=88AgentPool=20=E9=A9=B1?= =?UTF-8?q?=E9=80=90=E9=98=BB=E5=A1=9E=20+=20=E6=B5=81=E5=BC=8F=E9=80=8F?= =?UTF-8?q?=E4=BC=A0=E5=81=A5=E5=A3=AE=E6=80=A7=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AgentPool 驱逐时优先 destroy,避免 close 等待 in-flight 导致 getAgent/cleanup 卡住\n- Gemini SSE 透传 stats 读取增加内存上限、完善 abort reason 判定与清理日志\n- 补齐/加强回归测试 --- src/app/v1/_lib/proxy/response-handler.ts | 106 +++++++++++++++--- src/lib/proxy-agent/agent-pool.ts | 12 +- tests/unit/lib/proxy-agent/agent-pool.test.ts | 30 +++++ ...gemini-stream-passthrough-timeouts.test.ts | 27 ++++- 4 files changed, 154 insertions(+), 21 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index a636824e4..b77a425bf 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -871,7 +871,43 @@ export class ProxyResponseHandler { }; let reader: ReadableStreamDefaultReader | null = null; + // ⚠️ 保护:避免透传 stats 任务把超大响应体无界缓存在内存中(DoS/OOM 风险) + // 说明:这里用于统计/结算的内容仅保留“尾部窗口”(最近 MAX_STATS_BUFFER_BYTES),用于尽可能解析 usage/假200。 + // 若响应体极大,仍会完整 drain 上游(reader.read),但不再累计完整字符串。 + const MAX_STATS_BUFFER_BYTES = 10 * 1024 * 1024; // 10MB const chunks: string[] = []; + const chunkBytes: number[] = []; + let chunkHead = 0; + let bufferedBytes = 0; + let wasTruncated = false; + + const pushChunk = (text: string, bytes: number) => { + if (!text) return; + chunks.push(text); + chunkBytes.push(bytes); + bufferedBytes += bytes; + + // 仅保留尾部窗口,避免内存无界增长 + while (bufferedBytes > MAX_STATS_BUFFER_BYTES && chunkHead < chunkBytes.length) { + bufferedBytes -= chunkBytes[chunkHead] ?? 0; + chunks[chunkHead] = ""; + chunkBytes[chunkHead] = 0; + chunkHead += 1; + wasTruncated = true; + } + + // 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化 + if (chunkHead > 4096) { + chunks.splice(0, chunkHead); + chunkBytes.splice(0, chunkHead); + chunkHead = 0; + } + }; + + const joinChunks = (): string => { + if (chunkHead <= 0) return chunks.join(""); + return chunks.slice(chunkHead).join(""); + }; const decoder = new TextDecoder(); let isFirstChunk = true; let streamEndedNormally = false; @@ -898,7 +934,9 @@ export class ProxyResponseHandler { providerId: provider.id, providerName: provider.name, idleTimeoutMs, - chunksCollected: chunks.length, + chunksCollected: Math.max(0, chunks.length - chunkHead), + bufferedBytes, + wasTruncated, }); // 终止上游连接:让透传到客户端的连接也尽快结束,避免永久悬挂占用资源 try { @@ -929,8 +967,8 @@ export class ProxyResponseHandler { const flushAndJoin = (): string => { const flushed = decoder.decode(); - if (flushed) chunks.push(flushed); - return chunks.join(""); + if (flushed) pushChunk(flushed, 0); + return joinChunks(); }; try { @@ -954,7 +992,9 @@ export class ProxyResponseHandler { // 这里必须结合 abort signal 判断是否为“自然结束”。 if (wasResponseControllerAborted || clientAborted) { streamEndedNormally = false; - abortReason = abortReason ?? "STREAM_RESPONSE_TIMEOUT"; + if (!abortReason) { + abortReason = clientAborted ? "CLIENT_ABORTED" : "STREAM_RESPONSE_TIMEOUT"; + } } else { streamEndedNormally = true; } @@ -967,7 +1007,7 @@ export class ProxyResponseHandler { session.recordTtfb(); clearResponseTimeoutOnce(value.length); } - chunks.push(decoder.decode(value, { stream: true })); + pushChunk(decoder.decode(value, { stream: true }), value.length); // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) if (!isFirstChunk) { @@ -981,7 +1021,7 @@ export class ProxyResponseHandler { const clientAborted = session.clientAbortSignal?.aborted ?? false; // 存储响应体到 Redis(5分钟过期) - if (session.sessionId) { + if (session.sessionId && !wasTruncated) { void SessionManager.storeSessionResponse( session.sessionId, allContent, @@ -989,6 +1029,13 @@ export class ProxyResponseHandler { ).catch((err) => { logger.error("[ResponseHandler] Failed to store stream passthrough response:", err); }); + } else if (session.sessionId && wasTruncated) { + logger.warn("[ResponseHandler] Skip storing passthrough response: body too large", { + taskId, + providerId: provider.id, + providerName: provider.name, + maxBytes: MAX_STATS_BUFFER_BYTES, + }); } // 使用共享的统计处理方法 @@ -1014,7 +1061,7 @@ export class ProxyResponseHandler { const clientAborted = session.clientAbortSignal?.aborted ?? false; const isResponseControllerAborted = sessionWithController.responseController?.signal.aborted ?? false; - const isIdleTimeout = err.message?.includes("streaming_idle"); + const isIdleTimeout = !!err.message?.includes("streaming_idle"); abortReason = abortReason ?? @@ -1074,22 +1121,51 @@ export class ProxyResponseHandler { } } finally { clearIdleTimer(); - // 兜底:首块数据未到达时也应清理首字节超时,避免误触发/日志噪音 + // 兜底:在流结束/中断后清理首字节超时,避免定时器泄漏 + // 注意:不应在流仍可能继续时清理(否则会让首字节超时失效) try { - clearResponseTimeoutOnce(); - } catch { - // ignore + const wasResponseControllerAborted = + sessionWithController.responseController?.signal.aborted ?? false; + const clientAborted = session.clientAbortSignal?.aborted ?? false; + const shouldClearTimeout = + responseTimeoutCleared || + streamEndedNormally || + wasResponseControllerAborted || + clientAborted; + if (shouldClearTimeout) { + clearResponseTimeoutOnce(); + } + } catch (e) { + logger.warn( + "[ResponseHandler] Gemini passthrough: Failed to clear response timeout", + { + taskId, + providerId: provider.id, + providerName: provider.name, + error: e instanceof Error ? e.message : String(e), + } + ); } try { // 取消 tee 分支,避免 stats 任务提前退出时 backpressure 影响客户端透传 void reader?.cancel(); - } catch { - // ignore + } catch (e) { + logger.warn("[ResponseHandler] Gemini passthrough: Failed to cancel stats reader", { + taskId, + providerId: provider.id, + providerName: provider.name, + error: e instanceof Error ? e.message : String(e), + }); } try { reader?.releaseLock(); - } catch { - // ignore + } catch (e) { + logger.warn("[ResponseHandler] Gemini passthrough: Failed to release reader lock", { + taskId, + providerId: provider.id, + providerName: provider.name, + error: e instanceof Error ? e.message : String(e), + }); } AsyncTaskManager.cleanup(taskId); } diff --git a/src/lib/proxy-agent/agent-pool.ts b/src/lib/proxy-agent/agent-pool.ts index c10c58a15..2e34a93a4 100644 --- a/src/lib/proxy-agent/agent-pool.ts +++ b/src/lib/proxy-agent/agent-pool.ts @@ -356,11 +356,17 @@ export class AgentPoolImpl implements AgentPool { } private async closeAgent(agent: Dispatcher, key: string): Promise { + // 防御性处理:极端情况下(例如 mock/第三方 dispatcher 异常)可能传入空值 + if (!agent) return; + try { - if (typeof agent.close === "function") { - await agent.close(); - } else if (typeof agent.destroy === "function") { + // ⚠️ 优先 destroy:undici 的 close() 可能会等待 in-flight 请求结束(流式/卡住时会导致长期阻塞), + // 从而让 getAgent/evictEndpoint/cleanup 也被卡住,最终表现为“所有请求都卡在 requesting”。 + // destroy() 会强制关闭底层连接,更适合作为驱逐/清理时的兜底手段。 + if (typeof agent.destroy === "function") { await agent.destroy(); + } else if (typeof agent.close === "function") { + await agent.close(); } } catch (error) { logger.warn("AgentPool: Error closing agent", { diff --git a/tests/unit/lib/proxy-agent/agent-pool.test.ts b/tests/unit/lib/proxy-agent/agent-pool.test.ts index df6f44461..410cc6411 100644 --- a/tests/unit/lib/proxy-agent/agent-pool.test.ts +++ b/tests/unit/lib/proxy-agent/agent-pool.test.ts @@ -443,6 +443,36 @@ describe("AgentPool", () => { const stats = pool.getPoolStats(); expect(stats.cacheSize).toBe(0); }); + + it("should prefer destroy over close to avoid hanging on in-flight streaming requests", async () => { + const result = await pool.getAgent({ + endpointUrl: "https://api.anthropic.com/v1/messages", + proxyUrl: null, + enableHttp2: true, + }); + + const agent = result.agent as unknown as { + close?: () => Promise; + destroy?: () => Promise; + }; + + // 模拟:close 可能因等待 in-flight 请求结束而长期不返回 + if (typeof agent.close === "function") { + ( + agent.close as unknown as { mockImplementation: (fn: () => Promise) => void } + ).mockImplementation(() => new Promise(() => {})); + } + + await pool.shutdown(); + + // destroy 应被优先调用(避免 close 挂死导致 shutdown/evict 卡住) + if (typeof agent.destroy === "function") { + expect(agent.destroy).toHaveBeenCalled(); + } + if (typeof agent.close === "function") { + expect(agent.close).not.toHaveBeenCalled(); + } + }); }); }); diff --git a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts index 2538974a2..0180c1dac 100644 --- a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts +++ b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts @@ -225,10 +225,15 @@ async function readWithTimeout( reader: ReadableStreamDefaultReader, timeoutMs: number ): Promise< - { ok: true; value: ReadableStreamReadResult } | { ok: false; reason: "timeout" } + | { ok: true; value: ReadableStreamReadResult } + | { ok: true; error: unknown } + | { ok: false; reason: "timeout" } > { const result = await Promise.race([ - reader.read().then((value) => ({ ok: true as const, value })), + reader + .read() + .then((value) => ({ ok: true as const, value })) + .catch((error) => ({ ok: true as const, error })), new Promise<{ ok: false; reason: "timeout" }>((resolve) => setTimeout(() => resolve({ ok: false as const, reason: "timeout" }), timeoutMs) ), @@ -280,11 +285,24 @@ describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { expect(reader).toBeTruthy(); if (!reader) throw new Error("Missing body reader"); + const startedAt = Date.now(); const firstRead = await readWithTimeout(reader, 1500); if (!firstRead.ok) { clientAbortController.abort(new Error("test_timeout")); throw new Error("首字节超时未生效:读首块数据在 1.5s 内仍未返回(可能仍会卡死)"); } + + // 断言:应由超时/中断导致读取结束(done=true 或抛错均可) + const ended = ("value" in firstRead && firstRead.value.done === true) || "error" in firstRead; + expect(ended).toBe(true); + + // 断言:responseController 应已触发 abort(即首字节超时生效) + const sessionWithController = session as unknown as { responseController?: AbortController }; + expect(sessionWithController.responseController?.signal.aborted).toBe(true); + + // 粗略时间断言:不应立即返回(避免“无关早退”导致假阳性) + const elapsed = Date.now() - startedAt; + expect(elapsed).toBeGreaterThanOrEqual(120); } finally { clientAbortController.abort(new Error("test_cleanup")); await close(); @@ -405,7 +423,10 @@ describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { const first = await readWithTimeout(reader, 1000); expect(first.ok).toBe(true); - expect(first.ok && first.value.done).toBe(false); + if (!("value" in first)) { + throw new Error("首块数据读取异常:预期拿到 value,但得到 error"); + } + expect(first.value.done).toBe(false); // 静默超时触发后,后续 read 应该在合理时间内结束(done=true 或抛错均可) const second = await readWithTimeout(reader, 1500); From 4d0934b5248031c4c64a686291c57c83b800a346 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 00:15:22 +0800 Subject: [PATCH 04/11] =?UTF-8?q?test(proxy):=20=E8=A1=A5=E9=BD=90=20Agent?= =?UTF-8?q?Pool=20=E9=A9=B1=E9=80=90=E5=8D=A1=E6=AD=BB=E5=9B=9E=E5=BD=92?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/lib/proxy-agent/agent-pool.test.ts | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tests/unit/lib/proxy-agent/agent-pool.test.ts b/tests/unit/lib/proxy-agent/agent-pool.test.ts index 410cc6411..b965397c4 100644 --- a/tests/unit/lib/proxy-agent/agent-pool.test.ts +++ b/tests/unit/lib/proxy-agent/agent-pool.test.ts @@ -235,6 +235,61 @@ describe("AgentPool", () => { expect(result2.agent).not.toBe(result1.agent); }); + it("should not hang when evicting an unhealthy agent whose close() never resolves", async () => { + // 说明:beforeEach 使用了 fake timers,但此用例需要依赖真实 setTimeout 做“防卡死”断言 + await pool.shutdown(); + vi.useRealTimers(); + + const realPool = new AgentPoolImpl(defaultConfig); + + const withTimeout = async (promise: Promise, ms: number): Promise => { + let timeoutId: ReturnType | null = null; + try { + return await Promise.race([ + promise, + new Promise((_, reject) => { + timeoutId = setTimeout(() => reject(new Error("timeout")), ms); + }), + ]); + } finally { + if (timeoutId) clearTimeout(timeoutId); + } + }; + + try { + const params = { + endpointUrl: "https://api.anthropic.com/v1/messages", + proxyUrl: null, + enableHttp2: true, + }; + + const result1 = await realPool.getAgent(params); + const agent1 = result1.agent as unknown as { + close?: { mockImplementation: (fn: () => Promise) => void }; + destroy?: unknown; + }; + + // 模拟:close 可能因等待 in-flight 请求结束而长期不返回 + agent1.close?.mockImplementation(() => new Promise(() => {})); + + realPool.markUnhealthy(result1.cacheKey, "test-hang-close"); + + const result2 = await withTimeout(realPool.getAgent(params), 500); + expect(result2.isNew).toBe(true); + expect(result2.agent).not.toBe(result1.agent); + + // destroy 应被优先调用(避免 close 挂死导致 getAgent/evict 卡住) + if (typeof (result1.agent as unknown as { destroy?: unknown }).destroy === "function") { + expect((result1.agent as unknown as { destroy: unknown }).destroy).toHaveBeenCalled(); + } + if (typeof (result1.agent as unknown as { close?: unknown }).close === "function") { + expect((result1.agent as unknown as { close: unknown }).close).not.toHaveBeenCalled(); + } + } finally { + await realPool.shutdown(); + } + }); + it("should track unhealthy agents in stats", async () => { const params = { endpointUrl: "https://api.anthropic.com/v1/messages", From 39b681036a0286f359b880b43a0fa2925700f556 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 00:36:34 +0800 Subject: [PATCH 05/11] =?UTF-8?q?fix(proxy):=20=E6=8C=89=20AI=20review=20?= =?UTF-8?q?=E5=8A=A0=E5=9B=BA=20stats=20=E8=AF=BB=E5=8F=96/=E6=B8=85?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 43 +++++++++++++++---- src/lib/proxy-agent/agent-pool.ts | 2 +- ...gemini-stream-passthrough-timeouts.test.ts | 10 +++++ 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index b77a425bf..059bb5cee 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -853,7 +853,7 @@ export class ProxyResponseHandler { } ); - // ⚠️ 注意:不要在“仅收到响应头”时清除首字节超时。 + // 注意:不要在“仅收到响应头”时清除首字节超时。 // 背景:部分上游可能会快速返回 200 + SSE headers,但随后长时间不发送任何 body 数据。 // 若在 headers 阶段就 clearResponseTimeout,会导致首字节超时失效,客户端与服务端都会表现为一直“请求中”。 // 透传场景下,我们在后台 stats 读取到第一块数据时再清除超时(与非透传路径口径一致)。 @@ -871,16 +871,22 @@ export class ProxyResponseHandler { }; let reader: ReadableStreamDefaultReader | null = null; - // ⚠️ 保护:避免透传 stats 任务把超大响应体无界缓存在内存中(DoS/OOM 风险) + // 保护:避免透传 stats 任务把超大响应体无界缓存在内存中(DoS/OOM 风险) // 说明:这里用于统计/结算的内容仅保留“尾部窗口”(最近 MAX_STATS_BUFFER_BYTES),用于尽可能解析 usage/假200。 // 若响应体极大,仍会完整 drain 上游(reader.read),但不再累计完整字符串。 const MAX_STATS_BUFFER_BYTES = 10 * 1024 * 1024; // 10MB + const MAX_STATS_BUFFER_CHUNKS = 8192; const chunks: string[] = []; const chunkBytes: number[] = []; let chunkHead = 0; let bufferedBytes = 0; let wasTruncated = false; + const joinChunks = (): string => { + if (chunkHead <= 0) return chunks.join(""); + return chunks.slice(chunkHead).join(""); + }; + const pushChunk = (text: string, bytes: number) => { if (!text) return; chunks.push(text); @@ -902,11 +908,17 @@ export class ProxyResponseHandler { chunkBytes.splice(0, chunkHead); chunkHead = 0; } - }; - const joinChunks = (): string => { - if (chunkHead <= 0) return chunks.join(""); - return chunks.slice(chunkHead).join(""); + // 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限) + const keptCount = chunks.length - chunkHead; + if (keptCount > MAX_STATS_BUFFER_CHUNKS) { + const joined = joinChunks(); + chunks.length = 0; + chunkBytes.length = 0; + chunkHead = 0; + chunks.push(joined); + chunkBytes.push(bufferedBytes); + } }; const decoder = new TextDecoder(); let isFirstChunk = true; @@ -914,7 +926,7 @@ export class ProxyResponseHandler { let responseTimeoutCleared = false; let abortReason: string | undefined; - // ⭐ 静默期 Watchdog:透传也需要支持中途卡住(无新数据推送) + // 静默期 Watchdog:透传也需要支持中途卡住(无新数据推送) const idleTimeoutMs = provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; let idleTimeoutId: NodeJS.Timeout | null = null; @@ -1057,7 +1069,7 @@ export class ProxyResponseHandler { finalized.providerIdForPersistence ?? undefined ); } catch (error) { - const err = error as Error; + const err = error instanceof Error ? error : new Error(String(error)); const clientAborted = session.clientAbortSignal?.aborted ?? false; const isResponseControllerAborted = sessionWithController.responseController?.signal.aborted ?? false; @@ -1148,7 +1160,20 @@ export class ProxyResponseHandler { } try { // 取消 tee 分支,避免 stats 任务提前退出时 backpressure 影响客户端透传 - void reader?.cancel(); + const cancelPromise = reader?.cancel(); + if (cancelPromise) { + cancelPromise.catch((err) => { + logger.warn( + "[ResponseHandler] Gemini passthrough: Failed to cancel stats reader", + { + taskId, + providerId: provider.id, + providerName: provider.name, + error: err instanceof Error ? err.message : String(err), + } + ); + }); + } } catch (e) { logger.warn("[ResponseHandler] Gemini passthrough: Failed to cancel stats reader", { taskId, diff --git a/src/lib/proxy-agent/agent-pool.ts b/src/lib/proxy-agent/agent-pool.ts index 2e34a93a4..2db94bee6 100644 --- a/src/lib/proxy-agent/agent-pool.ts +++ b/src/lib/proxy-agent/agent-pool.ts @@ -360,7 +360,7 @@ export class AgentPoolImpl implements AgentPool { if (!agent) return; try { - // ⚠️ 优先 destroy:undici 的 close() 可能会等待 in-flight 请求结束(流式/卡住时会导致长期阻塞), + // 注意:优先 destroy。undici 的 close() 可能会等待 in-flight 请求结束(流式/卡住时会导致长期阻塞), // 从而让 getAgent/evictEndpoint/cleanup 也被卡住,最终表现为“所有请求都卡在 requesting”。 // destroy() 会强制关闭底层连接,更适合作为驱逐/清理时的兜底手段。 if (typeof agent.destroy === "function") { diff --git a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts index 0180c1dac..f4ee2b98b 100644 --- a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts +++ b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts @@ -55,6 +55,16 @@ vi.mock("@/repository/message", () => ({ updateMessageRequestDuration: vi.fn(), })); +vi.mock("@/repository/system-config", () => ({ + getSystemSettings: vi.fn(async () => ({ billingModelSource: "original" })), +})); + +vi.mock("@/repository/model-price", () => ({ + findLatestPriceByModel: vi.fn(async () => ({ + priceData: { input_cost_per_token: 0, output_cost_per_token: 0 }, + })), +})); + vi.mock("@/lib/session-manager", () => ({ SessionManager: { storeSessionResponse: vi.fn(), From 4825f478d67449fcc6256d8fd3a5ef5a0024c85d Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 00:39:45 +0800 Subject: [PATCH 06/11] =?UTF-8?q?test(proxy):=20=E5=9B=9E=E5=BD=92?= =?UTF-8?q?=E7=94=A8=E4=BE=8B=E7=BB=93=E6=9D=9F=E5=90=8E=E6=81=A2=E5=A4=8D?= =?UTF-8?q?=20fake=20timers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/lib/proxy-agent/agent-pool.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/lib/proxy-agent/agent-pool.test.ts b/tests/unit/lib/proxy-agent/agent-pool.test.ts index b965397c4..7153dcbc3 100644 --- a/tests/unit/lib/proxy-agent/agent-pool.test.ts +++ b/tests/unit/lib/proxy-agent/agent-pool.test.ts @@ -287,6 +287,7 @@ describe("AgentPool", () => { } } finally { await realPool.shutdown(); + vi.useFakeTimers(); } }); From 6f36d0d2153eb6ed78ca216cc2cdc73a27c26458 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 09:46:29 +0800 Subject: [PATCH 07/11] =?UTF-8?q?fix(proxy):=20AgentPool=20=E9=A9=B1?= =?UTF-8?q?=E9=80=90=E4=B8=8D=E7=AD=89=E5=BE=85=20destroy/close?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - closeAgent 触发 destroy/close 后不 await,避免驱逐路径被卡住\n- Gemini 透传 stats:仅在收到非空 chunk 后清首字节超时\n- 回归测试:覆盖 close 无 destroy 且永不返回的场景 --- src/app/v1/_lib/proxy/response-handler.ts | 13 +++++---- src/lib/proxy-agent/agent-pool.ts | 29 ++++++++++++++++--- tests/unit/lib/proxy-agent/agent-pool.test.ts | 20 +++++++------ 3 files changed, 44 insertions(+), 18 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 059bb5cee..0448a079b 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -1014,12 +1014,15 @@ export class ProxyResponseHandler { } if (value) { - if (isFirstChunk) { - isFirstChunk = false; - session.recordTtfb(); - clearResponseTimeoutOnce(value.length); + const chunkSize = value.byteLength; + if (chunkSize > 0) { + if (isFirstChunk) { + isFirstChunk = false; + session.recordTtfb(); + clearResponseTimeoutOnce(chunkSize); + } + pushChunk(decoder.decode(value, { stream: true }), chunkSize); } - pushChunk(decoder.decode(value, { stream: true }), value.length); // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) if (!isFirstChunk) { diff --git a/src/lib/proxy-agent/agent-pool.ts b/src/lib/proxy-agent/agent-pool.ts index 2db94bee6..61aba4587 100644 --- a/src/lib/proxy-agent/agent-pool.ts +++ b/src/lib/proxy-agent/agent-pool.ts @@ -363,10 +363,31 @@ export class AgentPoolImpl implements AgentPool { // 注意:优先 destroy。undici 的 close() 可能会等待 in-flight 请求结束(流式/卡住时会导致长期阻塞), // 从而让 getAgent/evictEndpoint/cleanup 也被卡住,最终表现为“所有请求都卡在 requesting”。 // destroy() 会强制关闭底层连接,更适合作为驱逐/清理时的兜底手段。 - if (typeof agent.destroy === "function") { - await agent.destroy(); - } else if (typeof agent.close === "function") { - await agent.close(); + const operation = + typeof agent.destroy === "function" + ? ("destroy" as const) + : typeof agent.close === "function" + ? ("close" as const) + : null; + + // 关键点:驱逐/清理路径不能等待 in-flight(否则会把 getAgent() 也阻塞住,导致全局“requesting”) + // 因此这里发起 destroy/close 后不 await,仅记录异常,确保 eviction 始终快速返回。 + if (operation === "destroy") { + agent.destroy().catch((error) => { + logger.warn("AgentPool: Error closing agent", { + key, + operation, + error: error instanceof Error ? error.message : String(error), + }); + }); + } else if (operation === "close") { + agent.close().catch((error) => { + logger.warn("AgentPool: Error closing agent", { + key, + operation, + error: error instanceof Error ? error.message : String(error), + }); + }); } } catch (error) { logger.warn("AgentPool: Error closing agent", { diff --git a/tests/unit/lib/proxy-agent/agent-pool.test.ts b/tests/unit/lib/proxy-agent/agent-pool.test.ts index 7153dcbc3..e4804592a 100644 --- a/tests/unit/lib/proxy-agent/agent-pool.test.ts +++ b/tests/unit/lib/proxy-agent/agent-pool.test.ts @@ -265,12 +265,19 @@ describe("AgentPool", () => { const result1 = await realPool.getAgent(params); const agent1 = result1.agent as unknown as { - close?: { mockImplementation: (fn: () => Promise) => void }; + close?: () => Promise; destroy?: unknown; }; + // 强制走 close() 分支:模拟某些 dispatcher 不支持 destroy() + agent1.destroy = undefined; + // 模拟:close 可能因等待 in-flight 请求结束而长期不返回 - agent1.close?.mockImplementation(() => new Promise(() => {})); + let closeCalled = false; + agent1.close = () => { + closeCalled = true; + return new Promise(() => {}); + }; realPool.markUnhealthy(result1.cacheKey, "test-hang-close"); @@ -278,13 +285,8 @@ describe("AgentPool", () => { expect(result2.isNew).toBe(true); expect(result2.agent).not.toBe(result1.agent); - // destroy 应被优先调用(避免 close 挂死导致 getAgent/evict 卡住) - if (typeof (result1.agent as unknown as { destroy?: unknown }).destroy === "function") { - expect((result1.agent as unknown as { destroy: unknown }).destroy).toHaveBeenCalled(); - } - if (typeof (result1.agent as unknown as { close?: unknown }).close === "function") { - expect((result1.agent as unknown as { close: unknown }).close).not.toHaveBeenCalled(); - } + // 断言:即使 close() 处于 pending,也不会阻塞 getAgent(),且会触发 close 调用 + expect(closeCalled).toBe(true); } finally { await realPool.shutdown(); vi.useFakeTimers(); From 0961732b9600e972cb7023f62ba65c2d5d4eb37a Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 09:54:58 +0800 Subject: [PATCH 08/11] =?UTF-8?q?test(proxy):=20=E5=BC=BA=E5=8C=96=20idle?= =?UTF-8?q?=20watchdog=20=E4=B8=8E=20mock=20=E9=9A=94=E7=A6=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 透传 stats:done=false 时也会重置 idle timer(避免 value 异常导致 watchdog 不工作)\n- 回归测试:reset hoisted isHttp2Enabled mock,避免跨用例污染 --- src/app/v1/_lib/proxy/response-handler.ts | 24 +++++++++---------- ...gemini-stream-passthrough-timeouts.test.ts | 7 +++++- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 0448a079b..473daab74 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -1013,21 +1013,19 @@ export class ProxyResponseHandler { break; } - if (value) { - const chunkSize = value.byteLength; - if (chunkSize > 0) { - if (isFirstChunk) { - isFirstChunk = false; - session.recordTtfb(); - clearResponseTimeoutOnce(chunkSize); - } - pushChunk(decoder.decode(value, { stream: true }), chunkSize); + const chunkSize = value?.byteLength ?? 0; + if (value && chunkSize > 0) { + if (isFirstChunk) { + isFirstChunk = false; + session.recordTtfb(); + clearResponseTimeoutOnce(chunkSize); } + pushChunk(decoder.decode(value, { stream: true }), chunkSize); + } - // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) - if (!isFirstChunk) { - startIdleTimer(); - } + // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) + if (!isFirstChunk) { + startIdleTimer(); } } diff --git a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts index f4ee2b98b..bc26ea8af 100644 --- a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts +++ b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts @@ -1,6 +1,6 @@ import { createServer } from "node:http"; import type { Socket } from "node:net"; -import { describe, expect, test, vi } from "vitest"; +import { beforeEach, describe, expect, test, vi } from "vitest"; import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder"; import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; import { ProxySession } from "@/app/v1/_lib/proxy/session"; @@ -14,6 +14,11 @@ const mocks = vi.hoisted(() => { }; }); +beforeEach(() => { + mocks.isHttp2Enabled.mockReset(); + mocks.isHttp2Enabled.mockResolvedValue(false); +}); + vi.mock("@/lib/config", async (importOriginal) => { const actual = await importOriginal(); return { From b231428ff8d1df4ea84bd0b39cc07ba1cce69e69 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 10:38:21 +0800 Subject: [PATCH 09/11] =?UTF-8?q?test(proxy):=20=E4=BF=AE=E5=A4=8D=20Agent?= =?UTF-8?q?Pool=20shutdown=20=E7=94=A8=E4=BE=8B=20close=20mock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/lib/proxy-agent/agent-pool.test.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/unit/lib/proxy-agent/agent-pool.test.ts b/tests/unit/lib/proxy-agent/agent-pool.test.ts index e4804592a..3f664e331 100644 --- a/tests/unit/lib/proxy-agent/agent-pool.test.ts +++ b/tests/unit/lib/proxy-agent/agent-pool.test.ts @@ -516,9 +516,7 @@ describe("AgentPool", () => { // 模拟:close 可能因等待 in-flight 请求结束而长期不返回 if (typeof agent.close === "function") { - ( - agent.close as unknown as { mockImplementation: (fn: () => Promise) => void } - ).mockImplementation(() => new Promise(() => {})); + vi.mocked(agent.close).mockImplementation(() => new Promise(() => {})); } await pool.shutdown(); From e53f8ddd60ef4264003802c8a86cdc3eefab4fb1 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 10:57:32 +0800 Subject: [PATCH 10/11] =?UTF-8?q?test:=20=E7=A8=B3=E5=AE=9A=20endpoint-cir?= =?UTF-8?q?cuit-breaker=20=E5=91=8A=E8=AD=A6=E7=94=A8=E4=BE=8B=EF=BC=88?= =?UTF-8?q?=E9=81=BF=E5=85=8D=20CI/bun=20=E4=B8=B2=E6=89=B0=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/lib/endpoint-circuit-breaker.test.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/unit/lib/endpoint-circuit-breaker.test.ts b/tests/unit/lib/endpoint-circuit-breaker.test.ts index 90b938f9f..b9c1cfac5 100644 --- a/tests/unit/lib/endpoint-circuit-breaker.test.ts +++ b/tests/unit/lib/endpoint-circuit-breaker.test.ts @@ -40,6 +40,10 @@ describe("endpoint-circuit-breaker", () => { }); vi.doMock("@/lib/logger", () => ({ logger: createLoggerMock() })); + const sendAlertMock = vi.fn(async () => {}); + vi.doMock("@/lib/notification/notifier", () => ({ + sendCircuitBreakerAlert: sendAlertMock, + })); vi.doMock("@/lib/redis/endpoint-circuit-breaker-state", () => ({ loadEndpointCircuitState: loadMock, saveEndpointCircuitState: saveMock, @@ -57,6 +61,12 @@ describe("endpoint-circuit-breaker", () => { await recordEndpointFailure(1, new Error("boom")); await recordEndpointFailure(1, new Error("boom")); + // 说明:recordEndpointFailure 会触发异步告警(dynamic import + await),此处等待其完成 + // 避免后续测试覆盖 module mock 时导致 sendAlertMock 被额外调用(CI/bun 环境下更容易触发)。 + for (let i = 0; i < 10 && sendAlertMock.mock.calls.length === 0; i += 1) { + await Promise.resolve(); + } + const openState = saveMock.mock.calls[ saveMock.mock.calls.length - 1 ]?.[1] as SavedEndpointCircuitState; From 6d8faf0e0e64ad5d665957507c5272eab7e2270c Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Wed, 11 Feb 2026 11:05:04 +0800 Subject: [PATCH 11/11] =?UTF-8?q?test:=20=E5=BD=BB=E5=BA=95=E9=81=BF?= =?UTF-8?q?=E5=85=8D=20endpoint-circuit-breaker=20=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E5=91=8A=E8=AD=A6=E4=B8=B2=E6=89=B0=EF=BC=88CI/bun=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/unit/lib/endpoint-circuit-breaker.test.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/unit/lib/endpoint-circuit-breaker.test.ts b/tests/unit/lib/endpoint-circuit-breaker.test.ts index b9c1cfac5..9e0d8914f 100644 --- a/tests/unit/lib/endpoint-circuit-breaker.test.ts +++ b/tests/unit/lib/endpoint-circuit-breaker.test.ts @@ -61,12 +61,6 @@ describe("endpoint-circuit-breaker", () => { await recordEndpointFailure(1, new Error("boom")); await recordEndpointFailure(1, new Error("boom")); - // 说明:recordEndpointFailure 会触发异步告警(dynamic import + await),此处等待其完成 - // 避免后续测试覆盖 module mock 时导致 sendAlertMock 被额外调用(CI/bun 环境下更容易触发)。 - for (let i = 0; i < 10 && sendAlertMock.mock.calls.length === 0; i += 1) { - await Promise.resolve(); - } - const openState = saveMock.mock.calls[ saveMock.mock.calls.length - 1 ]?.[1] as SavedEndpointCircuitState; @@ -98,6 +92,15 @@ describe("endpoint-circuit-breaker", () => { await resetEndpointCircuit(1); expect(deleteMock).toHaveBeenCalledWith(1); + + // 说明:recordEndpointFailure 在达到阈值后会触发异步告警(dynamic import + await)。 + // 在 CI/bun 环境下,告警 Promise 可能在下一个测试开始后才完成,从而“借用”后续用例的 module mock, + // 导致 sendAlertMock 被额外调用而产生偶发失败。这里用真实计时器让事件循环前进,确保告警任务尽快落地。 + vi.useRealTimers(); + const startedAt = Date.now(); + while (sendAlertMock.mock.calls.length === 0 && Date.now() - startedAt < 1000) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } }); test("recordEndpointSuccess: closed 且 failureCount>0 时应清零", async () => {