-
-
Notifications
You must be signed in to change notification settings - Fork 199
fix(proxy): 修复请求卡死(AgentPool 驱逐阻塞) #759
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9058885
236deea
549a866
4d0934b
39b6810
4825f47
6f36d0d
0961732
b231428
e53f8dd
6d8faf0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -853,71 +853,202 @@ export class ProxyResponseHandler { | |
| } | ||
| ); | ||
|
|
||
| // ⭐ gemini 透传立即清除首字节超时:透传路径收到响应即视为首字节到达 | ||
| const sessionWithCleanup = session as typeof session & { | ||
| clearResponseTimeout?: () => void; | ||
| }; | ||
| if (sessionWithCleanup.clearResponseTimeout) { | ||
| sessionWithCleanup.clearResponseTimeout(); | ||
| // ⭐ 同步记录 TTFB,与首字节超时口径一致 | ||
| session.recordTtfb(); | ||
| logger.debug( | ||
| "[ResponseHandler] Gemini passthrough: First byte timeout cleared on response received", | ||
| { | ||
| providerId: provider.id, | ||
| providerName: provider.name, | ||
| } | ||
| ); | ||
| } | ||
| // 注意:不要在“仅收到响应头”时清除首字节超时。 | ||
| // 背景:部分上游可能会快速返回 200 + SSE headers,但随后长时间不发送任何 body 数据。 | ||
| // 若在 headers 阶段就 clearResponseTimeout,会导致首字节超时失效,客户端与服务端都会表现为一直“请求中”。 | ||
| // 透传场景下,我们在后台 stats 读取到第一块数据时再清除超时(与非透传路径口径一致)。 | ||
|
|
||
| const responseForStats = response.clone(); | ||
| const statusCode = response.status; | ||
|
|
||
| const taskId = `stream-passthrough-${messageContext.id}`; | ||
| const statsPromise = (async () => { | ||
| const sessionWithCleanup = session as typeof session & { | ||
| clearResponseTimeout?: () => void; | ||
| }; | ||
| const sessionWithController = session as typeof session & { | ||
| responseController?: AbortController; | ||
| }; | ||
|
|
||
| let reader: ReadableStreamDefaultReader<Uint8Array> | null = null; | ||
| // 保护:避免透传 stats 任务把超大响应体无界缓存在内存中(DoS/OOM 风险) | ||
| // 说明:这里用于统计/结算的内容仅保留“尾部窗口”(最近 MAX_STATS_BUFFER_BYTES),用于尽可能解析 usage/假200。 | ||
| // 若响应体极大,仍会完整 drain 上游(reader.read),但不再累计完整字符串。 | ||
| const MAX_STATS_BUFFER_BYTES = 10 * 1024 * 1024; // 10MB | ||
| const MAX_STATS_BUFFER_CHUNKS = 8192; | ||
| const chunks: string[] = []; | ||
| const chunkBytes: number[] = []; | ||
| let chunkHead = 0; | ||
| let bufferedBytes = 0; | ||
| let wasTruncated = false; | ||
|
|
||
| const joinChunks = (): string => { | ||
| if (chunkHead <= 0) return chunks.join(""); | ||
| return chunks.slice(chunkHead).join(""); | ||
| }; | ||
|
|
||
| const pushChunk = (text: string, bytes: number) => { | ||
| if (!text) return; | ||
| chunks.push(text); | ||
| chunkBytes.push(bytes); | ||
| bufferedBytes += bytes; | ||
|
|
||
| // 仅保留尾部窗口,避免内存无界增长 | ||
| while (bufferedBytes > MAX_STATS_BUFFER_BYTES && chunkHead < chunkBytes.length) { | ||
| bufferedBytes -= chunkBytes[chunkHead] ?? 0; | ||
| chunks[chunkHead] = ""; | ||
| chunkBytes[chunkHead] = 0; | ||
| chunkHead += 1; | ||
| wasTruncated = true; | ||
| } | ||
|
|
||
| // 定期压缩数组,避免 head 指针过大导致 slice/join 性能退化 | ||
| if (chunkHead > 4096) { | ||
| chunks.splice(0, chunkHead); | ||
| chunkBytes.splice(0, chunkHead); | ||
| chunkHead = 0; | ||
| } | ||
|
|
||
| // 防御:限制 chunk 数量,避免大量超小 chunk 导致对象/数组膨胀(即使总字节数已受限) | ||
| const keptCount = chunks.length - chunkHead; | ||
| if (keptCount > MAX_STATS_BUFFER_CHUNKS) { | ||
| const joined = joinChunks(); | ||
| chunks.length = 0; | ||
| chunkBytes.length = 0; | ||
| chunkHead = 0; | ||
| chunks.push(joined); | ||
| chunkBytes.push(bufferedBytes); | ||
| } | ||
| }; | ||
| const decoder = new TextDecoder(); | ||
| let isFirstChunk = true; | ||
| let streamEndedNormally = false; | ||
| let responseTimeoutCleared = false; | ||
| let abortReason: string | undefined; | ||
|
|
||
| // 静默期 Watchdog:透传也需要支持中途卡住(无新数据推送) | ||
| const idleTimeoutMs = | ||
| provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; | ||
| let idleTimeoutId: NodeJS.Timeout | null = null; | ||
| const clearIdleTimer = () => { | ||
| if (idleTimeoutId) { | ||
| clearTimeout(idleTimeoutId); | ||
| idleTimeoutId = null; | ||
| } | ||
| }; | ||
| const startIdleTimer = () => { | ||
| if (idleTimeoutMs === Infinity) return; | ||
| clearIdleTimer(); | ||
| idleTimeoutId = setTimeout(() => { | ||
| abortReason = "STREAM_IDLE_TIMEOUT"; | ||
| logger.warn("[ResponseHandler] Gemini passthrough streaming idle timeout triggered", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| providerName: provider.name, | ||
| idleTimeoutMs, | ||
| chunksCollected: Math.max(0, chunks.length - chunkHead), | ||
| bufferedBytes, | ||
| wasTruncated, | ||
| }); | ||
| // 终止上游连接:让透传到客户端的连接也尽快结束,避免永久悬挂占用资源 | ||
| try { | ||
| sessionWithController.responseController?.abort(new Error("streaming_idle")); | ||
| } catch { | ||
| // ignore | ||
| } | ||
| }, idleTimeoutMs); | ||
| }; | ||
|
|
||
| const clearResponseTimeoutOnce = (firstChunkSize?: number) => { | ||
| if (responseTimeoutCleared) return; | ||
| if (!sessionWithCleanup.clearResponseTimeout) return; | ||
| sessionWithCleanup.clearResponseTimeout(); | ||
| responseTimeoutCleared = true; | ||
| if (firstChunkSize != null) { | ||
| logger.debug( | ||
| "[ResponseHandler] Gemini passthrough: First chunk received, response timeout cleared", | ||
| { | ||
| taskId, | ||
| providerId: provider.id, | ||
| providerName: provider.name, | ||
| firstChunkSize, | ||
| } | ||
| ); | ||
| } | ||
| }; | ||
|
|
||
| const flushAndJoin = (): string => { | ||
| const flushed = decoder.decode(); | ||
| if (flushed) pushChunk(flushed, 0); | ||
| return joinChunks(); | ||
| }; | ||
|
|
||
| try { | ||
| const reader = responseForStats.body?.getReader(); | ||
| if (!reader) return; | ||
| const body = responseForStats.body; | ||
| if (!body) return; | ||
| reader = body.getReader(); | ||
|
|
||
| // 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容: | ||
| // - 用于解析 usage/cost 与内部结算(例如“假 200”检测) | ||
| // 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。 | ||
| const chunks: string[] = []; | ||
| const decoder = new TextDecoder(); | ||
| let isFirstChunk = true; | ||
| let streamEndedNormally = false; | ||
|
|
||
| while (true) { | ||
| if (session.clientAbortSignal?.aborted) break; | ||
|
|
||
| const { done, value } = await reader.read(); | ||
| if (done) { | ||
| streamEndedNormally = true; | ||
| const wasResponseControllerAborted = | ||
| sessionWithController.responseController?.signal.aborted ?? false; | ||
| const clientAborted = session.clientAbortSignal?.aborted ?? false; | ||
|
|
||
| // abort -> nodeStreamToWebStreamSafe 可能会把错误吞掉并 close(),导致 done=true; | ||
| // 这里必须结合 abort signal 判断是否为“自然结束”。 | ||
| if (wasResponseControllerAborted || clientAborted) { | ||
| streamEndedNormally = false; | ||
| if (!abortReason) { | ||
| abortReason = clientAborted ? "CLIENT_ABORTED" : "STREAM_RESPONSE_TIMEOUT"; | ||
| } | ||
| } else { | ||
| streamEndedNormally = true; | ||
| } | ||
| break; | ||
| } | ||
| if (value) { | ||
|
|
||
| const chunkSize = value?.byteLength ?? 0; | ||
| if (value && chunkSize > 0) { | ||
| if (isFirstChunk) { | ||
| isFirstChunk = false; | ||
| session.recordTtfb(); | ||
| clearResponseTimeoutOnce(chunkSize); | ||
| } | ||
| chunks.push(decoder.decode(value, { stream: true })); | ||
| pushChunk(decoder.decode(value, { stream: true }), chunkSize); | ||
| } | ||
|
Comment on lines
+1016
to
+1024
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Zero-byte chunks won't arm idle watchdog. When Consider moving Prompt To Fix With AIThis is a comment left during a code review.
Path: src/app/v1/_lib/proxy/response-handler.ts
Line: 1016:1024
Comment:
Zero-byte chunks won't arm idle watchdog.
When `value` exists but `chunkSize === 0`, the code skips `pushChunk()` and won't enter the `if (value && chunkSize > 0)` block. Subsequent `if (!isFirstChunk)` will be false (since `isFirstChunk` is still true), so `startIdleTimer()` never runs. A stream yielding only zero-length chunks will bypass idle timeout entirely.
Consider moving `startIdleTimer()` outside the `chunkSize > 0` check, or treating zero-byte reads as activity.
How can I resolve this? If you propose a fix, please make it concise. |
||
|
|
||
| // 首块数据到达后才启动 idle timer(避免与首字节超时职责重叠) | ||
| if (!isFirstChunk) { | ||
| startIdleTimer(); | ||
| } | ||
| } | ||
|
|
||
| const flushed = decoder.decode(); | ||
| if (flushed) chunks.push(flushed); | ||
| const allContent = chunks.join(""); | ||
| clearIdleTimer(); | ||
| const allContent = flushAndJoin(); | ||
| const clientAborted = session.clientAbortSignal?.aborted ?? false; | ||
|
|
||
| // 存储响应体到 Redis(5分钟过期) | ||
| if (session.sessionId) { | ||
| if (session.sessionId && !wasTruncated) { | ||
| void SessionManager.storeSessionResponse( | ||
| session.sessionId, | ||
| allContent, | ||
| session.requestSequence | ||
| ).catch((err) => { | ||
| logger.error("[ResponseHandler] Failed to store stream passthrough response:", err); | ||
| }); | ||
| } else if (session.sessionId && wasTruncated) { | ||
| logger.warn("[ResponseHandler] Skip storing passthrough response: body too large", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| providerName: provider.name, | ||
| maxBytes: MAX_STATS_BUFFER_BYTES, | ||
| }); | ||
| } | ||
|
|
||
| // 使用共享的统计处理方法 | ||
|
|
@@ -927,7 +1058,8 @@ export class ProxyResponseHandler { | |
| allContent, | ||
| statusCode, | ||
| streamEndedNormally, | ||
| clientAborted | ||
| clientAborted, | ||
| abortReason | ||
| ); | ||
| await finalizeRequestStats( | ||
| session, | ||
|
|
@@ -938,10 +1070,129 @@ export class ProxyResponseHandler { | |
| finalized.providerIdForPersistence ?? undefined | ||
| ); | ||
| } catch (error) { | ||
| if (!isClientAbortError(error as Error)) { | ||
| logger.error("[ResponseHandler] Gemini passthrough stats task failed:", error); | ||
| const err = error instanceof Error ? error : new Error(String(error)); | ||
| const clientAborted = session.clientAbortSignal?.aborted ?? false; | ||
| const isResponseControllerAborted = | ||
| sessionWithController.responseController?.signal.aborted ?? false; | ||
| const isIdleTimeout = !!err.message?.includes("streaming_idle"); | ||
|
|
||
| abortReason = | ||
| abortReason ?? | ||
| (clientAborted | ||
| ? "CLIENT_ABORTED" | ||
| : isIdleTimeout | ||
| ? "STREAM_IDLE_TIMEOUT" | ||
| : isResponseControllerAborted | ||
| ? "STREAM_RESPONSE_TIMEOUT" | ||
| : "STREAM_PROCESSING_ERROR"); | ||
|
|
||
| // 透传的 stats 任务失败时,必须尽量落库并结束追踪,避免请求长期停留在“requesting” | ||
| logger.error("[ResponseHandler] Gemini passthrough stats task failed", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| providerName: provider.name, | ||
| messageId: messageContext.id, | ||
| clientAborted, | ||
| isResponseControllerAborted, | ||
| isIdleTimeout, | ||
| abortReason, | ||
| errorName: err.name, | ||
| errorMessage: err.message || "(empty message)", | ||
| }); | ||
|
|
||
| try { | ||
| clearIdleTimer(); | ||
| const allContent = flushAndJoin(); | ||
| const duration = Date.now() - session.startTime; | ||
|
|
||
| const finalized = await finalizeDeferredStreamingFinalizationIfNeeded( | ||
| session, | ||
| allContent, | ||
| statusCode, | ||
| false, | ||
| clientAborted, | ||
| abortReason | ||
| ); | ||
|
|
||
| await finalizeRequestStats( | ||
| session, | ||
| allContent, | ||
| finalized.effectiveStatusCode, | ||
| duration, | ||
| finalized.errorMessage ?? abortReason, | ||
| finalized.providerIdForPersistence ?? undefined | ||
| ); | ||
| } catch (finalizeError) { | ||
| await persistRequestFailure({ | ||
| session, | ||
| messageContext, | ||
| statusCode: statusCode && statusCode >= 400 ? statusCode : 502, | ||
| error: finalizeError, | ||
| taskId, | ||
| phase: "stream", | ||
| }); | ||
| } | ||
| } finally { | ||
| clearIdleTimer(); | ||
| // 兜底:在流结束/中断后清理首字节超时,避免定时器泄漏 | ||
| // 注意:不应在流仍可能继续时清理(否则会让首字节超时失效) | ||
| try { | ||
| const wasResponseControllerAborted = | ||
| sessionWithController.responseController?.signal.aborted ?? false; | ||
| const clientAborted = session.clientAbortSignal?.aborted ?? false; | ||
| const shouldClearTimeout = | ||
| responseTimeoutCleared || | ||
| streamEndedNormally || | ||
| wasResponseControllerAborted || | ||
| clientAborted; | ||
| if (shouldClearTimeout) { | ||
| clearResponseTimeoutOnce(); | ||
| } | ||
| } catch (e) { | ||
| logger.warn( | ||
| "[ResponseHandler] Gemini passthrough: Failed to clear response timeout", | ||
| { | ||
| taskId, | ||
| providerId: provider.id, | ||
| providerName: provider.name, | ||
| error: e instanceof Error ? e.message : String(e), | ||
| } | ||
| ); | ||
| } | ||
| try { | ||
| // 取消 tee 分支,避免 stats 任务提前退出时 backpressure 影响客户端透传 | ||
| const cancelPromise = reader?.cancel(); | ||
| if (cancelPromise) { | ||
| cancelPromise.catch((err) => { | ||
| logger.warn( | ||
| "[ResponseHandler] Gemini passthrough: Failed to cancel stats reader", | ||
| { | ||
| taskId, | ||
| providerId: provider.id, | ||
| providerName: provider.name, | ||
| error: err instanceof Error ? err.message : String(err), | ||
| } | ||
| ); | ||
| }); | ||
| } | ||
| } catch (e) { | ||
| logger.warn("[ResponseHandler] Gemini passthrough: Failed to cancel stats reader", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| providerName: provider.name, | ||
| error: e instanceof Error ? e.message : String(e), | ||
| }); | ||
| } | ||
| try { | ||
| reader?.releaseLock(); | ||
| } catch (e) { | ||
| logger.warn("[ResponseHandler] Gemini passthrough: Failed to release reader lock", { | ||
| taskId, | ||
| providerId: provider.id, | ||
| providerName: provider.name, | ||
| error: e instanceof Error ? e.message : String(e), | ||
| }); | ||
| } | ||
| AsyncTaskManager.cleanup(taskId); | ||
| } | ||
| })(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tail-window chunking can lose data for usage parsing.
When response exceeds
MAX_STATS_BUFFER_BYTES, only the trailing 10MB is kept. If usage/cost metadata appears early in the stream (e.g., first SSE event), it will be discarded and stats finalization may fail to extract billing info.Check if upstream Gemini responses front-load metadata, or document that stats are best-effort for >10MB responses.
Prompt To Fix With AI