From a11b413719eb721c3e8b05f60410d38233ca796e Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Sun, 8 Feb 2026 09:48:17 +0800 Subject: [PATCH 01/13] =?UTF-8?q?fix(proxy):=20SSE=E7=BB=93=E6=9D=9F?= =?UTF-8?q?=E5=90=8E=E8=AF=86=E5=88=AB=E5=81=87200=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/forwarder.ts | 30 +++ src/app/v1/_lib/proxy/response-handler.ts | 232 ++++++++++++++++-- src/app/v1/_lib/proxy/stream-finalization.ts | 45 ++++ .../utils/upstream-error-detection.test.ts | 53 ++++ src/lib/utils/upstream-error-detection.ts | 144 +++++++++++ 5 files changed, 486 insertions(+), 18 deletions(-) create mode 100644 src/app/v1/_lib/proxy/stream-finalization.ts create mode 100644 src/lib/utils/upstream-error-detection.test.ts create mode 100644 src/lib/utils/upstream-error-detection.ts diff --git a/src/app/v1/_lib/proxy/forwarder.ts b/src/app/v1/_lib/proxy/forwarder.ts index 846dc7e0b..45a9d8f87 100644 --- a/src/app/v1/_lib/proxy/forwarder.ts +++ b/src/app/v1/_lib/proxy/forwarder.ts @@ -49,6 +49,7 @@ import { import { ModelRedirector } from "./model-redirector"; import { ProxyProviderResolver } from "./provider-selector"; import type { ProxySession } from "./session"; +import { setDeferredStreamingFinalization } from "./stream-finalization"; import { detectThinkingBudgetRectifierTrigger, rectifyThinkingBudget, @@ -377,6 +378,35 @@ export class ProxyForwarder { const contentType = response.headers.get("content-type") || ""; const isSSE = contentType.includes("text/event-stream"); + // ========== 流式响应:延迟成功判定(避免“假 200”)========== + // 背景:上游可能返回 HTTP 200,但 SSE 内容为错误 JSON(如 {"error": "..."}) + // 这种情况下如果立即记录 success / 绑定 session,会导致后续复用同一 provider,影响自动重试与故障转移策略 + // 解决:把成功/失败的最终结算交给 ResponseHandler,在 SSE 完整结束后再决定 + if (isSSE) { + setDeferredStreamingFinalization(session, { + providerId: currentProvider.id, + providerName: currentProvider.name, + providerPriority: currentProvider.priority || 0, + attemptNumber: attemptCount, + totalProvidersAttempted, + isFirstAttempt: totalProvidersAttempted === 1 && attemptCount === 1, + isFailoverSuccess: totalProvidersAttempted > 1, + endpointId: activeEndpoint.endpointId, + endpointUrl: endpointAudit.endpointUrl, + upstreamStatusCode: response.status, + }); + + logger.info("ProxyForwarder: Streaming response received, deferring finalization", { + providerId: currentProvider.id, + providerName: currentProvider.name, + attemptNumber: attemptCount, + totalProvidersAttempted, + statusCode: response.status, + }); + + return response; + } + if (!isSSE) { // 非流式响应:检测空响应 const contentLength = response.headers.get("content-length"); diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 473611b3b..3b199841e 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -11,6 +11,7 @@ import { SessionTracker } from "@/lib/session-tracker"; import { calculateRequestCost } from "@/lib/utils/cost-calculation"; import { hasValidPriceData } from "@/lib/utils/price-data"; import { parseSSEData } from "@/lib/utils/sse"; +import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection"; import { updateMessageRequestCost, updateMessageRequestDetails, @@ -23,6 +24,7 @@ import { GeminiAdapter } from "../gemini/adapter"; import type { GeminiResponse } from "../gemini/types"; import { isClientAbortError } from "./errors"; import type { ProxySession } from "./session"; +import { consumeDeferredStreamingFinalization } from "./stream-finalization"; export type UsageMetrics = { input_tokens?: number; @@ -59,6 +61,159 @@ function cleanResponseHeaders(headers: Headers): Headers { return cleaned; } +type FinalizeDeferredStreamingResult = { + effectiveStatusCode: number; + errorMessage: string | null; +}; + +async function finalizeDeferredStreamingFinalizationIfNeeded( + session: ProxySession, + allContent: string, + upstreamStatusCode: number, + streamEndedNormally: boolean +): Promise { + const meta = consumeDeferredStreamingFinalization(session); + const provider = session.provider; + + const detected = streamEndedNormally + ? detectUpstreamErrorFromSseOrJsonText(allContent) + : ({ isError: false } as const); + + const effectiveStatusCode = detected.isError ? 502 : upstreamStatusCode; + const errorMessage = detected.isError ? detected.reason : null; + + // 未启用延迟结算 / 未正常结束:只返回判定结果,不做熔断/绑定更新 + if (!meta || !provider || !streamEndedNormally) { + return { effectiveStatusCode, errorMessage }; + } + + if (provider.id !== meta.providerId) { + logger.warn("[ResponseHandler] Deferred streaming meta provider mismatch", { + sessionId: session.sessionId ?? null, + metaProviderId: meta.providerId, + currentProviderId: provider.id, + }); + } + + if (detected.isError) { + logger.warn("[ResponseHandler] SSE completed but body indicates error (fake 200)", { + providerId: meta.providerId, + providerName: meta.providerName, + upstreamStatusCode: meta.upstreamStatusCode, + effectiveStatusCode, + reason: detected.reason, + }); + + // 计入熔断器:让后续请求能正确触发故障转移/熔断 + try { + const { recordFailure } = await import("@/lib/circuit-breaker"); + await recordFailure(meta.providerId, new Error(detected.reason)); + } catch (cbError) { + logger.warn("[ResponseHandler] Failed to record fake-200 error in circuit breaker", { + providerId: meta.providerId, + error: cbError, + }); + } + + // 记录到决策链(用于日志展示) + session.addProviderToChain(provider, { + endpointId: meta.endpointId, + endpointUrl: meta.endpointUrl, + reason: "retry_failed", + attemptNumber: meta.attemptNumber, + statusCode: effectiveStatusCode, + errorMessage: detected.reason, + }); + + return { effectiveStatusCode, errorMessage }; + } + + // ========== 真正成功(SSE 完整结束且未命中错误判定)========== + if (meta.endpointId != null) { + try { + const { recordEndpointSuccess } = await import("@/lib/endpoint-circuit-breaker"); + await recordEndpointSuccess(meta.endpointId); + } catch (endpointError) { + logger.warn("[ResponseHandler] Failed to record endpoint success (stream)", { + endpointId: meta.endpointId, + providerId: meta.providerId, + error: endpointError, + }); + } + } + + try { + const { recordSuccess } = await import("@/lib/circuit-breaker"); + await recordSuccess(meta.providerId); + } catch (cbError) { + logger.warn("[ResponseHandler] Failed to record streaming success in circuit breaker", { + providerId: meta.providerId, + error: cbError, + }); + } + + // ⭐ 成功后绑定 session 到供应商(智能绑定策略) + if (session.sessionId) { + const result = await SessionManager.updateSessionBindingSmart( + session.sessionId, + meta.providerId, + meta.providerPriority, + meta.isFirstAttempt, + meta.isFailoverSuccess + ); + + if (result.updated) { + logger.info("[ResponseHandler] Session binding updated (stream finalized)", { + sessionId: session.sessionId, + providerId: meta.providerId, + providerName: meta.providerName, + priority: meta.providerPriority, + reason: result.reason, + details: result.details, + attemptNumber: meta.attemptNumber, + totalProvidersAttempted: meta.totalProvidersAttempted, + }); + } else { + logger.debug("[ResponseHandler] Session binding not updated (stream finalized)", { + sessionId: session.sessionId, + providerId: meta.providerId, + providerName: meta.providerName, + priority: meta.providerPriority, + reason: result.reason, + details: result.details, + }); + } + + // ⭐ 统一更新两个数据源(确保监控数据一致) + void SessionManager.updateSessionProvider(session.sessionId, { + providerId: meta.providerId, + providerName: meta.providerName, + }).catch((err) => { + logger.error("[ResponseHandler] Failed to update session provider info (stream)", { + error: err, + }); + }); + } + + session.addProviderToChain(provider, { + endpointId: meta.endpointId, + endpointUrl: meta.endpointUrl, + reason: meta.isFirstAttempt ? "request_success" : "retry_success", + attemptNumber: meta.attemptNumber, + statusCode: meta.upstreamStatusCode, + }); + + logger.info("[ResponseHandler] Streaming request finalized as success", { + providerId: meta.providerId, + providerName: meta.providerName, + attemptNumber: meta.attemptNumber, + totalProvidersAttempted: meta.totalProvidersAttempted, + statusCode: meta.upstreamStatusCode, + }); + + return { effectiveStatusCode, errorMessage }; +} + export class ProxyResponseHandler { static async dispatch(session: ProxySession, response: Response): Promise { let fixedResponse = response; @@ -576,12 +731,16 @@ export class ProxyResponseHandler { const chunks: string[] = []; const decoder = new TextDecoder(); let isFirstChunk = true; + let streamEndedNormally = false; while (true) { if (session.clientAbortSignal?.aborted) break; const { done, value } = await reader.read(); - if (done) break; + if (done) { + streamEndedNormally = true; + break; + } if (value) { if (isFirstChunk) { isFirstChunk = false; @@ -608,7 +767,19 @@ export class ProxyResponseHandler { // 使用共享的统计处理方法 const duration = Date.now() - session.startTime; - await finalizeRequestStats(session, allContent, statusCode, duration); + const finalized = await finalizeDeferredStreamingFinalizationIfNeeded( + session, + allContent, + statusCode, + streamEndedNormally + ); + await finalizeRequestStats( + session, + allContent, + finalized.effectiveStatusCode, + duration, + finalized.errorMessage ?? undefined + ); } catch (error) { if (!isClientAbortError(error as Error)) { logger.error("[ResponseHandler] Gemini passthrough stats task failed:", error); @@ -779,7 +950,19 @@ export class ProxyResponseHandler { return chunks.join(""); }; - const finalizeStream = async (allContent: string): Promise => { + const finalizeStream = async ( + allContent: string, + streamEndedNormally: boolean + ): Promise => { + const finalized = await finalizeDeferredStreamingFinalizationIfNeeded( + session, + allContent, + statusCode, + streamEndedNormally + ); + const effectiveStatusCode = finalized.effectiveStatusCode; + const streamErrorMessage = finalized.errorMessage; + // 存储响应体到 Redis(5分钟过期) if (session.sessionId) { void SessionManager.storeSessionResponse( @@ -839,10 +1022,10 @@ export class ProxyResponseHandler { await trackCostToRedis(session, usageForCost); // 更新 session 使用量到 Redis(用于实时监控) - if (session.sessionId && usageForCost) { + if (session.sessionId) { let costUsdStr: string | undefined; try { - if (session.request.model) { + if (usageForCost && session.request.model) { const priceData = await session.getCachedPriceDataByBillingSource(); if (priceData) { const cost = calculateRequestCost( @@ -862,22 +1045,28 @@ export class ProxyResponseHandler { }); } - void SessionManager.updateSessionUsage(session.sessionId, { - inputTokens: usageForCost.input_tokens, - outputTokens: usageForCost.output_tokens, - cacheCreationInputTokens: usageForCost.cache_creation_input_tokens, - cacheReadInputTokens: usageForCost.cache_read_input_tokens, - costUsd: costUsdStr, - status: statusCode >= 200 && statusCode < 300 ? "completed" : "error", - statusCode: statusCode, - }).catch((error: unknown) => { + const payload: SessionUsageUpdate = { + status: effectiveStatusCode >= 200 && effectiveStatusCode < 300 ? "completed" : "error", + statusCode: effectiveStatusCode, + ...(streamErrorMessage ? { errorMessage: streamErrorMessage } : {}), + }; + + if (usageForCost) { + payload.inputTokens = usageForCost.input_tokens; + payload.outputTokens = usageForCost.output_tokens; + payload.cacheCreationInputTokens = usageForCost.cache_creation_input_tokens; + payload.cacheReadInputTokens = usageForCost.cache_read_input_tokens; + payload.costUsd = costUsdStr; + } + + void SessionManager.updateSessionUsage(session.sessionId, payload).catch((error: unknown) => { logger.error("[ResponseHandler] Failed to update session usage:", error); }); } // 保存扩展信息(status code, tokens, provider chain) await updateMessageRequestDetails(messageContext.id, { - statusCode: statusCode, + statusCode: effectiveStatusCode, inputTokens: usageForCost?.input_tokens, outputTokens: usageForCost?.output_tokens, ttfbMs: session.ttfbMs, @@ -887,6 +1076,7 @@ export class ProxyResponseHandler { cacheCreation1hInputTokens: usageForCost?.cache_creation_1h_input_tokens, cacheTtlApplied: usageForCost?.cache_ttl ?? null, providerChain: session.getProviderChain(), + ...(streamErrorMessage ? { errorMessage: streamErrorMessage } : {}), model: session.getCurrentModel() ?? undefined, // ⭐ 更新重定向后的模型 providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后) context1mApplied: session.getContext1mApplied(), @@ -894,6 +1084,7 @@ export class ProxyResponseHandler { }; try { + let streamEndedNormally = false; while (true) { // 检查取消信号 if (session.clientAbortSignal?.aborted || abortController.signal.aborted) { @@ -907,6 +1098,7 @@ export class ProxyResponseHandler { const { value, done } = await reader.read(); if (done) { + streamEndedNormally = true; break; } if (value) { @@ -945,7 +1137,7 @@ export class ProxyResponseHandler { // ⭐ 流式读取完成:清除静默期计时器 clearIdleTimer(); const allContent = flushAndJoin(); - await finalizeStream(allContent); + await finalizeStream(allContent, streamEndedNormally); } catch (error) { // 检测 AbortError 的来源:响应超时 vs 静默期超时 vs 客户端/上游中断 const err = error as Error; @@ -1070,7 +1262,7 @@ export class ProxyResponseHandler { }); try { const allContent = flushAndJoin(); - await finalizeStream(allContent); + await finalizeStream(allContent, false); } catch (finalizeError) { logger.error("ResponseHandler: Failed to finalize aborted stream response", { taskId, @@ -1800,7 +1992,8 @@ export async function finalizeRequestStats( session: ProxySession, responseText: string, statusCode: number, - duration: number + duration: number, + errorMessage?: string ): Promise { const { messageContext, provider } = session; if (!provider || !messageContext) { @@ -1820,6 +2013,7 @@ export async function finalizeRequestStats( // 即使没有 usageMetrics,也需要更新状态码和 provider chain await updateMessageRequestDetails(messageContext.id, { statusCode: statusCode, + ...(errorMessage ? { errorMessage } : {}), ttfbMs: session.ttfbMs ?? duration, providerChain: session.getProviderChain(), model: session.getCurrentModel() ?? undefined, @@ -1892,6 +2086,7 @@ export async function finalizeRequestStats( costUsd: costUsdStr, status: statusCode >= 200 && statusCode < 300 ? "completed" : "error", statusCode: statusCode, + ...(errorMessage ? { errorMessage } : {}), }).catch((error: unknown) => { logger.error("[ResponseHandler] Failed to update session usage:", error); }); @@ -1909,6 +2104,7 @@ export async function finalizeRequestStats( cacheCreation1hInputTokens: normalizedUsage.cache_creation_1h_input_tokens, cacheTtlApplied: normalizedUsage.cache_ttl ?? null, providerChain: session.getProviderChain(), + ...(errorMessage ? { errorMessage } : {}), model: session.getCurrentModel() ?? undefined, providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后) context1mApplied: session.getContext1mApplied(), diff --git a/src/app/v1/_lib/proxy/stream-finalization.ts b/src/app/v1/_lib/proxy/stream-finalization.ts new file mode 100644 index 000000000..82b11b0a4 --- /dev/null +++ b/src/app/v1/_lib/proxy/stream-finalization.ts @@ -0,0 +1,45 @@ +import type { ProxySession } from "./session"; + +/** + * 流式响应(SSE)在“收到响应头”时无法确定成功与否: + * - 上游可能返回 HTTP 200,但 body 是错误 JSON(假 200) + * - 只有在 SSE 结束后才能做最终判定 + * + * 该结构用于 Forwarder → ResponseHandler 之间传递“延迟结算”的必要信息, + * 以便在流结束后更新熔断器、endpoint 成功率、以及 session 绑定。 + */ +export type DeferredStreamingFinalization = { + providerId: number; + providerName: string; + providerPriority: number; + attemptNumber: number; + totalProvidersAttempted: number; + isFirstAttempt: boolean; + isFailoverSuccess: boolean; + endpointId: number | null; + endpointUrl: string; + upstreamStatusCode: number; +}; + +type SessionWithDeferred = ProxySession & { + deferredStreamingFinalization?: DeferredStreamingFinalization; +}; + +export function setDeferredStreamingFinalization( + session: ProxySession, + meta: DeferredStreamingFinalization +): void { + (session as SessionWithDeferred).deferredStreamingFinalization = meta; +} + +export function consumeDeferredStreamingFinalization( + session: ProxySession +): DeferredStreamingFinalization | null { + const s = session as SessionWithDeferred; + const meta = s.deferredStreamingFinalization ?? null; + if (meta) { + s.deferredStreamingFinalization = undefined; + } + return meta; +} + diff --git a/src/lib/utils/upstream-error-detection.test.ts b/src/lib/utils/upstream-error-detection.test.ts new file mode 100644 index 000000000..33092abc1 --- /dev/null +++ b/src/lib/utils/upstream-error-detection.test.ts @@ -0,0 +1,53 @@ +import { describe, expect, test } from "vitest"; +import { detectUpstreamErrorFromSseOrJsonText } from "./upstream-error-detection"; + +describe("detectUpstreamErrorFromSseOrJsonText", () => { + test("空响应体视为错误", () => { + expect(detectUpstreamErrorFromSseOrJsonText("")).toEqual({ + isError: true, + reason: "上游返回 200 但响应体为空", + }); + }); + + test("纯 JSON:error 字段非空视为错误", () => { + const res = detectUpstreamErrorFromSseOrJsonText('{"error":"当前无可用凭证"}'); + expect(res.isError).toBe(true); + }); + + test("纯 JSON:error 为空字符串不视为错误", () => { + const res = detectUpstreamErrorFromSseOrJsonText('{"error":""}'); + expect(res.isError).toBe(false); + }); + + test("纯 JSON:小于 1000 字符且 message 包含 error 字样视为错误", () => { + const res = detectUpstreamErrorFromSseOrJsonText('{"message":"some error happened"}'); + expect(res.isError).toBe(true); + }); + + test("纯 JSON:大于等于 1000 字符时不做 message 关键字判定", () => { + const longMessage = "a".repeat(1000); + const res = detectUpstreamErrorFromSseOrJsonText( + JSON.stringify({ message: `${longMessage} error ${longMessage}` }) + ); + expect(res.isError).toBe(false); + }); + + test("SSE:data JSON 包含非空 error 字段视为错误", () => { + const sse = ['event: message', 'data: {"error":"当前无可用凭证"}', ""].join("\n"); + const res = detectUpstreamErrorFromSseOrJsonText(sse); + expect(res.isError).toBe(true); + }); + + test("SSE:data JSON 小于 1000 字符且 message 包含 error 字样视为错误", () => { + const sse = ['data: {"message":"ERROR: no credentials"}', ""].join("\n"); + const res = detectUpstreamErrorFromSseOrJsonText(sse); + expect(res.isError).toBe(true); + }); + + test("SSE:仅有 [DONE] 不视为错误", () => { + const sse = ["data: [DONE]", ""].join("\n"); + const res = detectUpstreamErrorFromSseOrJsonText(sse); + expect(res.isError).toBe(false); + }); +}); + diff --git a/src/lib/utils/upstream-error-detection.ts b/src/lib/utils/upstream-error-detection.ts new file mode 100644 index 000000000..8c6de4104 --- /dev/null +++ b/src/lib/utils/upstream-error-detection.ts @@ -0,0 +1,144 @@ +import { parseSSEData } from "@/lib/utils/sse"; + +export type UpstreamErrorDetectionResult = + | { isError: false } + | { + isError: true; + reason: string; + }; + +type DetectionOptions = { + /** + * 仅对小体积 JSON 启用 message 关键字检测,避免误判与无谓开销 + */ + maxJsonCharsForMessageCheck?: number; + /** + * message 关键字匹配规则(默认 /error/i) + */ + messageKeyword?: RegExp; +}; + +const DEFAULT_MAX_JSON_CHARS_FOR_MESSAGE_CHECK = 1000; +const DEFAULT_MESSAGE_KEYWORD = /error/i; + +function isPlainRecord(value: unknown): value is Record { + return !!value && typeof value === "object" && !Array.isArray(value); +} + +function hasNonEmptyValue(value: unknown): boolean { + if (value === null || value === undefined) return false; + if (typeof value === "string") return value.trim().length > 0; + if (typeof value === "number") return !Number.isNaN(value) && value !== 0; + if (typeof value === "boolean") return value; + if (Array.isArray(value)) return value.length > 0; + if (typeof value === "object") return Object.keys(value as Record).length > 0; + return true; +} + +function truncateForReason(text: string, maxLen: number = 200): string { + const trimmed = text.trim(); + if (trimmed.length <= maxLen) return trimmed; + return `${trimmed.slice(0, maxLen)}…`; +} + +function detectFromJsonObject( + obj: Record, + rawJsonChars: number, + options: Required> +): UpstreamErrorDetectionResult { + const errorValue = obj.error; + if (hasNonEmptyValue(errorValue)) { + // 优先展示 string 或 error.message,避免把整个对象塞进 reason + if (typeof errorValue === "string") { + return { + isError: true, + reason: `上游返回 200 但 JSON.error 非空: ${truncateForReason(errorValue)}`, + }; + } + + if (isPlainRecord(errorValue) && typeof errorValue.message === "string") { + return { + isError: true, + reason: `上游返回 200 但 JSON.error.message 非空: ${truncateForReason(errorValue.message)}`, + }; + } + + return { isError: true, reason: "上游返回 200 但 JSON.error 非空" }; + } + + if (rawJsonChars < options.maxJsonCharsForMessageCheck) { + const message = + typeof obj.message === "string" + ? obj.message + : isPlainRecord(obj.error) && typeof obj.error.message === "string" + ? obj.error.message + : null; + + if (message && options.messageKeyword.test(message)) { + return { + isError: true, + reason: `上游返回 200 但 JSON.message 命中关键字: ${truncateForReason(message)}`, + }; + } + } + + return { isError: false }; +} + +/** + * 用于“流式 SSE 已经结束后”的补充检查: + * - 响应体为空:视为错误 + * - JSON 里包含非空 error 字段:视为错误 + * - 小于 1000 字符的 JSON:若 message(或 error.message)包含 "error" 字样:视为错误 + */ +export function detectUpstreamErrorFromSseOrJsonText( + text: string, + options: DetectionOptions = {} +): UpstreamErrorDetectionResult { + const merged: Required> = { + maxJsonCharsForMessageCheck: + options.maxJsonCharsForMessageCheck ?? DEFAULT_MAX_JSON_CHARS_FOR_MESSAGE_CHECK, + messageKeyword: options.messageKeyword ?? DEFAULT_MESSAGE_KEYWORD, + }; + + const trimmed = text.trim(); + if (!trimmed) { + return { isError: true, reason: "上游返回 200 但响应体为空" }; + } + + // 情况 1:纯 JSON(上游可能 Content-Type 设置为 SSE,但实际上返回 JSON) + if (trimmed.startsWith("{") || trimmed.startsWith("[")) { + try { + const parsed = JSON.parse(trimmed) as unknown; + if (isPlainRecord(parsed)) { + return detectFromJsonObject(parsed, trimmed.length, merged); + } + } catch { + // JSON 解析失败:不视为错误,交由上层逻辑处理 + } + return { isError: false }; + } + + // 情况 2:SSE 文本。快速过滤:既无 "error" 也无 "message" key 时跳过解析 + // 注意:这里用 key 形式的引号匹配,尽量避免 assistant 正文里出现 "error" 造成的无谓解析 + if (!text.includes("\"error\"") && !text.includes("\"message\"")) { + return { isError: false }; + } + + const events = parseSSEData(text); + for (const evt of events) { + if (!isPlainRecord(evt.data)) continue; + let chars = 0; + try { + chars = JSON.stringify(evt.data).length; + } catch { + // ignore + } + + const res = detectFromJsonObject(evt.data, chars, merged); + if (res.isError) return res; + } + + return { isError: false }; +} + From f5ce3122205e2fd9578cc73da11e9a5247567763 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Sun, 8 Feb 2026 10:09:58 +0800 Subject: [PATCH 02/13] =?UTF-8?q?test(proxy):=20=E8=A1=A5=E5=85=85?= =?UTF-8?q?=E5=81=87200=E6=A3=80=E6=B5=8B=E6=B3=A8=E9=87=8A=E4=B8=8E?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/forwarder.ts | 11 +++-- src/app/v1/_lib/proxy/response-handler.ts | 37 +++++++++++++- src/app/v1/_lib/proxy/stream-finalization.ts | 12 +++-- .../utils/upstream-error-detection.test.ts | 40 ++++++++++++++- src/lib/utils/upstream-error-detection.ts | 49 +++++++++++++++++-- 5 files changed, 136 insertions(+), 13 deletions(-) diff --git a/src/app/v1/_lib/proxy/forwarder.ts b/src/app/v1/_lib/proxy/forwarder.ts index 45a9d8f87..652ee6b7f 100644 --- a/src/app/v1/_lib/proxy/forwarder.ts +++ b/src/app/v1/_lib/proxy/forwarder.ts @@ -379,9 +379,14 @@ export class ProxyForwarder { const isSSE = contentType.includes("text/event-stream"); // ========== 流式响应:延迟成功判定(避免“假 200”)========== - // 背景:上游可能返回 HTTP 200,但 SSE 内容为错误 JSON(如 {"error": "..."}) - // 这种情况下如果立即记录 success / 绑定 session,会导致后续复用同一 provider,影响自动重试与故障转移策略 - // 解决:把成功/失败的最终结算交给 ResponseHandler,在 SSE 完整结束后再决定 + // 背景:上游可能返回 HTTP 200,但 SSE 内容为错误 JSON(如 {"error": "..."})。 + // 如果在“收到响应头”时就立刻记录 success / 更新 session 绑定: + // - 会把会话粘到一个实际不可用的 provider; + // - 熔断/故障转移统计被误记为成功; + // - 客户端下一次自动重试可能仍复用到同一 provider,导致“假 200”让重试失效。 + // + // 解决:Forwarder 只负责尽快把 Response 返回给下游开始透传, + // 把最终成功/失败结算延迟到 ResponseHandler:等 SSE 正常结束后再基于最终 body 补充检查并更新内部状态。 if (isSSE) { setDeferredStreamingFinalization(session, { providerId: currentProvider.id, diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 3b199841e..3d5d53d5c 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -62,10 +62,33 @@ function cleanResponseHeaders(headers: Headers): Headers { } type FinalizeDeferredStreamingResult = { + /** + * “内部结算用”的状态码。 + * + * 注意:这不会改变客户端实际收到的 HTTP 状态码(SSE 已经开始透传后无法回头改)。 + * 这里的目的仅是让内部统计/熔断/会话绑定把“假 200”按失败处理。 + */ effectiveStatusCode: number; + /** + * 内部记录的错误原因(用于写入 DB/监控,帮助定位“假 200”问题)。 + */ errorMessage: string | null; }; +/** + * 若本次 SSE 被标记为“延迟结算”,则在流结束后补齐成功/失败的最终判定。 + * + * 触发条件 + * - Forwarder 收到 Response 且识别为 SSE 时,会在 session 上挂载 DeferredStreamingFinalization 元信息。 + * - ResponseHandler 在后台读取完整 SSE 内容后,调用本函数: + * - 如果内容看起来是上游错误 JSON(假 200),则: + * - 计入熔断器失败; + * - 不更新 session 智能绑定(避免把会话粘到坏 provider); + * - 内部状态码改为 502(只影响统计与后续重试选择,不影响本次客户端响应)。 + * - 如果流正常结束且未命中错误判定,则按成功结算并更新绑定/熔断/endpoint 成功率。 + * + * @param streamEndedNormally - 必须是 reader 读到 done=true 的“自然结束”;超时/中断等异常结束由其它逻辑处理。 + */ async function finalizeDeferredStreamingFinalizationIfNeeded( session: ProxySession, allContent: string, @@ -79,10 +102,14 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( ? detectUpstreamErrorFromSseOrJsonText(allContent) : ({ isError: false } as const); + // “假 200”统一映射为 502:既能让内部状态落到 error 分支, + // 也能让后续熔断/故障转移策略按“上游错误”处理。 const effectiveStatusCode = detected.isError ? 502 : upstreamStatusCode; const errorMessage = detected.isError ? detected.reason : null; - // 未启用延迟结算 / 未正常结束:只返回判定结果,不做熔断/绑定更新 + // 未启用延迟结算 / provider 缺失 / 未自然结束: + // - 只返回“内部状态码 + 错误原因”,由调用方写入统计; + // - 不在这里更新熔断/绑定(异常结束场景通常已有专门逻辑做 recordFailure/persistRequestFailure)。 if (!meta || !provider || !streamEndedNormally) { return { effectiveStatusCode, errorMessage }; } @@ -106,6 +133,7 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( // 计入熔断器:让后续请求能正确触发故障转移/熔断 try { + // 动态导入:避免 proxy 模块与熔断器模块之间潜在的循环依赖。 const { recordFailure } = await import("@/lib/circuit-breaker"); await recordFailure(meta.providerId, new Error(detected.reason)); } catch (cbError) { @@ -115,7 +143,9 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( }); } - // 记录到决策链(用于日志展示) + // 记录到决策链(用于日志展示与 DB 持久化)。 + // 注意:这里用 effectiveStatusCode(502)而不是 upstreamStatusCode(200), + // 以便让内部链路明确显示这是一次失败(否则会被误读为成功)。 session.addProviderToChain(provider, { endpointId: meta.endpointId, endpointUrl: meta.endpointUrl, @@ -1987,6 +2017,9 @@ async function updateRequestCostFromUsage( /** * 统一的请求统计处理方法 * 用于消除 Gemini 透传、普通非流式、普通流式之间的重复统计逻辑 + * + * @param statusCode - 内部结算状态码(可能与客户端实际收到的 HTTP 状态不同,例如“假 200”会被映射为 502) + * @param errorMessage - 可选的内部错误原因(用于把假 200/解析失败等信息写入 DB 与监控) */ export async function finalizeRequestStats( session: ProxySession, diff --git a/src/app/v1/_lib/proxy/stream-finalization.ts b/src/app/v1/_lib/proxy/stream-finalization.ts index 82b11b0a4..4a4766700 100644 --- a/src/app/v1/_lib/proxy/stream-finalization.ts +++ b/src/app/v1/_lib/proxy/stream-finalization.ts @@ -5,8 +5,13 @@ import type { ProxySession } from "./session"; * - 上游可能返回 HTTP 200,但 body 是错误 JSON(假 200) * - 只有在 SSE 结束后才能做最终判定 * - * 该结构用于 Forwarder → ResponseHandler 之间传递“延迟结算”的必要信息, - * 以便在流结束后更新熔断器、endpoint 成功率、以及 session 绑定。 + * 该结构用于 Forwarder → ResponseHandler 之间传递“延迟结算”的必要信息: + * - Forwarder:拿到 Response 后尽快开始向客户端透传(降低延迟);但不要立刻记为 success/绑定 session。 + * - ResponseHandler:在流正常结束后,基于最终响应体做一次补充检查,然后再更新熔断/endpoint/会话绑定。 + * + * 说明: + * - 这里选择“把元信息挂到 session 上”而不是改动大量类型/函数签名,避免改动面过大; + * - 元信息是一次性的:消费后会被清空,避免跨请求污染。 */ export type DeferredStreamingFinalization = { providerId: number; @@ -29,6 +34,7 @@ export function setDeferredStreamingFinalization( session: ProxySession, meta: DeferredStreamingFinalization ): void { + // Forwarder 在识别到 SSE 时调用:标记该请求需要在流结束后“二次结算”。 (session as SessionWithDeferred).deferredStreamingFinalization = meta; } @@ -38,8 +44,8 @@ export function consumeDeferredStreamingFinalization( const s = session as SessionWithDeferred; const meta = s.deferredStreamingFinalization ?? null; if (meta) { + // 只允许消费一次:避免重复结算(例如多个后台统计任务并行时)。 s.deferredStreamingFinalization = undefined; } return meta; } - diff --git a/src/lib/utils/upstream-error-detection.test.ts b/src/lib/utils/upstream-error-detection.test.ts index 33092abc1..985583d72 100644 --- a/src/lib/utils/upstream-error-detection.test.ts +++ b/src/lib/utils/upstream-error-detection.test.ts @@ -14,11 +14,23 @@ describe("detectUpstreamErrorFromSseOrJsonText", () => { expect(res.isError).toBe(true); }); + test("纯 JSON:error 为对象且 error.message 非空视为错误", () => { + const res = detectUpstreamErrorFromSseOrJsonText( + JSON.stringify({ error: { message: "error: no credentials" } }) + ); + expect(res.isError).toBe(true); + }); + test("纯 JSON:error 为空字符串不视为错误", () => { const res = detectUpstreamErrorFromSseOrJsonText('{"error":""}'); expect(res.isError).toBe(false); }); + test("纯 JSON:message 不包含关键字不视为错误", () => { + const res = detectUpstreamErrorFromSseOrJsonText('{"message":"all good"}'); + expect(res.isError).toBe(false); + }); + test("纯 JSON:小于 1000 字符且 message 包含 error 字样视为错误", () => { const res = detectUpstreamErrorFromSseOrJsonText('{"message":"some error happened"}'); expect(res.isError).toBe(true); @@ -32,22 +44,48 @@ describe("detectUpstreamErrorFromSseOrJsonText", () => { expect(res.isError).toBe(false); }); + test("纯 JSON:非法 JSON 不抛错且不视为错误", () => { + const res = detectUpstreamErrorFromSseOrJsonText("{not-json}"); + expect(res.isError).toBe(false); + }); + test("SSE:data JSON 包含非空 error 字段视为错误", () => { const sse = ['event: message', 'data: {"error":"当前无可用凭证"}', ""].join("\n"); const res = detectUpstreamErrorFromSseOrJsonText(sse); expect(res.isError).toBe(true); }); + test("SSE:data JSON error 为对象且 error.message 非空视为错误", () => { + const sse = ['data: {"error":{"message":"ERROR: no credentials"}}', ""].join("\n"); + const res = detectUpstreamErrorFromSseOrJsonText(sse); + expect(res.isError).toBe(true); + }); + test("SSE:data JSON 小于 1000 字符且 message 包含 error 字样视为错误", () => { const sse = ['data: {"message":"ERROR: no credentials"}', ""].join("\n"); const res = detectUpstreamErrorFromSseOrJsonText(sse); expect(res.isError).toBe(true); }); + test("SSE:message 为对象时不应误判为错误", () => { + // 类 Anthropic SSE:message 字段通常是对象(不是错误字符串) + const sse = [ + 'data: {"type":"message_start","message":{"id":"msg_1","type":"message","role":"assistant"}}', + "", + ].join("\n"); + const res = detectUpstreamErrorFromSseOrJsonText(sse); + expect(res.isError).toBe(false); + }); + + test("SSE:不包含 error/message key 时不解析且不视为错误", () => { + const sse = ['data: {"foo":"bar"}', ""].join("\n"); + const res = detectUpstreamErrorFromSseOrJsonText(sse); + expect(res.isError).toBe(false); + }); + test("SSE:仅有 [DONE] 不视为错误", () => { const sse = ["data: [DONE]", ""].join("\n"); const res = detectUpstreamErrorFromSseOrJsonText(sse); expect(res.isError).toBe(false); }); }); - diff --git a/src/lib/utils/upstream-error-detection.ts b/src/lib/utils/upstream-error-detection.ts index 8c6de4104..c59bdf513 100644 --- a/src/lib/utils/upstream-error-detection.ts +++ b/src/lib/utils/upstream-error-detection.ts @@ -1,5 +1,26 @@ import { parseSSEData } from "@/lib/utils/sse"; +/** + * 上游“假 200”错误检测(仅用于内部统计/熔断/故障转移判定)。 + * + * 背景 + * - 一些上游供应商在鉴权/配额/风控等错误场景下,会返回 HTTP 200, + * 但在 body 里给出错误 JSON(例如:`{"error":"当前无可用凭证"}`)。 + * - 在流式 SSE 场景中,这类错误可能被包裹在某个 `data: {...}` 事件里。 + * - CCH 在“已开始向客户端透传 SSE”后,无法再把 HTTP 状态码改成 4xx/5xx, + * 也无法阻止错误内容继续被传递到客户端。 + * + * 为什么还要检测 + * - 我们至少要让 CCH 自己意识到“这次请求实际上是失败的”,从而: + * 1) 触发故障转移/供应商熔断的失败统计; + * 2) 避免把 session 智能绑定(粘性)更新到一个实际不可用的 provider; + * 3) 让客户端下一次自动重试时,有机会切换到其他 provider(避免“假 200”导致重试仍复用同一坏 provider)。 + * + * 设计目标(偏保守) + * - 仅基于结构化字段做启发式判断:`error` 与 `message`; + * - 不扫描模型生成的正文内容(例如 content/choices),避免把用户/模型自然语言里的 "error" 误判为上游错误; + * - message 关键字检测仅对“小体积 JSON”启用,降低误判与性能开销。 + */ export type UpstreamErrorDetectionResult = | { isError: false } | { @@ -9,11 +30,16 @@ export type UpstreamErrorDetectionResult = type DetectionOptions = { /** - * 仅对小体积 JSON 启用 message 关键字检测,避免误判与无谓开销 + * 仅对小体积 JSON 启用 message 关键字检测,避免误判与无谓开销。 + * + * 说明:这里的“体积”是原始 JSON 文本(或 SSE 单个 data 的 JSON)序列化后的字符数, + * 而不是 HTTP 的 Content-Length。 */ maxJsonCharsForMessageCheck?: number; /** - * message 关键字匹配规则(默认 /error/i) + * message 关键字匹配规则(默认 /error/i)。 + * + * 注意:该规则只用于检查 `message` 或 `error.message` 字段(字符串)。 */ messageKeyword?: RegExp; }; @@ -26,6 +52,11 @@ function isPlainRecord(value: unknown): value is Record { } function hasNonEmptyValue(value: unknown): boolean { + // 这里的“非空”是为了判断“error 字段是否有内容”。 + // - string:trim 后非空 + // - number:非 0 且非 NaN(避免把默认 0 当作错误) + // - boolean:true 视为非空 + // - array/object:存在元素/键才算非空 if (value === null || value === undefined) return false; if (typeof value === "string") return value.trim().length > 0; if (typeof value === "number") return !Number.isNaN(value) && value !== 0; @@ -46,6 +77,9 @@ function detectFromJsonObject( rawJsonChars: number, options: Required> ): UpstreamErrorDetectionResult { + // 判定优先级: + // 1) `error` 非空:直接判定为错误(强信号) + // 2) 小体积 JSON 下,`message` / `error.message` 命中关键字:判定为错误(弱信号,但能覆盖部分“错误只写在 message”场景) const errorValue = obj.error; if (hasNonEmptyValue(errorValue)) { // 优先展示 string 或 error.message,避免把整个对象塞进 reason @@ -74,6 +108,7 @@ function detectFromJsonObject( ? obj.error.message : null; + // 注意:仅检查 message 字段本身,不扫描其它字段。 if (message && options.messageKeyword.test(message)) { return { isError: true, @@ -90,6 +125,11 @@ function detectFromJsonObject( * - 响应体为空:视为错误 * - JSON 里包含非空 error 字段:视为错误 * - 小于 1000 字符的 JSON:若 message(或 error.message)包含 "error" 字样:视为错误 + * + * 注意与限制: + * - 该函数不负责判断 HTTP 状态码;调用方通常只在“上游返回 200 且 SSE 正常结束后”使用它。 + * - 对 SSE 文本,仅解析 `data:` 事件中的 JSON(通过 parseSSEData)。 + * - 如果文本不是合法 JSON / SSE,函数会返回 `{isError:false}`(不做过度猜测)。 */ export function detectUpstreamErrorFromSseOrJsonText( text: string, @@ -120,11 +160,13 @@ export function detectUpstreamErrorFromSseOrJsonText( } // 情况 2:SSE 文本。快速过滤:既无 "error" 也无 "message" key 时跳过解析 - // 注意:这里用 key 形式的引号匹配,尽量避免 assistant 正文里出现 "error" 造成的无谓解析 + // 注意:这里用 key 形式的引号匹配,尽量避免模型正文里出现 error 造成的无谓解析。 + // 代价:如果上游返回的并非标准 JSON key(极少见),这里可能漏检;但我们偏向保守与低误判。 if (!text.includes("\"error\"") && !text.includes("\"message\"")) { return { isError: false }; } + // parseSSEData 会把每个事件的 data 尝试解析成对象;我们只对 object data 做结构化判定。 const events = parseSSEData(text); for (const evt of events) { if (!isPlainRecord(evt.data)) continue; @@ -141,4 +183,3 @@ export function detectUpstreamErrorFromSseOrJsonText( return { isError: false }; } - From 6a4cbdbcd2713d72a31de8e3e042c3cd3710ee8d Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Sun, 8 Feb 2026 10:26:07 +0800 Subject: [PATCH 03/13] =?UTF-8?q?feat(session):=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E7=A6=81=E7=94=A8=E4=BC=9A=E8=AF=9D=E5=93=8D=E5=BA=94=E4=BD=93?= =?UTF-8?q?=E5=AD=98=E5=82=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 4 ++ src/lib/config/env.schema.ts | 8 ++++ src/lib/session-manager.ts | 10 ++++- .../env-store-session-response-body.test.ts | 45 +++++++++++++++++++ .../lib/session-manager-redaction.test.ts | 12 +++++ 5 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 tests/unit/lib/env-store-session-response-body.test.ts diff --git a/.env.example b/.env.example index 14193bd05..485ad52db 100644 --- a/.env.example +++ b/.env.example @@ -64,6 +64,10 @@ STORE_SESSION_MESSAGES=false # 会话消息存储模式(默认:fa # - false:存储请求/响应体但对 message 内容脱敏 [REDACTED] # - true:原样存储 message 内容(注意隐私和存储空间影响) # 警告:启用后会增加 Redis/DB 存储空间,且包含敏感信息 +STORE_SESSION_RESPONSE_BODY=true # 是否在 Redis 中存储会话响应体(默认:true) + # - true:存储(SSE/JSON),用于调试/定位问题(Redis 临时缓存) + # - false:不存储响应体(注意:不影响本次请求处理;仅影响后续查看 response body) + # 说明:该开关不影响内部统计读取响应体(tokens/费用统计、SSE 假 200 检测仍会进行) # 熔断器配置 # 功能说明:控制网络错误是否计入熔断器失败计数 diff --git a/src/lib/config/env.schema.ts b/src/lib/config/env.schema.ts index 0055d3722..b120fd8c8 100644 --- a/src/lib/config/env.schema.ts +++ b/src/lib/config/env.schema.ts @@ -98,6 +98,14 @@ export const EnvSchema = z.object({ // - false (默认):存储请求/响应体但对 message 内容脱敏 [REDACTED] // - true:原样存储 message 内容(注意隐私和存储空间影响) STORE_SESSION_MESSAGES: z.string().default("false").transform(booleanTransform), + // 会话响应体存储开关 + // - true (默认):存储响应体(SSE/JSON),用于调试/回放/问题定位(Redis 临时缓存,默认 5 分钟) + // - false:不存储响应体(注意:不影响本次请求处理;仅影响后续在 UI/诊断中查看 response body) + // + // 说明: + // - 该开关只影响“写入 Redis 的响应体内容”,不影响内部统计逻辑读取响应体(例如 tokens/费用统计、SSE 结束后的假 200 检测)。 + // - message 内容是否脱敏仍由 STORE_SESSION_MESSAGES 控制。 + STORE_SESSION_RESPONSE_BODY: z.string().default("true").transform(booleanTransform), DEBUG_MODE: z.string().default("false").transform(booleanTransform), LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).default("info"), TZ: z.string().default("Asia/Shanghai"), diff --git a/src/lib/session-manager.ts b/src/lib/session-manager.ts index 276fd6dd1..5af426dd6 100644 --- a/src/lib/session-manager.ts +++ b/src/lib/session-manager.ts @@ -1331,7 +1331,11 @@ export class SessionManager { /** * 存储 session 响应体(临时存储,5分钟过期) * - * 存储策略受 STORE_SESSION_MESSAGES 控制: + * 存储行为受 STORE_SESSION_RESPONSE_BODY 控制: + * - true (默认):存储响应体到 Redis 临时缓存 + * - false:不存储(注意:不影响本次请求处理与统计,仅影响后续查看 response body) + * + * 存储策略(脱敏/原样)受 STORE_SESSION_MESSAGES 控制: * - true:原样存储响应内容 * - false(默认):对 JSON 响应体中的 message 内容脱敏 [REDACTED] * @@ -1344,6 +1348,10 @@ export class SessionManager { response: string | object, requestSequence?: number ): Promise { + // 允许通过环境变量显式关闭响应体存储(例如隐私/节省 Redis 内存)。 + // 注意:这里仅关闭“写入 Redis”这一步;调用方仍然可能在内存中读取响应体用于统计或错误检测。 + if (!getEnvConfig().STORE_SESSION_RESPONSE_BODY) return; + const redis = getRedisClient(); if (!redis || redis.status !== "ready") return; diff --git a/tests/unit/lib/env-store-session-response-body.test.ts b/tests/unit/lib/env-store-session-response-body.test.ts new file mode 100644 index 000000000..e4e95312a --- /dev/null +++ b/tests/unit/lib/env-store-session-response-body.test.ts @@ -0,0 +1,45 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { EnvSchema } from "@/lib/config/env.schema"; + +describe("EnvSchema - STORE_SESSION_RESPONSE_BODY", () => { + const originalEnv = process.env.STORE_SESSION_RESPONSE_BODY; + + afterEach(() => { + if (originalEnv === undefined) { + delete process.env.STORE_SESSION_RESPONSE_BODY; + } else { + process.env.STORE_SESSION_RESPONSE_BODY = originalEnv; + } + }); + + it("should default to true when not set", () => { + delete process.env.STORE_SESSION_RESPONSE_BODY; + const result = EnvSchema.parse(process.env); + expect(result.STORE_SESSION_RESPONSE_BODY).toBe(true); + }); + + it("should parse 'true' as true", () => { + process.env.STORE_SESSION_RESPONSE_BODY = "true"; + const result = EnvSchema.parse(process.env); + expect(result.STORE_SESSION_RESPONSE_BODY).toBe(true); + }); + + it("should parse 'false' as false", () => { + process.env.STORE_SESSION_RESPONSE_BODY = "false"; + const result = EnvSchema.parse(process.env); + expect(result.STORE_SESSION_RESPONSE_BODY).toBe(false); + }); + + it("should parse '0' as false", () => { + process.env.STORE_SESSION_RESPONSE_BODY = "0"; + const result = EnvSchema.parse(process.env); + expect(result.STORE_SESSION_RESPONSE_BODY).toBe(false); + }); + + it("should parse '1' as true", () => { + process.env.STORE_SESSION_RESPONSE_BODY = "1"; + const result = EnvSchema.parse(process.env); + expect(result.STORE_SESSION_RESPONSE_BODY).toBe(true); + }); +}); + diff --git a/tests/unit/lib/session-manager-redaction.test.ts b/tests/unit/lib/session-manager-redaction.test.ts index ae9b24cef..3cc3c24f3 100644 --- a/tests/unit/lib/session-manager-redaction.test.ts +++ b/tests/unit/lib/session-manager-redaction.test.ts @@ -48,9 +48,11 @@ vi.mock("@/lib/redis", () => ({ // Mock config - we'll control STORE_SESSION_MESSAGES dynamically let mockStoreMessages = false; +let mockStoreSessionResponseBody = true; vi.mock("@/lib/config/env.schema", () => ({ getEnvConfig: () => ({ STORE_SESSION_MESSAGES: mockStoreMessages, + STORE_SESSION_RESPONSE_BODY: mockStoreSessionResponseBody, SESSION_TTL: 300, }), })); @@ -62,10 +64,12 @@ describe("SessionManager - Redaction based on STORE_SESSION_MESSAGES", () => { beforeEach(() => { vi.clearAllMocks(); mockStoreMessages = false; // default: redact + mockStoreSessionResponseBody = true; // default: store response body }); afterEach(() => { mockStoreMessages = false; + mockStoreSessionResponseBody = true; }); describe("storeSessionMessages", () => { @@ -160,6 +164,14 @@ describe("SessionManager - Redaction based on STORE_SESSION_MESSAGES", () => { }); describe("storeSessionResponse", () => { + it("should skip storing response body when STORE_SESSION_RESPONSE_BODY=false", async () => { + mockStoreSessionResponseBody = false; + + await SessionManager.storeSessionResponse("sess_disabled", '{"message":"hello"}', 1); + + expect(redisMock.setex).not.toHaveBeenCalled(); + }); + it("should store redacted JSON response when STORE_SESSION_MESSAGES=false", async () => { mockStoreMessages = false; const responseBody = { From a58444405f931a4b977e95dc8f74491b93d77fd4 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Sun, 8 Feb 2026 10:47:47 +0800 Subject: [PATCH 04/13] =?UTF-8?q?chore:=20=E6=9B=B4=E6=96=B0=E4=BC=9A?= =?UTF-8?q?=E8=AF=9D=E8=AF=A6=E6=83=85=E5=AD=98=E5=82=A8=E6=8F=90=E7=A4=BA?= =?UTF-8?q?=E5=B9=B6=E5=90=8C=E6=AD=A5=E9=83=A8=E7=BD=B2=E6=A8=A1=E6=9D=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- messages/en/dashboard.json | 4 ++-- messages/ja/dashboard.json | 4 ++-- messages/ru/dashboard.json | 4 ++-- messages/zh-CN/dashboard.json | 4 ++-- messages/zh-TW/dashboard.json | 4 ++-- scripts/deploy.ps1 | 1 + scripts/deploy.sh | 1 + 7 files changed, 12 insertions(+), 10 deletions(-) diff --git a/messages/en/dashboard.json b/messages/en/dashboard.json index 3ef7670e2..be09c08b2 100644 --- a/messages/en/dashboard.json +++ b/messages/en/dashboard.json @@ -493,7 +493,7 @@ "providers": "Providers", "models": "Models", "noDetailedData": "No detailed data available", - "storageTip": "No detailed data found. To view request details, please check if the environment variable STORE_SESSION_MESSAGES is set to true. Note: Enabling this increases Redis memory usage and may include sensitive information.", + "storageTip": "No detailed data found. Possible reasons: Redis is disabled/unavailable (REDIS_URL + ENABLE_RATE_LIMIT=true), the data expired (SESSION_TTL, default 300s), or response body storage is disabled (STORE_SESSION_RESPONSE_BODY=false, affects response body only). To store unredacted messages, set STORE_SESSION_MESSAGES=true.", "clientInfo": "Client Info", "requestHeaders": "Request Headers", "requestBody": "Request Body", @@ -571,7 +571,7 @@ "fetchFailed": "Fetch Failed", "unknownError": "Unknown Error", "storageNotEnabled": "Not Stored", - "storageNotEnabledHint": "Tip: Set environment variable STORE_SESSION_MESSAGES=true to enable messages storage" + "storageNotEnabledHint": "Tip: Check REDIS_URL and ENABLE_RATE_LIMIT=true (session details cache). To store unredacted messages, set STORE_SESSION_MESSAGES=true." }, "errors": { "copyFailed": "Copy Failed" diff --git a/messages/ja/dashboard.json b/messages/ja/dashboard.json index 0ac8af2fd..d330e3e7e 100644 --- a/messages/ja/dashboard.json +++ b/messages/ja/dashboard.json @@ -493,7 +493,7 @@ "providers": "プロバイダー", "models": "モデル", "noDetailedData": "詳細データなし", - "storageTip": "詳細データが見つかりません。リクエストの詳細を表示するには、環境変数 STORE_SESSION_MESSAGES が true に設定されているか確認してください。注意:有効にすると Redis のメモリ使用量が増加し、機密情報が含まれる可能性があります。", + "storageTip": "詳細データが見つかりません。原因の例:Redis が未設定/利用不可 (REDIS_URL + ENABLE_RATE_LIMIT=true)、データの期限切れ (SESSION_TTL、既定 300 秒)、または応答本文の保存を無効化 (STORE_SESSION_RESPONSE_BODY=false、応答本文のみ)。未マスクの messages を保存するには STORE_SESSION_MESSAGES=true を設定してください。", "clientInfo": "クライアント情報", "requestHeaders": "リクエストヘッダー", "requestBody": "リクエストボディ", @@ -571,7 +571,7 @@ "fetchFailed": "取得失敗", "unknownError": "不明なエラー", "storageNotEnabled": "未保存", - "storageNotEnabledHint": "ヒント: メッセージの保存を有効にするには、環境変数 STORE_SESSION_MESSAGES=true を設定してください" + "storageNotEnabledHint": "ヒント: REDIS_URL と ENABLE_RATE_LIMIT=true を確認してください (セッション詳細キャッシュ)。未マスクの messages を保存するには STORE_SESSION_MESSAGES=true を設定してください。" }, "errors": { "copyFailed": "コピー失敗" diff --git a/messages/ru/dashboard.json b/messages/ru/dashboard.json index a23506cfe..54923b324 100644 --- a/messages/ru/dashboard.json +++ b/messages/ru/dashboard.json @@ -493,7 +493,7 @@ "providers": "Поставщики", "models": "Модели", "noDetailedData": "Подробные данные отсутствуют", - "storageTip": "Подробные данные не найдены. Чтобы просмотреть детали запроса, проверьте, установлена ли переменная окружения STORE_SESSION_MESSAGES в значение true. Примечание: включение увеличит использование памяти Redis и может содержать конфиденциальную информацию.", + "storageTip": "Подробные данные не найдены. Возможные причины: Redis отключен/недоступен (REDIS_URL + ENABLE_RATE_LIMIT=true), данные истекли (SESSION_TTL, по умолчанию 300с), или сохранение тела ответа отключено (STORE_SESSION_RESPONSE_BODY=false, влияет только на тело ответа). Чтобы сохранять сообщения без маскировки, установите STORE_SESSION_MESSAGES=true.", "clientInfo": "Информация о клиенте", "requestHeaders": "Заголовки запроса", "requestBody": "Тело запроса", @@ -571,7 +571,7 @@ "fetchFailed": "Не удалось получить", "unknownError": "Неизвестная ошибка", "storageNotEnabled": "Не сохранено", - "storageNotEnabledHint": "Подсказка: установите переменную окружения STORE_SESSION_MESSAGES=true, чтобы включить сохранение сообщений" + "storageNotEnabledHint": "Подсказка: проверьте REDIS_URL и ENABLE_RATE_LIMIT=true (кэш деталей сессии). Чтобы сохранять сообщения без маскировки, установите STORE_SESSION_MESSAGES=true." }, "errors": { "copyFailed": "Не удалось скопировать" diff --git a/messages/zh-CN/dashboard.json b/messages/zh-CN/dashboard.json index 79381ffac..e83f55958 100644 --- a/messages/zh-CN/dashboard.json +++ b/messages/zh-CN/dashboard.json @@ -493,7 +493,7 @@ "providers": "供应商", "models": "模型", "noDetailedData": "暂无详细数据", - "storageTip": "未找到详细数据。如需查看请求详情,请检查环境变量 STORE_SESSION_MESSAGES 是否已设置为 true。注意:启用后会增加 Redis 内存使用,且可能包含敏感信息。", + "storageTip": "未找到详细数据。可能原因:Redis 未配置/不可用(REDIS_URL + ENABLE_RATE_LIMIT=true)、数据已过期(SESSION_TTL,默认 300 秒),或已禁用响应体存储(STORE_SESSION_RESPONSE_BODY=false,仅影响响应体)。如需保存未脱敏 messages,请设置 STORE_SESSION_MESSAGES=true。", "clientInfo": "客户端信息", "requestHeaders": "请求头", "requestBody": "请求体", @@ -571,7 +571,7 @@ "fetchFailed": "获取失败", "unknownError": "未知错误", "storageNotEnabled": "未存储", - "storageNotEnabledHint": "提示:请设置环境变量 STORE_SESSION_MESSAGES=true 以启用 messages 存储" + "storageNotEnabledHint": "提示:请检查 REDIS_URL 与 ENABLE_RATE_LIMIT=true(用于会话详情缓存);如需保存未脱敏 messages,请设置 STORE_SESSION_MESSAGES=true。" }, "errors": { "copyFailed": "复制失败" diff --git a/messages/zh-TW/dashboard.json b/messages/zh-TW/dashboard.json index fcf8179ae..37ed95570 100644 --- a/messages/zh-TW/dashboard.json +++ b/messages/zh-TW/dashboard.json @@ -493,7 +493,7 @@ "providers": "供應商", "models": "Model", "noDetailedData": "暫無詳細資料", - "storageTip": "未找到詳細資料。如需查看請求詳情,請檢查環境變數 STORE_SESSION_MESSAGES 是否已設定為 true。注意:啟用後會增加 Redis 記憶體使用,且可能包含敏感資訊。", + "storageTip": "未找到詳細資料。可能原因:Redis 未設定/不可用(REDIS_URL + ENABLE_RATE_LIMIT=true)、資料已過期(SESSION_TTL,預設 300 秒),或已停用回應本文儲存(STORE_SESSION_RESPONSE_BODY=false,僅影響回應本文)。如需儲存未脫敏的 messages,請設定 STORE_SESSION_MESSAGES=true。", "clientInfo": "用戶端資訊", "requestHeaders": "請求頭", "requestBody": "請求體", @@ -571,7 +571,7 @@ "fetchFailed": "取得失敗", "unknownError": "未知錯誤", "storageNotEnabled": "未儲存", - "storageNotEnabledHint": "提示:請設定環境變數 STORE_SESSION_MESSAGES=true 以啟用訊息儲存" + "storageNotEnabledHint": "提示:請檢查 REDIS_URL 與 ENABLE_RATE_LIMIT=true(用於 Session 詳情快取);如需儲存未脫敏的 messages,請設定 STORE_SESSION_MESSAGES=true。" }, "errors": { "copyFailed": "複製失敗" diff --git a/scripts/deploy.ps1 b/scripts/deploy.ps1 index a0a50d286..d7f1e41de 100644 --- a/scripts/deploy.ps1 +++ b/scripts/deploy.ps1 @@ -496,6 +496,7 @@ ENABLE_RATE_LIMIT=true # Session Configuration SESSION_TTL=300 STORE_SESSION_MESSAGES=false +STORE_SESSION_RESPONSE_BODY=true # Cookie Security ENABLE_SECURE_COOKIES=$secureCookies diff --git a/scripts/deploy.sh b/scripts/deploy.sh index ea39c350f..51e457a8e 100755 --- a/scripts/deploy.sh +++ b/scripts/deploy.sh @@ -578,6 +578,7 @@ ENABLE_RATE_LIMIT=true # Session Configuration SESSION_TTL=300 STORE_SESSION_MESSAGES=false +STORE_SESSION_RESPONSE_BODY=true # Cookie Security ENABLE_SECURE_COOKIES=${secure_cookies} From 6aff1745acbfc711e01876820d2c9de62e52b77c Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Sun, 8 Feb 2026 11:04:11 +0800 Subject: [PATCH 05/13] =?UTF-8?q?fix(proxy):=20=E8=A1=A5=E9=BD=90=E5=81=87?= =?UTF-8?q?200=E7=9A=84=20endpoint=20=E5=A4=B1=E8=B4=A5=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E4=B8=8E=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 19 +++++++++++++++++-- .../utils/upstream-error-detection.test.ts | 10 ++++++++++ src/lib/utils/upstream-error-detection.ts | 7 +------ 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 3d5d53d5c..13e99bb5b 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -139,10 +139,25 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( } catch (cbError) { logger.warn("[ResponseHandler] Failed to record fake-200 error in circuit breaker", { providerId: meta.providerId, + sessionId: session.sessionId ?? null, error: cbError, }); } + // endpoint 级熔断:与成功路径保持对称,避免“假 200”只影响 provider 而不影响 endpoint 健康度 + if (meta.endpointId != null) { + try { + const { recordEndpointFailure } = await import("@/lib/endpoint-circuit-breaker"); + await recordEndpointFailure(meta.endpointId, new Error(detected.reason)); + } catch (endpointError) { + logger.warn("[ResponseHandler] Failed to record endpoint failure (fake 200)", { + endpointId: meta.endpointId, + providerId: meta.providerId, + error: endpointError, + }); + } + } + // 记录到决策链(用于日志展示与 DB 持久化)。 // 注意:这里用 effectiveStatusCode(502)而不是 upstreamStatusCode(200), // 以便让内部链路明确显示这是一次失败(否则会被误读为成功)。 @@ -182,7 +197,7 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( }); } - // ⭐ 成功后绑定 session 到供应商(智能绑定策略) + // 成功后绑定 session 到供应商(智能绑定策略) if (session.sessionId) { const result = await SessionManager.updateSessionBindingSmart( session.sessionId, @@ -214,7 +229,7 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( }); } - // ⭐ 统一更新两个数据源(确保监控数据一致) + // 统一更新两个数据源(确保监控数据一致) void SessionManager.updateSessionProvider(session.sessionId, { providerId: meta.providerId, providerName: meta.providerName, diff --git a/src/lib/utils/upstream-error-detection.test.ts b/src/lib/utils/upstream-error-detection.test.ts index 985583d72..272d9bf96 100644 --- a/src/lib/utils/upstream-error-detection.test.ts +++ b/src/lib/utils/upstream-error-detection.test.ts @@ -21,6 +21,16 @@ describe("detectUpstreamErrorFromSseOrJsonText", () => { expect(res.isError).toBe(true); }); + test.each(['{"error":true}', '{"error":42}'])("纯 JSON:error 为非字符串类型也应视为错误(%s)", (body) => { + const res = detectUpstreamErrorFromSseOrJsonText(body); + expect(res.isError).toBe(true); + }); + + test("纯 JSON:JSON 数组不视为错误(避免误判)", () => { + const res = detectUpstreamErrorFromSseOrJsonText('[{"error":"something"}]'); + expect(res.isError).toBe(false); + }); + test("纯 JSON:error 为空字符串不视为错误", () => { const res = detectUpstreamErrorFromSseOrJsonText('{"error":""}'); expect(res.isError).toBe(false); diff --git a/src/lib/utils/upstream-error-detection.ts b/src/lib/utils/upstream-error-detection.ts index c59bdf513..15a511be7 100644 --- a/src/lib/utils/upstream-error-detection.ts +++ b/src/lib/utils/upstream-error-detection.ts @@ -101,12 +101,7 @@ function detectFromJsonObject( } if (rawJsonChars < options.maxJsonCharsForMessageCheck) { - const message = - typeof obj.message === "string" - ? obj.message - : isPlainRecord(obj.error) && typeof obj.error.message === "string" - ? obj.error.message - : null; + const message = typeof obj.message === "string" ? obj.message : null; // 注意:仅检查 message 字段本身,不扫描其它字段。 if (message && options.messageKeyword.test(message)) { From 89f05765540bca17ecafafa562abc343ba256375 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Sun, 8 Feb 2026 11:23:51 +0800 Subject: [PATCH 06/13] =?UTF-8?q?fix(proxy):=20=E5=8A=A0=E5=9B=BA=E5=81=87?= =?UTF-8?q?200=E6=A3=80=E6=B5=8B=E4=B8=80=E8=87=B4=E6=80=A7=E4=B8=8E?= =?UTF-8?q?=E8=84=B1=E6=95=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 33 ++++++++- src/app/v1/_lib/proxy/stream-finalization.ts | 16 ++-- .../utils/upstream-error-detection.test.ts | 20 ++++- src/lib/utils/upstream-error-detection.ts | 74 ++++++++++++++++--- 4 files changed, 122 insertions(+), 21 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 13e99bb5b..f489d55d8 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -114,12 +114,41 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( return { effectiveStatusCode, errorMessage }; } + // meta 由 Forwarder 在“拿到 upstream Response 的那一刻”记录,代表真正产生本次流的 provider。 + // 即使 session.provider 在之后被其它逻辑意外修改(极端情况),我们仍以 meta 为准更新: + // - provider/endpoint 熔断与统计 + // - session 智能绑定 + // 这样能避免把成功/失败记到错误的 provider 上。 + let providerForChain = provider; if (provider.id !== meta.providerId) { logger.warn("[ResponseHandler] Deferred streaming meta provider mismatch", { sessionId: session.sessionId ?? null, metaProviderId: meta.providerId, currentProviderId: provider.id, + canonicalProviderId: meta.providerId, }); + + // 尝试用 meta.providerId 找回正确的 Provider 对象,保证 providerChain 的审计数据一致 + try { + const providers = await session.getProvidersSnapshot(); + const resolved = providers.find((p) => p.id === meta.providerId); + if (resolved) { + providerForChain = resolved; + } else { + logger.warn("[ResponseHandler] Deferred streaming meta provider not found in snapshot", { + sessionId: session.sessionId ?? null, + metaProviderId: meta.providerId, + currentProviderId: provider.id, + }); + } + } catch (resolveError) { + logger.warn("[ResponseHandler] Failed to resolve meta provider from snapshot", { + sessionId: session.sessionId ?? null, + metaProviderId: meta.providerId, + currentProviderId: provider.id, + error: resolveError, + }); + } } if (detected.isError) { @@ -161,7 +190,7 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( // 记录到决策链(用于日志展示与 DB 持久化)。 // 注意:这里用 effectiveStatusCode(502)而不是 upstreamStatusCode(200), // 以便让内部链路明确显示这是一次失败(否则会被误读为成功)。 - session.addProviderToChain(provider, { + session.addProviderToChain(providerForChain, { endpointId: meta.endpointId, endpointUrl: meta.endpointUrl, reason: "retry_failed", @@ -240,7 +269,7 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( }); } - session.addProviderToChain(provider, { + session.addProviderToChain(providerForChain, { endpointId: meta.endpointId, endpointUrl: meta.endpointUrl, reason: meta.isFirstAttempt ? "request_success" : "retry_success", diff --git a/src/app/v1/_lib/proxy/stream-finalization.ts b/src/app/v1/_lib/proxy/stream-finalization.ts index 4a4766700..b32142734 100644 --- a/src/app/v1/_lib/proxy/stream-finalization.ts +++ b/src/app/v1/_lib/proxy/stream-finalization.ts @@ -10,7 +10,10 @@ import type { ProxySession } from "./session"; * - ResponseHandler:在流正常结束后,基于最终响应体做一次补充检查,然后再更新熔断/endpoint/会话绑定。 * * 说明: - * - 这里选择“把元信息挂到 session 上”而不是改动大量类型/函数签名,避免改动面过大; + * - 这里选择使用 WeakMap,而不是把字段挂到 session 上: + * - 避免污染 ProxySession 对象; + * - 更类型安全; + * - 元信息生命周期跟随 session 实例,消费后可立即清理。 * - 元信息是一次性的:消费后会被清空,避免跨请求污染。 */ export type DeferredStreamingFinalization = { @@ -26,26 +29,23 @@ export type DeferredStreamingFinalization = { upstreamStatusCode: number; }; -type SessionWithDeferred = ProxySession & { - deferredStreamingFinalization?: DeferredStreamingFinalization; -}; +const deferredMeta = new WeakMap(); export function setDeferredStreamingFinalization( session: ProxySession, meta: DeferredStreamingFinalization ): void { // Forwarder 在识别到 SSE 时调用:标记该请求需要在流结束后“二次结算”。 - (session as SessionWithDeferred).deferredStreamingFinalization = meta; + deferredMeta.set(session, meta); } export function consumeDeferredStreamingFinalization( session: ProxySession ): DeferredStreamingFinalization | null { - const s = session as SessionWithDeferred; - const meta = s.deferredStreamingFinalization ?? null; + const meta = deferredMeta.get(session) ?? null; if (meta) { // 只允许消费一次:避免重复结算(例如多个后台统计任务并行时)。 - s.deferredStreamingFinalization = undefined; + deferredMeta.delete(session); } return meta; } diff --git a/src/lib/utils/upstream-error-detection.test.ts b/src/lib/utils/upstream-error-detection.test.ts index 272d9bf96..742ea29f6 100644 --- a/src/lib/utils/upstream-error-detection.test.ts +++ b/src/lib/utils/upstream-error-detection.test.ts @@ -26,11 +26,29 @@ describe("detectUpstreamErrorFromSseOrJsonText", () => { expect(res.isError).toBe(true); }); - test("纯 JSON:JSON 数组不视为错误(避免误判)", () => { + test("JSON 数组输入不视为错误(目前不做解析)", () => { const res = detectUpstreamErrorFromSseOrJsonText('[{"error":"something"}]'); expect(res.isError).toBe(false); }); + test("reason 应对 Bearer token 做脱敏(避免写入日志/Redis/DB)", () => { + const res = detectUpstreamErrorFromSseOrJsonText('{"error":"Bearer abc.def_ghi"}'); + expect(res.isError).toBe(true); + if (res.isError) { + expect(res.reason).toContain("Bearer [REDACTED]"); + expect(res.reason).not.toContain("abc.def_ghi"); + } + }); + + test("reason 应对常见 API key 前缀做脱敏(避免写入日志/Redis/DB)", () => { + const res = detectUpstreamErrorFromSseOrJsonText('{"error":"sk-1234567890abcdef123456"}'); + expect(res.isError).toBe(true); + if (res.isError) { + expect(res.reason).toContain("[REDACTED_KEY]"); + expect(res.reason).not.toContain("sk-1234567890abcdef123456"); + } + }); + test("纯 JSON:error 为空字符串不视为错误", () => { const res = detectUpstreamErrorFromSseOrJsonText('{"error":""}'); expect(res.isError).toBe(false); diff --git a/src/lib/utils/upstream-error-detection.ts b/src/lib/utils/upstream-error-detection.ts index 15a511be7..88f1ffe0f 100644 --- a/src/lib/utils/upstream-error-detection.ts +++ b/src/lib/utils/upstream-error-detection.ts @@ -20,6 +20,7 @@ import { parseSSEData } from "@/lib/utils/sse"; * - 仅基于结构化字段做启发式判断:`error` 与 `message`; * - 不扫描模型生成的正文内容(例如 content/choices),避免把用户/模型自然语言里的 "error" 误判为上游错误; * - message 关键字检测仅对“小体积 JSON”启用,降低误判与性能开销。 + * - reason 字段会做脱敏与截断:避免把上游错误中可能包含的敏感信息写入日志/Redis/DB。 */ export type UpstreamErrorDetectionResult = | { isError: false } @@ -39,7 +40,8 @@ type DetectionOptions = { /** * message 关键字匹配规则(默认 /error/i)。 * - * 注意:该规则只用于检查 `message` 或 `error.message` 字段(字符串)。 + * 注意:该规则只用于检查 `message` 字段(字符串)。 + * `error.message` 属于更强信号:只要 `error` 非空(含对象形式),就会直接判定为错误。 */ messageKeyword?: RegExp; }; @@ -66,8 +68,44 @@ function hasNonEmptyValue(value: unknown): boolean { return true; } +function sanitizeErrorTextForReason(text: string): string { + // 注意:这里的目的不是“完美脱敏”,而是尽量降低上游错误信息中意外夹带敏感内容的风险。 + // 若后续发现更多敏感模式,可在不改变检测语义的前提下补充。 + let sanitized = text; + + // Bearer token + sanitized = sanitized.replace(/Bearer\s+[A-Za-z0-9._-]+/gi, "Bearer [REDACTED]"); + + // Common API key prefixes (OpenAI/Claude/Codex 等) + sanitized = sanitized.replace(/\b(?:sk|rk|pk)-[A-Za-z0-9_-]{16,}\b/giu, "[REDACTED_KEY]"); + sanitized = sanitized.replace(/\bAIza[0-9A-Za-z_-]{16,}\b/g, "[REDACTED_KEY]"); + + // JWT(base64url 三段) + sanitized = sanitized.replace( + /\beyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b/g, + "[JWT]" + ); + + // Email + sanitized = sanitized.replace( + /\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/g, + "[EMAIL]" + ); + + // 通用敏感键值(尽量覆盖常见写法) + sanitized = sanitized.replace( + /\b(password|token|secret|api[_-]?key)\b\s*[:=]\s*['"]?[^'"\s]+['"]?/gi, + "$1:***" + ); + + // 常见配置/凭证路径(避免把文件名/路径泄露到审计字段里) + sanitized = sanitized.replace(/\/[\w.-]+\.(?:env|ya?ml|json|conf|ini)/gi, "[PATH]"); + + return sanitized; +} + function truncateForReason(text: string, maxLen: number = 200): string { - const trimmed = text.trim(); + const trimmed = sanitizeErrorTextForReason(text).trim(); if (trimmed.length <= maxLen) return trimmed; return `${trimmed.slice(0, maxLen)}…`; } @@ -79,7 +117,7 @@ function detectFromJsonObject( ): UpstreamErrorDetectionResult { // 判定优先级: // 1) `error` 非空:直接判定为错误(强信号) - // 2) 小体积 JSON 下,`message` / `error.message` 命中关键字:判定为错误(弱信号,但能覆盖部分“错误只写在 message”场景) + // 2) 小体积 JSON 下,`message` 命中关键字:判定为错误(弱信号,但能覆盖部分“错误只写在 message”场景) const errorValue = obj.error; if (hasNonEmptyValue(errorValue)) { // 优先展示 string 或 error.message,避免把整个对象塞进 reason @@ -119,7 +157,7 @@ function detectFromJsonObject( * 用于“流式 SSE 已经结束后”的补充检查: * - 响应体为空:视为错误 * - JSON 里包含非空 error 字段:视为错误 - * - 小于 1000 字符的 JSON:若 message(或 error.message)包含 "error" 字样:视为错误 + * - 小于 1000 字符的 JSON:若 message 包含 "error" 字样:视为错误 * * 注意与限制: * - 该函数不负责判断 HTTP 状态码;调用方通常只在“上游返回 200 且 SSE 正常结束后”使用它。 @@ -141,8 +179,10 @@ export function detectUpstreamErrorFromSseOrJsonText( return { isError: true, reason: "上游返回 200 但响应体为空" }; } - // 情况 1:纯 JSON(上游可能 Content-Type 设置为 SSE,但实际上返回 JSON) - if (trimmed.startsWith("{") || trimmed.startsWith("[")) { + // 情况 1:纯 JSON(对象) + // 上游可能 Content-Type 设置为 SSE,但实际上返回 JSON;此处只处理对象格式({...}), + // 不处理数组([...])以避免误判(数组场景的语义差异较大,后续若确认需要再扩展)。 + if (trimmed.startsWith("{")) { try { const parsed = JSON.parse(trimmed) as unknown; if (isPlainRecord(parsed)) { @@ -157,6 +197,9 @@ export function detectUpstreamErrorFromSseOrJsonText( // 情况 2:SSE 文本。快速过滤:既无 "error" 也无 "message" key 时跳过解析 // 注意:这里用 key 形式的引号匹配,尽量避免模型正文里出现 error 造成的无谓解析。 // 代价:如果上游返回的并非标准 JSON key(极少见),这里可能漏检;但我们偏向保守与低误判。 + // + // 额外说明:这里刻意只匹配 `"error"` / `"message"`(含双引号), + // 若正文里出现被转义的 `\"error\"`(字符串内容),不会命中,这是为了避免误判。 if (!text.includes("\"error\"") && !text.includes("\"message\"")) { return { isError: false }; } @@ -165,11 +208,22 @@ export function detectUpstreamErrorFromSseOrJsonText( const events = parseSSEData(text); for (const evt of events) { if (!isPlainRecord(evt.data)) continue; + // 性能优化:只有在 message 是字符串、且“看起来足够小”时才需要精确计算 JSON 字符数。 + // 对大多数 SSE 事件(message 为对象、或没有 message),无需 JSON.stringify。 let chars = 0; - try { - chars = JSON.stringify(evt.data).length; - } catch { - // ignore + const errorValue = evt.data.error; + const messageValue = evt.data.message; + if (!hasNonEmptyValue(errorValue) && typeof messageValue === "string") { + if (messageValue.length >= merged.maxJsonCharsForMessageCheck) { + chars = merged.maxJsonCharsForMessageCheck; // >= 阈值即可跳过 message 关键字判定 + } else { + try { + chars = JSON.stringify(evt.data).length; + } catch { + // stringify 失败时回退为近似值(仍保持“仅小体积 JSON 才做 message 检测”的意图) + chars = messageValue.length; + } + } } const res = detectFromJsonObject(evt.data, chars, merged); From 1adaf178415cc75bebf51dd144fe518fb0a5a660 Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Sun, 8 Feb 2026 11:50:40 +0800 Subject: [PATCH 07/13] =?UTF-8?q?fix(proxy):=20=E6=B5=81=E5=BC=8F=E4=B8=AD?= =?UTF-8?q?=E6=96=AD=E6=8C=89=E5=A4=B1=E8=B4=A5=E7=BB=93=E7=AE=97=E5=B9=B6?= =?UTF-8?q?=E6=94=B9=E7=94=A8=E9=94=99=E8=AF=AF=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app/v1/_lib/proxy/response-handler.ts | 67 ++++++++++++++----- src/app/v1/_lib/proxy/stream-finalization.ts | 2 + .../utils/upstream-error-detection.test.ts | 16 +++-- src/lib/utils/upstream-error-detection.ts | 34 +++++++--- 4 files changed, 84 insertions(+), 35 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index f489d55d8..1fbbade4a 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -88,12 +88,14 @@ type FinalizeDeferredStreamingResult = { * - 如果流正常结束且未命中错误判定,则按成功结算并更新绑定/熔断/endpoint 成功率。 * * @param streamEndedNormally - 必须是 reader 读到 done=true 的“自然结束”;超时/中断等异常结束由其它逻辑处理。 + * @param clientAborted - 标记是否为客户端主动中断(用于内部状态码映射,避免把中断记为 200 completed) */ async function finalizeDeferredStreamingFinalizationIfNeeded( session: ProxySession, allContent: string, upstreamStatusCode: number, - streamEndedNormally: boolean + streamEndedNormally: boolean, + clientAborted: boolean ): Promise { const meta = consumeDeferredStreamingFinalization(session); const provider = session.provider; @@ -102,15 +104,26 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( ? detectUpstreamErrorFromSseOrJsonText(allContent) : ({ isError: false } as const); - // “假 200”统一映射为 502:既能让内部状态落到 error 分支, - // 也能让后续熔断/故障转移策略按“上游错误”处理。 - const effectiveStatusCode = detected.isError ? 502 : upstreamStatusCode; - const errorMessage = detected.isError ? detected.reason : null; + // “内部结算用”的状态码(不会改变客户端实际 HTTP 状态码)。 + // - 假 200:映射为 502,确保内部统计/熔断/会话绑定把它当作失败。 + // - 未自然结束:也应映射为失败(避免把中断/部分流误记为 200 completed)。 + let effectiveStatusCode: number; + let errorMessage: string | null; + if (detected.isError) { + effectiveStatusCode = 502; + errorMessage = detected.code; + } else if (!streamEndedNormally) { + effectiveStatusCode = clientAborted ? 499 : 502; + errorMessage = clientAborted ? "CLIENT_ABORTED" : "STREAM_ABORTED"; + } else { + effectiveStatusCode = upstreamStatusCode; + errorMessage = null; + } - // 未启用延迟结算 / provider 缺失 / 未自然结束: + // 未启用延迟结算 / provider 缺失: // - 只返回“内部状态码 + 错误原因”,由调用方写入统计; - // - 不在这里更新熔断/绑定(异常结束场景通常已有专门逻辑做 recordFailure/persistRequestFailure)。 - if (!meta || !provider || !streamEndedNormally) { + // - 不在这里更新熔断/绑定(meta 缺失意味着 Forwarder 没有启用延迟结算;provider 缺失意味着无法归因)。 + if (!meta || !provider) { return { effectiveStatusCode, errorMessage }; } @@ -151,20 +164,35 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( } } + // 未自然结束:不更新熔断/绑定(中断/超时等通常由其它逻辑处理),但要避免把它误记为 200 completed。 + if (!streamEndedNormally) { + session.addProviderToChain(providerForChain, { + endpointId: meta.endpointId, + endpointUrl: meta.endpointUrl, + reason: "system_error", + attemptNumber: meta.attemptNumber, + statusCode: effectiveStatusCode, + errorMessage: errorMessage ?? undefined, + }); + + return { effectiveStatusCode, errorMessage }; + } + if (detected.isError) { logger.warn("[ResponseHandler] SSE completed but body indicates error (fake 200)", { providerId: meta.providerId, providerName: meta.providerName, upstreamStatusCode: meta.upstreamStatusCode, effectiveStatusCode, - reason: detected.reason, + code: detected.code, + detail: detected.detail ?? null, }); // 计入熔断器:让后续请求能正确触发故障转移/熔断 try { // 动态导入:避免 proxy 模块与熔断器模块之间潜在的循环依赖。 const { recordFailure } = await import("@/lib/circuit-breaker"); - await recordFailure(meta.providerId, new Error(detected.reason)); + await recordFailure(meta.providerId, new Error(detected.code)); } catch (cbError) { logger.warn("[ResponseHandler] Failed to record fake-200 error in circuit breaker", { providerId: meta.providerId, @@ -177,7 +205,7 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( if (meta.endpointId != null) { try { const { recordEndpointFailure } = await import("@/lib/endpoint-circuit-breaker"); - await recordEndpointFailure(meta.endpointId, new Error(detected.reason)); + await recordEndpointFailure(meta.endpointId, new Error(detected.code)); } catch (endpointError) { logger.warn("[ResponseHandler] Failed to record endpoint failure (fake 200)", { endpointId: meta.endpointId, @@ -196,7 +224,7 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( reason: "retry_failed", attemptNumber: meta.attemptNumber, statusCode: effectiveStatusCode, - errorMessage: detected.reason, + errorMessage: detected.code, }); return { effectiveStatusCode, errorMessage }; @@ -827,6 +855,7 @@ export class ProxyResponseHandler { const flushed = decoder.decode(); if (flushed) chunks.push(flushed); const allContent = chunks.join(""); + const clientAborted = session.clientAbortSignal?.aborted ?? false; // 存储响应体到 Redis(5分钟过期) if (session.sessionId) { @@ -845,7 +874,8 @@ export class ProxyResponseHandler { session, allContent, statusCode, - streamEndedNormally + streamEndedNormally, + clientAborted ); await finalizeRequestStats( session, @@ -1026,13 +1056,15 @@ export class ProxyResponseHandler { const finalizeStream = async ( allContent: string, - streamEndedNormally: boolean + streamEndedNormally: boolean, + clientAborted: boolean ): Promise => { const finalized = await finalizeDeferredStreamingFinalizationIfNeeded( session, allContent, statusCode, - streamEndedNormally + streamEndedNormally, + clientAborted ); const effectiveStatusCode = finalized.effectiveStatusCode; const streamErrorMessage = finalized.errorMessage; @@ -1211,7 +1243,8 @@ export class ProxyResponseHandler { // ⭐ 流式读取完成:清除静默期计时器 clearIdleTimer(); const allContent = flushAndJoin(); - await finalizeStream(allContent, streamEndedNormally); + const clientAborted = session.clientAbortSignal?.aborted ?? false; + await finalizeStream(allContent, streamEndedNormally, clientAborted); } catch (error) { // 检测 AbortError 的来源:响应超时 vs 静默期超时 vs 客户端/上游中断 const err = error as Error; @@ -1336,7 +1369,7 @@ export class ProxyResponseHandler { }); try { const allContent = flushAndJoin(); - await finalizeStream(allContent, false); + await finalizeStream(allContent, false, true); } catch (finalizeError) { logger.error("ResponseHandler: Failed to finalize aborted stream response", { taskId, diff --git a/src/app/v1/_lib/proxy/stream-finalization.ts b/src/app/v1/_lib/proxy/stream-finalization.ts index b32142734..e107f6f1e 100644 --- a/src/app/v1/_lib/proxy/stream-finalization.ts +++ b/src/app/v1/_lib/proxy/stream-finalization.ts @@ -42,6 +42,8 @@ export function setDeferredStreamingFinalization( export function consumeDeferredStreamingFinalization( session: ProxySession ): DeferredStreamingFinalization | null { + // 备注:该函数内部无 await,JS 事件循环保证它是“原子”的(不会被并发打断)。 + // 即使多个后台任务先后调用,也只有第一次能拿到 meta,其余调用都会得到 null。 const meta = deferredMeta.get(session) ?? null; if (meta) { // 只允许消费一次:避免重复结算(例如多个后台统计任务并行时)。 diff --git a/src/lib/utils/upstream-error-detection.test.ts b/src/lib/utils/upstream-error-detection.test.ts index 742ea29f6..e14a4fa77 100644 --- a/src/lib/utils/upstream-error-detection.test.ts +++ b/src/lib/utils/upstream-error-detection.test.ts @@ -5,7 +5,7 @@ describe("detectUpstreamErrorFromSseOrJsonText", () => { test("空响应体视为错误", () => { expect(detectUpstreamErrorFromSseOrJsonText("")).toEqual({ isError: true, - reason: "上游返回 200 但响应体为空", + code: "FAKE_200_EMPTY_BODY", }); }); @@ -31,21 +31,23 @@ describe("detectUpstreamErrorFromSseOrJsonText", () => { expect(res.isError).toBe(false); }); - test("reason 应对 Bearer token 做脱敏(避免写入日志/Redis/DB)", () => { + test("detail 应对 Bearer token 做脱敏(避免泄露到日志/Redis/DB)", () => { const res = detectUpstreamErrorFromSseOrJsonText('{"error":"Bearer abc.def_ghi"}'); expect(res.isError).toBe(true); if (res.isError) { - expect(res.reason).toContain("Bearer [REDACTED]"); - expect(res.reason).not.toContain("abc.def_ghi"); + const detail = res.detail ?? ""; + expect(detail).toContain("Bearer [REDACTED]"); + expect(detail).not.toContain("abc.def_ghi"); } }); - test("reason 应对常见 API key 前缀做脱敏(避免写入日志/Redis/DB)", () => { + test("detail 应对常见 API key 前缀做脱敏(避免泄露到日志/Redis/DB)", () => { const res = detectUpstreamErrorFromSseOrJsonText('{"error":"sk-1234567890abcdef123456"}'); expect(res.isError).toBe(true); if (res.isError) { - expect(res.reason).toContain("[REDACTED_KEY]"); - expect(res.reason).not.toContain("sk-1234567890abcdef123456"); + const detail = res.detail ?? ""; + expect(detail).toContain("[REDACTED_KEY]"); + expect(detail).not.toContain("sk-1234567890abcdef123456"); } }); diff --git a/src/lib/utils/upstream-error-detection.ts b/src/lib/utils/upstream-error-detection.ts index 88f1ffe0f..95fbf00c9 100644 --- a/src/lib/utils/upstream-error-detection.ts +++ b/src/lib/utils/upstream-error-detection.ts @@ -20,13 +20,15 @@ import { parseSSEData } from "@/lib/utils/sse"; * - 仅基于结构化字段做启发式判断:`error` 与 `message`; * - 不扫描模型生成的正文内容(例如 content/choices),避免把用户/模型自然语言里的 "error" 误判为上游错误; * - message 关键字检测仅对“小体积 JSON”启用,降低误判与性能开销。 - * - reason 字段会做脱敏与截断:避免把上游错误中可能包含的敏感信息写入日志/Redis/DB。 + * - 返回的 `code` 是语言无关的错误码(便于写入 DB/监控/告警); + * - 返回的 `detail`(如有)会做脱敏与截断:用于日志排查,但不建议直接作为用户展示文案。 */ export type UpstreamErrorDetectionResult = | { isError: false } | { isError: true; - reason: string; + code: string; + detail?: string; }; type DetectionOptions = { @@ -49,6 +51,13 @@ type DetectionOptions = { const DEFAULT_MAX_JSON_CHARS_FOR_MESSAGE_CHECK = 1000; const DEFAULT_MESSAGE_KEYWORD = /error/i; +const FAKE_200_CODES = { + EMPTY_BODY: "FAKE_200_EMPTY_BODY", + JSON_ERROR_NON_EMPTY: "FAKE_200_JSON_ERROR_NON_EMPTY", + JSON_ERROR_MESSAGE_NON_EMPTY: "FAKE_200_JSON_ERROR_MESSAGE_NON_EMPTY", + JSON_MESSAGE_KEYWORD_MATCH: "FAKE_200_JSON_MESSAGE_KEYWORD_MATCH", +} as const; + function isPlainRecord(value: unknown): value is Record { return !!value && typeof value === "object" && !Array.isArray(value); } @@ -68,7 +77,7 @@ function hasNonEmptyValue(value: unknown): boolean { return true; } -function sanitizeErrorTextForReason(text: string): string { +function sanitizeErrorTextForDetail(text: string): string { // 注意:这里的目的不是“完美脱敏”,而是尽量降低上游错误信息中意外夹带敏感内容的风险。 // 若后续发现更多敏感模式,可在不改变检测语义的前提下补充。 let sanitized = text; @@ -104,8 +113,8 @@ function sanitizeErrorTextForReason(text: string): string { return sanitized; } -function truncateForReason(text: string, maxLen: number = 200): string { - const trimmed = sanitizeErrorTextForReason(text).trim(); +function truncateForDetail(text: string, maxLen: number = 200): string { + const trimmed = sanitizeErrorTextForDetail(text).trim(); if (trimmed.length <= maxLen) return trimmed; return `${trimmed.slice(0, maxLen)}…`; } @@ -120,22 +129,24 @@ function detectFromJsonObject( // 2) 小体积 JSON 下,`message` 命中关键字:判定为错误(弱信号,但能覆盖部分“错误只写在 message”场景) const errorValue = obj.error; if (hasNonEmptyValue(errorValue)) { - // 优先展示 string 或 error.message,避免把整个对象塞进 reason + // 优先展示 string 或 error.message,避免把整个对象塞进 detail if (typeof errorValue === "string") { return { isError: true, - reason: `上游返回 200 但 JSON.error 非空: ${truncateForReason(errorValue)}`, + code: FAKE_200_CODES.JSON_ERROR_NON_EMPTY, + detail: truncateForDetail(errorValue), }; } if (isPlainRecord(errorValue) && typeof errorValue.message === "string") { return { isError: true, - reason: `上游返回 200 但 JSON.error.message 非空: ${truncateForReason(errorValue.message)}`, + code: FAKE_200_CODES.JSON_ERROR_MESSAGE_NON_EMPTY, + detail: truncateForDetail(errorValue.message), }; } - return { isError: true, reason: "上游返回 200 但 JSON.error 非空" }; + return { isError: true, code: FAKE_200_CODES.JSON_ERROR_NON_EMPTY }; } if (rawJsonChars < options.maxJsonCharsForMessageCheck) { @@ -145,7 +156,8 @@ function detectFromJsonObject( if (message && options.messageKeyword.test(message)) { return { isError: true, - reason: `上游返回 200 但 JSON.message 命中关键字: ${truncateForReason(message)}`, + code: FAKE_200_CODES.JSON_MESSAGE_KEYWORD_MATCH, + detail: truncateForDetail(message), }; } } @@ -176,7 +188,7 @@ export function detectUpstreamErrorFromSseOrJsonText( const trimmed = text.trim(); if (!trimmed) { - return { isError: true, reason: "上游返回 200 但响应体为空" }; + return { isError: true, code: FAKE_200_CODES.EMPTY_BODY }; } // 情况 1:纯 JSON(对象) From d47cc50b5d265f1b4164c167a6116b234729d0ec Mon Sep 17 00:00:00 2001 From: tesgth032 Date: Sun, 8 Feb 2026 12:23:20 +0800 Subject: [PATCH 08/13] =?UTF-8?q?fix(ui):=20=E5=81=87200=E6=B5=81=E5=BC=8F?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E6=8F=90=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- messages/en/dashboard.json | 1 + messages/ja/dashboard.json | 1 + messages/ru/dashboard.json | 1 + messages/zh-CN/dashboard.json | 1 + messages/zh-TW/dashboard.json | 1 + .../_components/error-details-dialog.test.tsx | 16 ++++++++++++++++ .../components/SummaryTab.tsx | 10 ++++++++++ 7 files changed, 31 insertions(+) diff --git a/messages/en/dashboard.json b/messages/en/dashboard.json index be09c08b2..d30cd49d8 100644 --- a/messages/en/dashboard.json +++ b/messages/en/dashboard.json @@ -243,6 +243,7 @@ "billingRedirected": "billing: redirected" }, "errorMessage": "Error Message", + "fake200ForwardedNotice": "Note: For streaming requests, this failure may be detected only after the stream ends; the response content may already have been forwarded to the client.", "filteredProviders": "Filtered Providers", "providerChain": { "title": "Provider Decision Chain Timeline", diff --git a/messages/ja/dashboard.json b/messages/ja/dashboard.json index d330e3e7e..c73664cf5 100644 --- a/messages/ja/dashboard.json +++ b/messages/ja/dashboard.json @@ -243,6 +243,7 @@ "billingRedirected": "課金: 実際" }, "errorMessage": "エラーメッセージ", + "fake200ForwardedNotice": "注意:ストリーミング要求では、失敗判定がストリーム終了後になる場合があります。応答内容は既にクライアントへ転送されている可能性があります。", "filteredProviders": "フィルタされたプロバイダー", "providerChain": { "title": "プロバイダー決定チェーンタイムライン", diff --git a/messages/ru/dashboard.json b/messages/ru/dashboard.json index 54923b324..0a0dc47fc 100644 --- a/messages/ru/dashboard.json +++ b/messages/ru/dashboard.json @@ -243,6 +243,7 @@ "billingRedirected": "оплата: факт." }, "errorMessage": "Сообщение об ошибке", + "fake200ForwardedNotice": "Примечание: для потоковых запросов эта ошибка может быть обнаружена только после завершения потока; содержимое ответа могло уже быть передано клиенту.", "filteredProviders": "Отфильтрованные поставщики", "providerChain": { "title": "Хронология цепочки решений поставщика", diff --git a/messages/zh-CN/dashboard.json b/messages/zh-CN/dashboard.json index e83f55958..21dbf23c9 100644 --- a/messages/zh-CN/dashboard.json +++ b/messages/zh-CN/dashboard.json @@ -243,6 +243,7 @@ "billingRedirected": "计费: 实际" }, "errorMessage": "错误信息", + "fake200ForwardedNotice": "提示:对于流式请求,该失败可能在流结束后才被识别;响应内容可能已原样透传给客户端。", "filteredProviders": "被过滤的供应商", "providerChain": { "title": "供应商决策链时间线", diff --git a/messages/zh-TW/dashboard.json b/messages/zh-TW/dashboard.json index 37ed95570..75eda170c 100644 --- a/messages/zh-TW/dashboard.json +++ b/messages/zh-TW/dashboard.json @@ -243,6 +243,7 @@ "billingRedirected": "計費: 實際" }, "errorMessage": "錯誤訊息", + "fake200ForwardedNotice": "提示:對於串流請求,此失敗可能在串流結束後才被識別;回應內容可能已原樣透傳給用戶端。", "filteredProviders": "被過濾的供應商", "providerChain": { "title": "供應商決策鏈時間軸", diff --git a/src/app/[locale]/dashboard/logs/_components/error-details-dialog.test.tsx b/src/app/[locale]/dashboard/logs/_components/error-details-dialog.test.tsx index fd1f878b8..9cb749975 100644 --- a/src/app/[locale]/dashboard/logs/_components/error-details-dialog.test.tsx +++ b/src/app/[locale]/dashboard/logs/_components/error-details-dialog.test.tsx @@ -252,6 +252,7 @@ const messages = { default: "No error", }, errorMessage: "Error message", + fake200ForwardedNotice: "Note: detected after stream end; payload may have been forwarded", viewDetails: "View details", filteredProviders: "Filtered providers", providerChain: { @@ -325,6 +326,21 @@ function parseHtml(html: string) { } describe("error-details-dialog layout", () => { + test("renders fake-200 forwarded notice when errorMessage is a FAKE_200_* code", () => { + const html = renderWithIntl( + + ); + + expect(html).toContain("FAKE_200_EMPTY_BODY"); + expect(html).toContain("Note: detected after stream end; payload may have been forwarded"); + }); + test("renders special settings section when specialSettings exists", () => { const html = renderWithIntl( 0 ? JSON.stringify(specialSettings, null, 2) : null; + const isFake200PostStreamFailure = + typeof errorMessage === "string" && errorMessage.startsWith("FAKE_200_"); return (
@@ -423,6 +426,13 @@ export function SummaryTab({

{errorMessage.length > 200 ? `${errorMessage.slice(0, 200)}...` : errorMessage}

+ {/* 注意:假 200 检测发生在 SSE 流式结束后;此时内容已可能透传给客户端,因此需要提示用户避免误解。 */} + {isFake200PostStreamFailure && ( +
+
+ )} {errorMessage.length > 200 && onViewLogicTrace && (