diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 2842907a7..473daab74 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -853,64 +853,188 @@ export class ProxyResponseHandler { } ); - // ⭐ gemini 透传立即清除首字节超时:透传路径收到响应即视为首字节到达 - const sessionWithCleanup = session as typeof session & { - clearResponseTimeout?: () => void; - }; - if (sessionWithCleanup.clearResponseTimeout) { - sessionWithCleanup.clearResponseTimeout(); - // ⭐ 同步记录 TTFB,与首字节超时口径一致 - session.recordTtfb(); - logger.debug( - "[ResponseHandler] Gemini passthrough: First byte timeout cleared on response received", - { - providerId: provider.id, - providerName: provider.name, - } - ); - } + // 注意:不要在“仅收到响应头”时清除首字节超时。 + // 背景:部分上游可能会快速返回 200 + SSE headers,但随后长时间不发送任何 body 数据。 + // 若在 headers 阶段就 clearResponseTimeout,会导致首字节超时失效,客户端与服务端都会表现为一直“请求中”。 + // 透传场景下,我们在后台 stats 读取到第一块数据时再清除超时(与非透传路径口径一致)。 const responseForStats = response.clone(); const statusCode = response.status; const taskId = `stream-passthrough-${messageContext.id}`; const statsPromise = (async () => { + const sessionWithCleanup = session as typeof session & { + clearResponseTimeout?: () => void; + }; + const sessionWithController = session as typeof session & { + responseController?: AbortController; + }; + + let reader: ReadableStreamDefaultReader | null = null; + // 保护:避免透传 stats 任务把超大响应体无界缓存在内存中(DoS/OOM 风险) + // 说明:这里用于统计/结算的内容仅保留“尾部窗口”(最近 MAX_STATS_BUFFER_BYTES),用于尽可能解析 usage/假200。 + // 若响应体极大,仍会完整 drain 上游(reader.read),但不再累计完整字符串。 + const MAX_STATS_BUFFER_BYTES = 10 * 1024 * 1024; // 10MB + const MAX_STATS_BUFFER_CHUNKS = 8192; + const chunks: string[] = []; + const chunkBytes: number[] = []; + let chunkHead = 0; + let bufferedBytes = 0; + let wasTruncated = false; + + const joinChunks = (): string => { + if (chunkHead <= 0) return chunks.join(""); + return chunks.slice(chunkHead).join(""); + }; + + const pushChunk = (text: string, bytes: number) => { + if (!text) return; + chunks.push(text); + chunkBytes.push(bytes); + bufferedBytes += bytes; + + // 仅保留尾部窗口,避免内存无界增长 + while (bufferedBytes > MAX_STATS_BUFFER_BYTES && chunkHead < chunkBytes.length) { + bufferedBytes -= chunkBytes[chunkHead] ?? 0; + chunks[chunkHead] = ""; + chunkBytes[chunkHead] = 0; + chunkHead += 1; + wasTruncated = true; + } + + // 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化 + if (chunkHead > 4096) { + chunks.splice(0, chunkHead); + chunkBytes.splice(0, chunkHead); + chunkHead = 0; + } + + // 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限) + const keptCount = chunks.length - chunkHead; + if (keptCount > MAX_STATS_BUFFER_CHUNKS) { + const joined = joinChunks(); + chunks.length = 0; + chunkBytes.length = 0; + chunkHead = 0; + chunks.push(joined); + chunkBytes.push(bufferedBytes); + } + }; + const decoder = new TextDecoder(); + let isFirstChunk = true; + let streamEndedNormally = false; + let responseTimeoutCleared = false; + let abortReason: string | undefined; + + // 静默期 Watchdog:透传也需要支持中途卡住(无新数据推送) + const idleTimeoutMs = + provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; + let idleTimeoutId: NodeJS.Timeout | null = null; + const clearIdleTimer = () => { + if (idleTimeoutId) { + clearTimeout(idleTimeoutId); + idleTimeoutId = null; + } + }; + const startIdleTimer = () => { + if (idleTimeoutMs === Infinity) return; + clearIdleTimer(); + idleTimeoutId = setTimeout(() => { + abortReason = "STREAM_IDLE_TIMEOUT"; + logger.warn("[ResponseHandler] Gemini passthrough streaming idle timeout triggered", { + taskId, + providerId: provider.id, + providerName: provider.name, + idleTimeoutMs, + chunksCollected: Math.max(0, chunks.length - chunkHead), + bufferedBytes, + wasTruncated, + }); + // 终止上游连接:让透传到客户端的连接也尽快结束,避免永久悬挂占用资源 + try { + sessionWithController.responseController?.abort(new Error("streaming_idle")); + } catch { + // ignore + } + }, idleTimeoutMs); + }; + + const clearResponseTimeoutOnce = (firstChunkSize?: number) => { + if (responseTimeoutCleared) return; + if (!sessionWithCleanup.clearResponseTimeout) return; + sessionWithCleanup.clearResponseTimeout(); + responseTimeoutCleared = true; + if (firstChunkSize != null) { + logger.debug( + "[ResponseHandler] Gemini passthrough: First chunk received, response timeout cleared", + { + taskId, + providerId: provider.id, + providerName: provider.name, + firstChunkSize, + } + ); + } + }; + + const flushAndJoin = (): string => { + const flushed = decoder.decode(); + if (flushed) pushChunk(flushed, 0); + return joinChunks(); + }; + try { - const reader = responseForStats.body?.getReader(); - if (!reader) return; + const body = responseForStats.body; + if (!body) return; + reader = body.getReader(); // 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容: // - 用于解析 usage/cost 与内部结算(例如“假 200”检测) // 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。 - const chunks: string[] = []; - const decoder = new TextDecoder(); - let isFirstChunk = true; - let streamEndedNormally = false; - while (true) { if (session.clientAbortSignal?.aborted) break; const { done, value } = await reader.read(); if (done) { - streamEndedNormally = true; + const wasResponseControllerAborted = + sessionWithController.responseController?.signal.aborted ?? false; + const clientAborted = session.clientAbortSignal?.aborted ?? false; + + // abort -> nodeStreamToWebStreamSafe 可能会把错误吞掉并 close(),导致 done=true; + // 这里必须结合 abort signal 判断是否为“自然结束”。 + if (wasResponseControllerAborted || clientAborted) { + streamEndedNormally = false; + if (!abortReason) { + abortReason = clientAborted ? "CLIENT_ABORTED" : "STREAM_RESPONSE_TIMEOUT"; + } + } else { + streamEndedNormally = true; + } break; } - if (value) { + + const chunkSize = value?.byteLength ?? 0; + if (value && chunkSize > 0) { if (isFirstChunk) { isFirstChunk = false; session.recordTtfb(); + clearResponseTimeoutOnce(chunkSize); } - chunks.push(decoder.decode(value, { stream: true })); + pushChunk(decoder.decode(value, { stream: true }), chunkSize); + } + + // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) + if (!isFirstChunk) { + startIdleTimer(); } } - const flushed = decoder.decode(); - if (flushed) chunks.push(flushed); - const allContent = chunks.join(""); + clearIdleTimer(); + const allContent = flushAndJoin(); const clientAborted = session.clientAbortSignal?.aborted ?? false; // 存储响应体到 Redis(5分钟过期) - if (session.sessionId) { + if (session.sessionId && !wasTruncated) { void SessionManager.storeSessionResponse( session.sessionId, allContent, @@ -918,6 +1042,13 @@ export class ProxyResponseHandler { ).catch((err) => { logger.error("[ResponseHandler] Failed to store stream passthrough response:", err); }); + } else if (session.sessionId && wasTruncated) { + logger.warn("[ResponseHandler] Skip storing passthrough response: body too large", { + taskId, + providerId: provider.id, + providerName: provider.name, + maxBytes: MAX_STATS_BUFFER_BYTES, + }); } // 使用共享的统计处理方法 @@ -927,7 +1058,8 @@ export class ProxyResponseHandler { allContent, statusCode, streamEndedNormally, - clientAborted + clientAborted, + abortReason ); await finalizeRequestStats( session, @@ -938,10 +1070,129 @@ export class ProxyResponseHandler { finalized.providerIdForPersistence ?? undefined ); } catch (error) { - if (!isClientAbortError(error as Error)) { - logger.error("[ResponseHandler] Gemini passthrough stats task failed:", error); + const err = error instanceof Error ? error : new Error(String(error)); + const clientAborted = session.clientAbortSignal?.aborted ?? false; + const isResponseControllerAborted = + sessionWithController.responseController?.signal.aborted ?? false; + const isIdleTimeout = !!err.message?.includes("streaming_idle"); + + abortReason = + abortReason ?? + (clientAborted + ? "CLIENT_ABORTED" + : isIdleTimeout + ? "STREAM_IDLE_TIMEOUT" + : isResponseControllerAborted + ? "STREAM_RESPONSE_TIMEOUT" + : "STREAM_PROCESSING_ERROR"); + + // 透传的 stats 任务失败时,必须尽量落库并结束追踪,避免请求长期停留在“requesting” + logger.error("[ResponseHandler] Gemini passthrough stats task failed", { + taskId, + providerId: provider.id, + providerName: provider.name, + messageId: messageContext.id, + clientAborted, + isResponseControllerAborted, + isIdleTimeout, + abortReason, + errorName: err.name, + errorMessage: err.message || "(empty message)", + }); + + try { + clearIdleTimer(); + const allContent = flushAndJoin(); + const duration = Date.now() - session.startTime; + + const finalized = await finalizeDeferredStreamingFinalizationIfNeeded( + session, + allContent, + statusCode, + false, + clientAborted, + abortReason + ); + + await finalizeRequestStats( + session, + allContent, + finalized.effectiveStatusCode, + duration, + finalized.errorMessage ?? abortReason, + finalized.providerIdForPersistence ?? undefined + ); + } catch (finalizeError) { + await persistRequestFailure({ + session, + messageContext, + statusCode: statusCode && statusCode >= 400 ? statusCode : 502, + error: finalizeError, + taskId, + phase: "stream", + }); } } finally { + clearIdleTimer(); + // 兜底:在流结束/中断后清理首字节超时,避免定时器泄漏 + // 注意:不应在流仍可能继续时清理(否则会让首字节超时失效) + try { + const wasResponseControllerAborted = + sessionWithController.responseController?.signal.aborted ?? false; + const clientAborted = session.clientAbortSignal?.aborted ?? false; + const shouldClearTimeout = + responseTimeoutCleared || + streamEndedNormally || + wasResponseControllerAborted || + clientAborted; + if (shouldClearTimeout) { + clearResponseTimeoutOnce(); + } + } catch (e) { + logger.warn( + "[ResponseHandler] Gemini passthrough: Failed to clear response timeout", + { + taskId, + providerId: provider.id, + providerName: provider.name, + error: e instanceof Error ? e.message : String(e), + } + ); + } + try { + // 取消 tee 分支,避免 stats 任务提前退出时 backpressure 影响客户端透传 + const cancelPromise = reader?.cancel(); + if (cancelPromise) { + cancelPromise.catch((err) => { + logger.warn( + "[ResponseHandler] Gemini passthrough: Failed to cancel stats reader", + { + taskId, + providerId: provider.id, + providerName: provider.name, + error: err instanceof Error ? err.message : String(err), + } + ); + }); + } + } catch (e) { + logger.warn("[ResponseHandler] Gemini passthrough: Failed to cancel stats reader", { + taskId, + providerId: provider.id, + providerName: provider.name, + error: e instanceof Error ? e.message : String(e), + }); + } + try { + reader?.releaseLock(); + } catch (e) { + logger.warn("[ResponseHandler] Gemini passthrough: Failed to release reader lock", { + taskId, + providerId: provider.id, + providerName: provider.name, + error: e instanceof Error ? e.message : String(e), + }); + } AsyncTaskManager.cleanup(taskId); } })(); diff --git a/src/lib/proxy-agent/agent-pool.ts b/src/lib/proxy-agent/agent-pool.ts index c10c58a15..61aba4587 100644 --- a/src/lib/proxy-agent/agent-pool.ts +++ b/src/lib/proxy-agent/agent-pool.ts @@ -356,11 +356,38 @@ export class AgentPoolImpl implements AgentPool { } private async closeAgent(agent: Dispatcher, key: string): Promise { + // 防御性处理:极端情况下(例如 mock/第三方 dispatcher 异常)可能传入空值 + if (!agent) return; + try { - if (typeof agent.close === "function") { - await agent.close(); - } else if (typeof agent.destroy === "function") { - await agent.destroy(); + // 注意:优先 destroy。undici 的 close() 可能会等待 in-flight 请求结束(流式/卡住时会导致长期阻塞), + // 从而让 getAgent/evictEndpoint/cleanup 也被卡住,最终表现为“所有请求都卡在 requesting”。 + // destroy() 会强制关闭底层连接,更适合作为驱逐/清理时的兜底手段。 + const operation = + typeof agent.destroy === "function" + ? ("destroy" as const) + : typeof agent.close === "function" + ? ("close" as const) + : null; + + // 关键点:驱逐/清理路径不能等待 in-flight(否则会把 getAgent() 也阻塞住,导致全局“requesting”) + // 因此这里发起 destroy/close 后不 await,仅记录异常,确保 eviction 始终快速返回。 + if (operation === "destroy") { + agent.destroy().catch((error) => { + logger.warn("AgentPool: Error closing agent", { + key, + operation, + error: error instanceof Error ? error.message : String(error), + }); + }); + } else if (operation === "close") { + agent.close().catch((error) => { + logger.warn("AgentPool: Error closing agent", { + key, + operation, + error: error instanceof Error ? error.message : String(error), + }); + }); } } catch (error) { logger.warn("AgentPool: Error closing agent", { diff --git a/tests/unit/lib/endpoint-circuit-breaker.test.ts b/tests/unit/lib/endpoint-circuit-breaker.test.ts index 90b938f9f..9e0d8914f 100644 --- a/tests/unit/lib/endpoint-circuit-breaker.test.ts +++ b/tests/unit/lib/endpoint-circuit-breaker.test.ts @@ -40,6 +40,10 @@ describe("endpoint-circuit-breaker", () => { }); vi.doMock("@/lib/logger", () => ({ logger: createLoggerMock() })); + const sendAlertMock = vi.fn(async () => {}); + vi.doMock("@/lib/notification/notifier", () => ({ + sendCircuitBreakerAlert: sendAlertMock, + })); vi.doMock("@/lib/redis/endpoint-circuit-breaker-state", () => ({ loadEndpointCircuitState: loadMock, saveEndpointCircuitState: saveMock, @@ -88,6 +92,15 @@ describe("endpoint-circuit-breaker", () => { await resetEndpointCircuit(1); expect(deleteMock).toHaveBeenCalledWith(1); + + // 说明:recordEndpointFailure 在达到阈值后会触发异步告警(dynamic import + await)。 + // 在 CI/bun 环境下,告警 Promise 可能在下一个测试开始后才完成,从而“借用”后续用例的 module mock, + // 导致 sendAlertMock 被额外调用而产生偶发失败。这里用真实计时器让事件循环前进,确保告警任务尽快落地。 + vi.useRealTimers(); + const startedAt = Date.now(); + while (sendAlertMock.mock.calls.length === 0 && Date.now() - startedAt < 1000) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } }); test("recordEndpointSuccess: closed 且 failureCount>0 时应清零", async () => { diff --git a/tests/unit/lib/proxy-agent/agent-pool.test.ts b/tests/unit/lib/proxy-agent/agent-pool.test.ts index df6f44461..3f664e331 100644 --- a/tests/unit/lib/proxy-agent/agent-pool.test.ts +++ b/tests/unit/lib/proxy-agent/agent-pool.test.ts @@ -235,6 +235,64 @@ describe("AgentPool", () => { expect(result2.agent).not.toBe(result1.agent); }); + it("should not hang when evicting an unhealthy agent whose close() never resolves", async () => { + // 说明:beforeEach 使用了 fake timers,但此用例需要依赖真实 setTimeout 做“防卡死”断言 + await pool.shutdown(); + vi.useRealTimers(); + + const realPool = new AgentPoolImpl(defaultConfig); + + const withTimeout = async (promise: Promise, ms: number): Promise => { + let timeoutId: ReturnType | null = null; + try { + return await Promise.race([ + promise, + new Promise((_, reject) => { + timeoutId = setTimeout(() => reject(new Error("timeout")), ms); + }), + ]); + } finally { + if (timeoutId) clearTimeout(timeoutId); + } + }; + + try { + const params = { + endpointUrl: "https://api.anthropic.com/v1/messages", + proxyUrl: null, + enableHttp2: true, + }; + + const result1 = await realPool.getAgent(params); + const agent1 = result1.agent as unknown as { + close?: () => Promise; + destroy?: unknown; + }; + + // 强制走 close() 分支:模拟某些 dispatcher 不支持 destroy() + agent1.destroy = undefined; + + // 模拟:close 可能因等待 in-flight 请求结束而长期不返回 + let closeCalled = false; + agent1.close = () => { + closeCalled = true; + return new Promise(() => {}); + }; + + realPool.markUnhealthy(result1.cacheKey, "test-hang-close"); + + const result2 = await withTimeout(realPool.getAgent(params), 500); + expect(result2.isNew).toBe(true); + expect(result2.agent).not.toBe(result1.agent); + + // 断言:即使 close() 处于 pending,也不会阻塞 getAgent(),且会触发 close 调用 + expect(closeCalled).toBe(true); + } finally { + await realPool.shutdown(); + vi.useFakeTimers(); + } + }); + it("should track unhealthy agents in stats", async () => { const params = { endpointUrl: "https://api.anthropic.com/v1/messages", @@ -443,6 +501,34 @@ describe("AgentPool", () => { const stats = pool.getPoolStats(); expect(stats.cacheSize).toBe(0); }); + + it("should prefer destroy over close to avoid hanging on in-flight streaming requests", async () => { + const result = await pool.getAgent({ + endpointUrl: "https://api.anthropic.com/v1/messages", + proxyUrl: null, + enableHttp2: true, + }); + + const agent = result.agent as unknown as { + close?: () => Promise; + destroy?: () => Promise; + }; + + // 模拟:close 可能因等待 in-flight 请求结束而长期不返回 + if (typeof agent.close === "function") { + vi.mocked(agent.close).mockImplementation(() => new Promise(() => {})); + } + + await pool.shutdown(); + + // destroy 应被优先调用(避免 close 挂死导致 shutdown/evict 卡住) + if (typeof agent.destroy === "function") { + expect(agent.destroy).toHaveBeenCalled(); + } + if (typeof agent.close === "function") { + expect(agent.close).not.toHaveBeenCalled(); + } + }); }); }); diff --git a/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts new file mode 100644 index 000000000..bc26ea8af --- /dev/null +++ b/tests/unit/proxy/response-handler-gemini-stream-passthrough-timeouts.test.ts @@ -0,0 +1,458 @@ +import { createServer } from "node:http"; +import type { Socket } from "node:net"; +import { beforeEach, describe, expect, test, vi } from "vitest"; +import { ProxyForwarder } from "@/app/v1/_lib/proxy/forwarder"; +import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; +import { ProxySession } from "@/app/v1/_lib/proxy/session"; +import type { Provider } from "@/types/provider"; + +const asyncTasks: Promise[] = []; + +const mocks = vi.hoisted(() => { + return { + isHttp2Enabled: vi.fn(async () => false), + }; +}); + +beforeEach(() => { + mocks.isHttp2Enabled.mockReset(); + mocks.isHttp2Enabled.mockResolvedValue(false); +}); + +vi.mock("@/lib/config", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + isHttp2Enabled: mocks.isHttp2Enabled, + }; +}); + +vi.mock("@/app/v1/_lib/proxy/response-fixer", () => ({ + ResponseFixer: { + process: async (_session: unknown, response: Response) => response, + }, +})); + +vi.mock("@/lib/async-task-manager", () => ({ + AsyncTaskManager: { + register: (_taskId: string, promise: Promise) => { + asyncTasks.push(promise); + return new AbortController(); + }, + cleanup: () => {}, + cancel: () => {}, + }, +})); + +vi.mock("@/lib/logger", () => ({ + logger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + trace: vi.fn(), + error: vi.fn(), + }, +})); + +vi.mock("@/repository/message", () => ({ + updateMessageRequestCost: vi.fn(), + updateMessageRequestDetails: vi.fn(), + updateMessageRequestDuration: vi.fn(), +})); + +vi.mock("@/repository/system-config", () => ({ + getSystemSettings: vi.fn(async () => ({ billingModelSource: "original" })), +})); + +vi.mock("@/repository/model-price", () => ({ + findLatestPriceByModel: vi.fn(async () => ({ + priceData: { input_cost_per_token: 0, output_cost_per_token: 0 }, + })), +})); + +vi.mock("@/lib/session-manager", () => ({ + SessionManager: { + storeSessionResponse: vi.fn(), + updateSessionUsage: vi.fn(), + }, +})); + +vi.mock("@/lib/proxy-status-tracker", () => ({ + ProxyStatusTracker: { + getInstance: () => ({ + endRequest: () => {}, + }), + }, +})); + +function createProvider(overrides: Partial = {}): Provider { + return { + id: 1, + name: "p1", + url: "http://127.0.0.1:1", + key: "k", + providerVendorId: null, + isEnabled: true, + weight: 1, + priority: 0, + groupPriorities: null, + costMultiplier: 1, + groupTag: null, + providerType: "gemini", + preserveClientIp: false, + modelRedirects: null, + allowedModels: null, + mcpPassthroughType: "none", + mcpPassthroughUrl: null, + limit5hUsd: null, + limitDailyUsd: null, + dailyResetMode: "fixed", + dailyResetTime: "00:00", + limitWeeklyUsd: null, + limitMonthlyUsd: null, + limitTotalUsd: null, + totalCostResetAt: null, + limitConcurrentSessions: 0, + maxRetryAttempts: null, + circuitBreakerFailureThreshold: 5, + circuitBreakerOpenDuration: 1_800_000, + circuitBreakerHalfOpenSuccessThreshold: 2, + proxyUrl: null, + proxyFallbackToDirect: false, + firstByteTimeoutStreamingMs: 100, + streamingIdleTimeoutMs: 0, + requestTimeoutNonStreamingMs: 0, + websiteUrl: null, + faviconUrl: null, + cacheTtlPreference: null, + context1mPreference: null, + codexReasoningEffortPreference: null, + codexReasoningSummaryPreference: null, + codexTextVerbosityPreference: null, + codexParallelToolCallsPreference: null, + anthropicMaxTokensPreference: null, + anthropicThinkingBudgetPreference: null, + geminiGoogleSearchPreference: null, + tpm: 0, + rpm: 0, + rpd: 0, + cc: 0, + createdAt: new Date(), + updatedAt: new Date(), + deletedAt: null, + ...overrides, + }; +} + +function createSession(params: { + clientAbortSignal: AbortSignal; + messageId: number; + userId: number; +}): ProxySession { + const headers = new Headers(); + const session = Object.create(ProxySession.prototype); + + Object.assign(session, { + startTime: Date.now(), + method: "POST", + requestUrl: new URL("https://example.com/v1/chat/completions"), + headers, + originalHeaders: new Headers(headers), + headerLog: JSON.stringify(Object.fromEntries(headers.entries())), + request: { + model: "gemini-2.0-flash", + log: "(test)", + message: { + model: "gemini-2.0-flash", + stream: true, + messages: [{ role: "user", content: "hi" }], + }, + }, + userAgent: null, + context: null, + clientAbortSignal: params.clientAbortSignal, + userName: "test-user", + authState: { success: true, user: null, key: null, apiKey: null }, + provider: null, + messageContext: { + id: params.messageId, + createdAt: new Date(), + user: { id: params.userId, name: "u1" }, + }, + sessionId: null, + requestSequence: 1, + originalFormat: "gemini", + providerType: null, + originalModelName: null, + originalUrlPathname: null, + providerChain: [], + cacheTtlResolved: null, + context1mApplied: false, + specialSettings: [], + cachedPriceData: undefined, + cachedBillingModelSource: undefined, + isHeaderModified: () => false, + }); + + return session as ProxySession; +} + +async function startSseServer(handler: Parameters[0]): Promise<{ + baseUrl: string; + close: () => Promise; +}> { + const sockets = new Set(); + const server = createServer(handler); + + server.on("connection", (socket) => { + sockets.add(socket); + socket.on("close", () => sockets.delete(socket)); + }); + + const baseUrl = await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + const addr = server.address(); + if (!addr || typeof addr === "string") { + reject(new Error("Failed to get server address")); + return; + } + resolve(`http://127.0.0.1:${addr.port}`); + }); + }); + + const close = async () => { + for (const socket of sockets) { + try { + socket.destroy(); + } catch { + // ignore + } + } + sockets.clear(); + await new Promise((resolve) => server.close(() => resolve())); + }; + + return { baseUrl, close }; +} + +async function readWithTimeout( + reader: ReadableStreamDefaultReader, + timeoutMs: number +): Promise< + | { ok: true; value: ReadableStreamReadResult } + | { ok: true; error: unknown } + | { ok: false; reason: "timeout" } +> { + const result = await Promise.race([ + reader + .read() + .then((value) => ({ ok: true as const, value })) + .catch((error) => ({ ok: true as const, error })), + new Promise<{ ok: false; reason: "timeout" }>((resolve) => + setTimeout(() => resolve({ ok: false as const, reason: "timeout" }), timeoutMs) + ), + ]); + return result; +} + +describe("ProxyResponseHandler - Gemini stream passthrough timeouts", () => { + test("不应在仅收到 headers 时清除首字节超时:无首块数据时应在窗口内中断避免悬挂", async () => { + asyncTasks.length = 0; + const { baseUrl, close } = await startSseServer((_req, res) => { + res.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + }); + res.flushHeaders(); + // 不发送任何 body,保持连接不结束 + }); + + const clientAbortController = new AbortController(); + try { + const provider = createProvider({ + url: baseUrl, + firstByteTimeoutStreamingMs: 200, + }); + const session = createSession({ + clientAbortSignal: clientAbortController.signal, + messageId: 1, + userId: 1, + }); + session.setProvider(provider); + + const doForward = ( + ProxyForwarder as unknown as { + doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown; + } + ).doForward; + + const upstreamResponse = (await doForward.call( + ProxyForwarder, + session, + provider, + baseUrl + )) as Response; + + const clientResponse = await ProxyResponseHandler.dispatch(session, upstreamResponse); + const reader = clientResponse.body?.getReader(); + expect(reader).toBeTruthy(); + if (!reader) throw new Error("Missing body reader"); + + const startedAt = Date.now(); + const firstRead = await readWithTimeout(reader, 1500); + if (!firstRead.ok) { + clientAbortController.abort(new Error("test_timeout")); + throw new Error("首字节超时未生效:读首块数据在 1.5s 内仍未返回(可能仍会卡死)"); + } + + // 断言:应由超时/中断导致读取结束(done=true 或抛错均可) + const ended = ("value" in firstRead && firstRead.value.done === true) || "error" in firstRead; + expect(ended).toBe(true); + + // 断言:responseController 应已触发 abort(即首字节超时生效) + const sessionWithController = session as unknown as { responseController?: AbortController }; + expect(sessionWithController.responseController?.signal.aborted).toBe(true); + + // 粗略时间断言:不应立即返回(避免“无关早退”导致假阳性) + const elapsed = Date.now() - startedAt; + expect(elapsed).toBeGreaterThanOrEqual(120); + } finally { + clientAbortController.abort(new Error("test_cleanup")); + await close(); + await Promise.allSettled(asyncTasks); + } + }); + + test("收到首块数据后应清除首字节超时:后续 chunk 即使晚于 firstByteTimeout 也不应被误中断", async () => { + asyncTasks.length = 0; + const { baseUrl, close } = await startSseServer((_req, res) => { + res.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + }); + res.flushHeaders(); + res.write('data: {"x":1}\n\n'); + setTimeout(() => { + try { + res.write('data: {"x":2}\n\n'); + res.end(); + } catch { + // ignore + } + }, 150); + }); + + const clientAbortController = new AbortController(); + try { + const provider = createProvider({ + url: baseUrl, + firstByteTimeoutStreamingMs: 100, + streamingIdleTimeoutMs: 0, + }); + const session = createSession({ + clientAbortSignal: clientAbortController.signal, + messageId: 2, + userId: 1, + }); + session.setProvider(provider); + + const doForward = ( + ProxyForwarder as unknown as { + doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown; + } + ).doForward; + + const upstreamResponse = (await doForward.call( + ProxyForwarder, + session, + provider, + baseUrl + )) as Response; + + const clientResponse = await ProxyResponseHandler.dispatch(session, upstreamResponse); + const fullText = await Promise.race([ + clientResponse.text(), + new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 1500)), + ]); + if (fullText === "timeout") { + clientAbortController.abort(new Error("test_timeout")); + throw new Error("读取透传响应超时(可能仍会卡死)"); + } + + // 第二块数据在 150ms 发送,若首字节超时未被清除,则 100ms 左右就会被中断拿不到第二块 + expect(fullText).toContain('"x":2'); + } finally { + clientAbortController.abort(new Error("test_cleanup")); + await close(); + await Promise.allSettled(asyncTasks); + } + }); + + test("中途静默超过 streamingIdleTimeoutMs 时应中断,避免 200 跑到一半卡死", async () => { + asyncTasks.length = 0; + const { baseUrl, close } = await startSseServer((_req, res) => { + res.writeHead(200, { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + }); + res.flushHeaders(); + res.write('data: {"x":1}\n\n'); + // 不再发送数据,也不结束连接 + }); + + const clientAbortController = new AbortController(); + try { + const provider = createProvider({ + url: baseUrl, + firstByteTimeoutStreamingMs: 1000, + streamingIdleTimeoutMs: 120, + }); + const session = createSession({ + clientAbortSignal: clientAbortController.signal, + messageId: 3, + userId: 1, + }); + session.setProvider(provider); + + const doForward = ( + ProxyForwarder as unknown as { + doForward: (this: typeof ProxyForwarder, ...args: unknown[]) => unknown; + } + ).doForward; + + const upstreamResponse = (await doForward.call( + ProxyForwarder, + session, + provider, + baseUrl + )) as Response; + + const clientResponse = await ProxyResponseHandler.dispatch(session, upstreamResponse); + const reader = clientResponse.body?.getReader(); + expect(reader).toBeTruthy(); + if (!reader) throw new Error("Missing body reader"); + + const first = await readWithTimeout(reader, 1000); + expect(first.ok).toBe(true); + if (!("value" in first)) { + throw new Error("首块数据读取异常:预期拿到 value,但得到 error"); + } + expect(first.value.done).toBe(false); + + // 静默超时触发后,后续 read 应该在合理时间内结束(done=true 或抛错均可) + const second = await readWithTimeout(reader, 1500); + if (!second.ok) { + clientAbortController.abort(new Error("test_timeout")); + throw new Error("流式静默超时未生效:读后续数据在 1.5s 内仍未返回(可能仍会卡死)"); + } + } finally { + clientAbortController.abort(new Error("test_cleanup")); + await close(); + await Promise.allSettled(asyncTasks); + } + }); +});