From 084be30245e3b6dc7c591bf29173f729ee2ef7dd Mon Sep 17 00:00:00 2001 From: ding113 Date: Sun, 15 Feb 2026 12:11:38 +0800 Subject: [PATCH 01/10] feat(observability): integrate Langfuse for enterprise-grade LLM request tracing Add optional Langfuse observability integration that auto-enables when LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY env vars are set. Traces every proxy request with full context: provider chain, model, usage metrics, cost, TTFB, headers (auth redacted), special settings, and error details. Built on OpenTelemetry via @langfuse/otel with async non-blocking fire-and-forget semantics and zero overhead when disabled. Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com> --- .env.example | 10 + package.json | 4 + src/app/v1/_lib/proxy-handler.ts | 26 +- src/instrumentation.ts | 19 ++ src/lib/config/env.schema.ts | 7 + src/lib/langfuse/index.ts | 92 ++++++ src/lib/langfuse/trace-proxy-request.ts | 218 +++++++++++++ tests/unit/langfuse/langfuse-trace.test.ts | 339 +++++++++++++++++++++ 8 files changed, 714 insertions(+), 1 deletion(-) create mode 100644 src/lib/langfuse/index.ts create mode 100644 src/lib/langfuse/trace-proxy-request.ts create mode 100644 tests/unit/langfuse/langfuse-trace.test.ts diff --git a/.env.example b/.env.example index 5e2c5031e..72ea5933c 100644 --- a/.env.example +++ b/.env.example @@ -128,6 +128,16 @@ FETCH_HEADERS_TIMEOUT=600000 FETCH_BODY_TIMEOUT=600000 MAX_RETRY_ATTEMPTS_DEFAULT=2 # 单供应商最大尝试次数(含首次调用),范围 1-10,留空使用默认值 2 +# Langfuse Observability (optional, auto-enabled when keys are set) +# 功能说明:企业级 LLM 可观测性集成,自动追踪所有代理请求的完整生命周期 +# - 配置 PUBLIC_KEY 和 SECRET_KEY 后自动启用 +# - 支持 Langfuse Cloud 和自托管实例 +LANGFUSE_PUBLIC_KEY= # Langfuse project public key (pk-lf-...) +LANGFUSE_SECRET_KEY= # Langfuse project secret key (sk-lf-...) +LANGFUSE_BASE_URL=https://cloud.langfuse.com # Langfuse server URL (self-hosted or cloud) +LANGFUSE_SAMPLE_RATE=1.0 # Trace sampling rate (0.0-1.0, default: 1.0 = 100%) +LANGFUSE_DEBUG=false # Enable Langfuse debug logging + # 智能探测配置 # 功能说明:当熔断器处于 OPEN 状态时,定期探测供应商以实现更快恢复 # - ENABLE_SMART_PROBING:是否启用智能探测(默认:false) diff --git a/package.json b/package.json index 6411d827c..fb7d0914c 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,11 @@ "@hono/zod-openapi": "^1", "@hookform/resolvers": "^5", "@iarna/toml": "^2.2.5", + "@langfuse/client": "^4.6.1", + "@langfuse/otel": "^4.6.1", + "@langfuse/tracing": "^4.6.1", "@lobehub/icons": "^2", + "@opentelemetry/sdk-node": "^0.212.0", "@radix-ui/react-alert-dialog": "^1", "@radix-ui/react-avatar": "^1", "@radix-ui/react-checkbox": "^1", diff --git a/src/app/v1/_lib/proxy-handler.ts b/src/app/v1/_lib/proxy-handler.ts index 5257ef670..be610333f 100644 --- a/src/app/v1/_lib/proxy-handler.ts +++ b/src/app/v1/_lib/proxy-handler.ts @@ -80,7 +80,31 @@ export async function handleProxyRequest(c: Context): Promise { const response = await ProxyForwarder.send(session); const handled = await ProxyResponseHandler.dispatch(session, response); - return await attachSessionIdToErrorResponse(session.sessionId, handled); + const finalResponse = await attachSessionIdToErrorResponse(session.sessionId, handled); + + // Fire Langfuse trace asynchronously (non-blocking) + if (process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY && session) { + const traceSession = session; + const traceStatusCode = handled.status; + const isSSE = (handled.headers.get("content-type") || "").includes("text/event-stream"); + void import("@/lib/langfuse/trace-proxy-request") + .then(({ traceProxyRequest }) => { + void traceProxyRequest({ + session: traceSession, + response: handled, + durationMs: Date.now() - traceSession.startTime, + statusCode: traceStatusCode, + isStreaming: isSSE, + }); + }) + .catch((err) => { + logger.warn("[ProxyHandler] Langfuse trace failed", { + error: err instanceof Error ? err.message : String(err), + }); + }); + } + + return finalResponse; } catch (error) { logger.error("Proxy handler error:", error); if (session) { diff --git a/src/instrumentation.ts b/src/instrumentation.ts index a3303e51d..85ddfd3ba 100644 --- a/src/instrumentation.ts +++ b/src/instrumentation.ts @@ -140,6 +140,15 @@ function warmupApiKeyVacuumFilter(): void { export async function register() { // 仅在服务器端执行 if (process.env.NEXT_RUNTIME === "nodejs") { + // Initialize Langfuse observability (no-op if env vars not set) + try { + const { initLangfuse } = await import("@/lib/langfuse"); + await initLangfuse(); + } catch (error) { + logger.warn("[Instrumentation] Langfuse initialization failed (non-critical)", { + error: error instanceof Error ? error.message : String(error), + }); + } // Skip initialization in CI environment (no DB connection needed) if (process.env.CI === "true") { logger.warn( @@ -216,6 +225,16 @@ export async function register() { }); } + // Flush Langfuse pending spans + try { + const { shutdownLangfuse } = await import("@/lib/langfuse"); + await shutdownLangfuse(); + } catch (error) { + logger.warn("[Instrumentation] Failed to shutdown Langfuse", { + error: error instanceof Error ? error.message : String(error), + }); + } + // 尽力将 message_request 的异步批量更新刷入数据库(避免终止时丢失尾部日志) try { const { stopMessageRequestWriteBuffer } = await import( diff --git a/src/lib/config/env.schema.ts b/src/lib/config/env.schema.ts index a845a0db5..b7dacd738 100644 --- a/src/lib/config/env.schema.ts +++ b/src/lib/config/env.schema.ts @@ -127,6 +127,13 @@ export const EnvSchema = z.object({ FETCH_BODY_TIMEOUT: z.coerce.number().default(600_000), // 请求/响应体传输超时(默认 600 秒) FETCH_HEADERS_TIMEOUT: z.coerce.number().default(600_000), // 响应头接收超时(默认 600 秒) FETCH_CONNECT_TIMEOUT: z.coerce.number().default(30000), // TCP 连接建立超时(默认 30 秒) + + // Langfuse Observability (optional, auto-enabled when keys are set) + LANGFUSE_PUBLIC_KEY: z.string().optional(), + LANGFUSE_SECRET_KEY: z.string().optional(), + LANGFUSE_BASE_URL: z.string().default("https://cloud.langfuse.com"), + LANGFUSE_SAMPLE_RATE: z.coerce.number().min(0).max(1).default(1.0), + LANGFUSE_DEBUG: z.string().default("false").transform(booleanTransform), }); /** diff --git a/src/lib/langfuse/index.ts b/src/lib/langfuse/index.ts new file mode 100644 index 000000000..56889ed36 --- /dev/null +++ b/src/lib/langfuse/index.ts @@ -0,0 +1,92 @@ +import type { LangfuseSpanProcessor } from "@langfuse/otel"; + +import type { NodeSDK } from "@opentelemetry/sdk-node"; +import { logger } from "@/lib/logger"; + +let sdk: NodeSDK | null = null; +let spanProcessor: LangfuseSpanProcessor | null = null; +let initialized = false; + +export function isLangfuseEnabled(): boolean { + return !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY); +} + +/** + * Initialize Langfuse OpenTelemetry SDK. + * Must be called early in the process (instrumentation.ts register()). + * No-op if LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY are not set. + */ +export async function initLangfuse(): Promise { + if (initialized || !isLangfuseEnabled()) { + return; + } + + try { + const { NodeSDK: OtelNodeSDK } = await import("@opentelemetry/sdk-node"); + const { LangfuseSpanProcessor: LfSpanProcessor } = await import("@langfuse/otel"); + + const sampleRate = Number.parseFloat(process.env.LANGFUSE_SAMPLE_RATE || "1.0"); + + spanProcessor = new LfSpanProcessor({ + publicKey: process.env.LANGFUSE_PUBLIC_KEY, + secretKey: process.env.LANGFUSE_SECRET_KEY, + baseUrl: process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com", + // Only export spans from langfuse-sdk scope (avoid noise from other OTel instrumentations) + shouldExportSpan: ({ otelSpan }) => otelSpan.instrumentationScope.name === "langfuse-sdk", + }); + + const samplerConfig = + sampleRate < 1.0 + ? await (async () => { + const { TraceIdRatioBasedSampler } = await import("@opentelemetry/sdk-trace-base"); + return { sampler: new TraceIdRatioBasedSampler(sampleRate) }; + })() + : {}; + + sdk = new OtelNodeSDK({ + spanProcessors: [spanProcessor], + ...samplerConfig, + }); + + sdk.start(); + initialized = true; + + logger.info("[Langfuse] Observability initialized", { + baseUrl: process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com", + sampleRate, + debug: process.env.LANGFUSE_DEBUG === "true", + }); + + if (process.env.LANGFUSE_DEBUG === "true") { + const { configureGlobalLogger, LogLevel } = await import("@langfuse/core"); + configureGlobalLogger({ level: LogLevel.DEBUG }); + } + } catch (error) { + logger.error("[Langfuse] Failed to initialize", { + error: error instanceof Error ? error.message : String(error), + }); + } +} + +/** + * Flush pending spans and shut down the SDK. + * Called during graceful shutdown (SIGTERM/SIGINT). + */ +export async function shutdownLangfuse(): Promise { + if (!initialized || !spanProcessor) { + return; + } + + try { + await spanProcessor.forceFlush(); + if (sdk) { + await sdk.shutdown(); + } + initialized = false; + logger.info("[Langfuse] Shutdown complete"); + } catch (error) { + logger.warn("[Langfuse] Shutdown error", { + error: error instanceof Error ? error.message : String(error), + }); + } +} diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts new file mode 100644 index 000000000..a7fccde55 --- /dev/null +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -0,0 +1,218 @@ +import type { ProxySession } from "@/app/v1/_lib/proxy/session"; +import { isLangfuseEnabled } from "@/lib/langfuse/index"; +import { logger } from "@/lib/logger"; + +// Auth-sensitive header names to redact +const REDACTED_HEADERS = new Set([ + "x-api-key", + "authorization", + "x-goog-api-key", + "anthropic-api-key", + "cookie", + "set-cookie", +]); + +function sanitizeHeaders(headers: Headers): Record { + const result: Record = {}; + headers.forEach((value, key) => { + result[key] = REDACTED_HEADERS.has(key.toLowerCase()) ? "[REDACTED]" : value; + }); + return result; +} + +function buildRequestBodySummary(session: ProxySession): Record { + const msg = session.request.message as Record; + return { + model: session.request.model, + messageCount: session.getMessagesLength(), + hasSystemPrompt: Array.isArray(msg.system) && msg.system.length > 0, + toolsCount: Array.isArray(msg.tools) ? msg.tools.length : 0, + stream: msg.stream === true, + maxTokens: typeof msg.max_tokens === "number" ? msg.max_tokens : undefined, + temperature: typeof msg.temperature === "number" ? msg.temperature : undefined, + }; +} + +function getStatusCategory(statusCode: number): string { + if (statusCode >= 200 && statusCode < 300) return "2xx"; + if (statusCode >= 400 && statusCode < 500) return "4xx"; + if (statusCode >= 500) return "5xx"; + return `${Math.floor(statusCode / 100)}xx`; +} + +export interface TraceContext { + session: ProxySession; + response: Response; + durationMs: number; + statusCode: number; + responseText?: string; + isStreaming: boolean; + sseEventCount?: number; + errorMessage?: string; + usageMetrics?: { + input_tokens?: number; + output_tokens?: number; + cache_creation_input_tokens?: number; + cache_read_input_tokens?: number; + } | null; + costUsd?: string; +} + +/** + * Send a trace to Langfuse for a completed proxy request. + * Fully async and non-blocking. Errors are caught and logged. + */ +export async function traceProxyRequest(ctx: TraceContext): Promise { + if (!isLangfuseEnabled()) { + return; + } + + try { + const { startObservation, propagateAttributes } = await import("@langfuse/tracing"); + + const { session, response, durationMs, statusCode, isStreaming } = ctx; + const provider = session.provider; + const messageContext = session.messageContext; + + // Build tags + const tags: string[] = []; + if (provider?.providerType) tags.push(provider.providerType); + if (session.originalFormat) tags.push(session.originalFormat); + tags.push(getStatusCategory(statusCode)); + + // Build trace-level metadata (propagateAttributes requires all values to be strings) + const traceMetadata: Record = { + keyName: messageContext?.key?.name ?? "", + endpoint: session.getEndpoint() ?? "", + method: session.method, + clientFormat: session.originalFormat, + userAgent: session.userAgent ?? "", + requestSequence: String(session.getRequestSequence()), + }; + + // Build generation metadata + const generationMetadata: Record = { + providerId: provider?.id, + providerName: provider?.name, + providerType: provider?.providerType, + providerChain: session.getProviderChain(), + specialSettings: session.getSpecialSettings(), + modelRedirected: session.isModelRedirected(), + originalModel: session.isModelRedirected() ? session.getOriginalModel() : undefined, + isStreaming, + statusCode, + durationMs, + ttfbMs: session.ttfbMs, + cacheTtlApplied: session.getCacheTtlResolved(), + context1mApplied: session.getContext1mApplied(), + errorMessage: ctx.errorMessage, + requestHeaders: sanitizeHeaders(session.headers), + responseHeaders: sanitizeHeaders(response.headers), + requestBodySummary: buildRequestBodySummary(session), + }; + + // Add response body summary + if (isStreaming) { + generationMetadata.sseEventCount = ctx.sseEventCount; + } else if (ctx.responseText) { + generationMetadata.responseBodySummary = + ctx.responseText.length > 2000 + ? `${ctx.responseText.substring(0, 2000)}...[truncated]` + : ctx.responseText; + } + + // Build usage details for Langfuse generation + const usageDetails: Record | undefined = ctx.usageMetrics + ? { + ...(ctx.usageMetrics.input_tokens != null + ? { input: ctx.usageMetrics.input_tokens } + : {}), + ...(ctx.usageMetrics.output_tokens != null + ? { output: ctx.usageMetrics.output_tokens } + : {}), + ...(ctx.usageMetrics.cache_read_input_tokens != null + ? { cacheRead: ctx.usageMetrics.cache_read_input_tokens } + : {}), + ...(ctx.usageMetrics.cache_creation_input_tokens != null + ? { cacheCreation: ctx.usageMetrics.cache_creation_input_tokens } + : {}), + } + : undefined; + + // Build cost details + const costDetails: Record | undefined = + ctx.costUsd && Number.parseFloat(ctx.costUsd) > 0 + ? { totalUsd: Number.parseFloat(ctx.costUsd) } + : undefined; + + // Create the root trace span + const rootSpan = startObservation("proxy-request", { + input: traceMetadata, + output: { + statusCode, + durationMs, + model: session.getCurrentModel(), + hasUsage: !!ctx.usageMetrics, + }, + }); + + // Propagate trace attributes + await propagateAttributes( + { + userId: messageContext?.user?.id ? String(messageContext.user.id) : undefined, + sessionId: session.sessionId ?? undefined, + tags, + metadata: traceMetadata, + traceName: `${session.method} ${session.getEndpoint() ?? "/"}`, + }, + async () => { + // Create the LLM generation observation + const generation = rootSpan.startObservation( + "llm-call", + { + model: session.getCurrentModel() ?? undefined, + input: buildRequestBodySummary(session), + output: isStreaming + ? { streaming: true, sseEventCount: ctx.sseEventCount } + : ctx.responseText + ? tryParseJsonSafe(ctx.responseText) + : { statusCode }, + ...(usageDetails && Object.keys(usageDetails).length > 0 ? { usageDetails } : {}), + ...(costDetails ? { costDetails } : {}), + metadata: generationMetadata, + }, + { asType: "generation" } + ); + + // Set TTFB as completionStartTime + if (session.ttfbMs != null) { + generation.update({ + completionStartTime: new Date(session.startTime + session.ttfbMs), + }); + } + + generation.end(); + } + ); + + rootSpan.end(); + } catch (error) { + logger.warn("[Langfuse] Failed to trace proxy request", { + error: error instanceof Error ? error.message : String(error), + }); + } +} + +function tryParseJsonSafe(text: string): unknown { + try { + const parsed = JSON.parse(text); + // Truncate large outputs to avoid excessive data + const str = JSON.stringify(parsed); + if (str.length > 4000) { + return { _truncated: true, _length: str.length, _preview: str.substring(0, 2000) }; + } + return parsed; + } catch { + return text.length > 2000 ? `${text.substring(0, 2000)}...[truncated]` : text; + } +} diff --git a/tests/unit/langfuse/langfuse-trace.test.ts b/tests/unit/langfuse/langfuse-trace.test.ts new file mode 100644 index 000000000..c55936569 --- /dev/null +++ b/tests/unit/langfuse/langfuse-trace.test.ts @@ -0,0 +1,339 @@ +import { describe, expect, test, vi, beforeEach, afterEach } from "vitest"; + +// Mock the langfuse modules at the top level +const mockStartObservation = vi.fn(); +const mockPropagateAttributes = vi.fn(); +const mockSpanEnd = vi.fn(); +const mockGenerationEnd = vi.fn(); +const mockGenerationUpdate = vi.fn(); + +const mockGeneration: any = { + update: (...args: unknown[]) => { + mockGenerationUpdate(...args); + return mockGeneration; + }, + end: mockGenerationEnd, +}; + +const mockRootSpan = { + startObservation: vi.fn().mockReturnValue(mockGeneration), + end: mockSpanEnd, +}; + +vi.mock("@langfuse/tracing", () => ({ + startObservation: (...args: unknown[]) => { + mockStartObservation(...args); + return mockRootSpan; + }, + propagateAttributes: async (attrs: unknown, fn: () => Promise) => { + mockPropagateAttributes(attrs); + await fn(); + }, +})); + +vi.mock("@/lib/logger", () => ({ + logger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, +})); + +let langfuseEnabled = true; +vi.mock("@/lib/langfuse/index", () => ({ + isLangfuseEnabled: () => langfuseEnabled, +})); + +function createMockSession(overrides: Record = {}) { + return { + startTime: Date.now() - 500, + method: "POST", + headers: new Headers({ + "content-type": "application/json", + "x-api-key": "test-mock-key-not-real", + "user-agent": "claude-code/1.0", + }), + request: { + message: { + model: "claude-sonnet-4-20250514", + messages: [{ role: "user", content: "Hello" }], + stream: true, + max_tokens: 4096, + tools: [{ name: "tool1" }], + }, + model: "claude-sonnet-4-20250514", + }, + originalFormat: "claude", + userAgent: "claude-code/1.0", + sessionId: "sess_abc12345_def67890", + provider: { + id: 1, + name: "anthropic-main", + providerType: "claude", + }, + messageContext: { + id: 42, + user: { id: 7, name: "testuser" }, + key: { name: "default-key" }, + }, + ttfbMs: 200, + getEndpoint: () => "/v1/messages", + getRequestSequence: () => 3, + getMessagesLength: () => 1, + getCurrentModel: () => "claude-sonnet-4-20250514", + getOriginalModel: () => "claude-sonnet-4-20250514", + isModelRedirected: () => false, + getProviderChain: () => [ + { + id: 1, + name: "anthropic-main", + providerType: "claude", + reason: "initial_selection", + timestamp: Date.now(), + }, + ], + getSpecialSettings: () => null, + getCacheTtlResolved: () => null, + getContext1mApplied: () => false, + ...overrides, + } as any; +} + +describe("traceProxyRequest", () => { + beforeEach(() => { + vi.clearAllMocks(); + langfuseEnabled = true; + // Re-setup return values after clearAllMocks + mockRootSpan.startObservation.mockReturnValue(mockGeneration); + }); + + test("should not trace when Langfuse is disabled", async () => { + langfuseEnabled = false; + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + response: new Response("ok", { status: 200 }), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + expect(mockStartObservation).not.toHaveBeenCalled(); + }); + + test("should trace when Langfuse is enabled", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + response: new Response("ok", { status: 200 }), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + expect(mockStartObservation).toHaveBeenCalledWith( + "proxy-request", + expect.objectContaining({ + input: expect.objectContaining({ + endpoint: "/v1/messages", + method: "POST", + clientFormat: "claude", + }), + }) + ); + + expect(mockRootSpan.startObservation).toHaveBeenCalledWith( + "llm-call", + expect.objectContaining({ + model: "claude-sonnet-4-20250514", + }), + { asType: "generation" } + ); + + expect(mockSpanEnd).toHaveBeenCalled(); + expect(mockGenerationEnd).toHaveBeenCalled(); + }); + + test("should redact sensitive headers", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + response: new Response("ok", { status: 200 }), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const generationCall = mockRootSpan.startObservation.mock.calls[0]; + const metadata = generationCall[1].metadata; + expect(metadata.requestHeaders["x-api-key"]).toBe("[REDACTED]"); + expect(metadata.requestHeaders["content-type"]).toBe("application/json"); + }); + + test("should propagate userId and sessionId", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + response: new Response("ok", { status: 200 }), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + expect(mockPropagateAttributes).toHaveBeenCalledWith( + expect.objectContaining({ + userId: "7", + sessionId: "sess_abc12345_def67890", + tags: expect.arrayContaining(["claude", "2xx"]), + }) + ); + }); + + test("should include usage details when provided", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + response: new Response("ok", { status: 200 }), + durationMs: 500, + statusCode: 200, + isStreaming: false, + usageMetrics: { + input_tokens: 100, + output_tokens: 50, + cache_read_input_tokens: 20, + }, + costUsd: "0.0015", + }); + + const generationCall = mockRootSpan.startObservation.mock.calls[0]; + expect(generationCall[1].usageDetails).toEqual({ + input: 100, + output: 50, + cacheRead: 20, + }); + expect(generationCall[1].costDetails).toEqual({ + totalUsd: 0.0015, + }); + }); + + test("should handle model redirect metadata", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession({ + isModelRedirected: () => true, + getOriginalModel: () => "claude-sonnet-4-20250514", + getCurrentModel: () => "glm-4", + request: { + message: { model: "glm-4", messages: [] }, + model: "glm-4", + }, + }), + response: new Response("ok", { status: 200 }), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const generationCall = mockRootSpan.startObservation.mock.calls[0]; + expect(generationCall[1].metadata.modelRedirected).toBe(true); + expect(generationCall[1].metadata.originalModel).toBe("claude-sonnet-4-20250514"); + }); + + test("should set completionStartTime from ttfbMs", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = Date.now() - 500; + await traceProxyRequest({ + session: createMockSession({ startTime, ttfbMs: 200 }), + response: new Response("ok", { status: 200 }), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + expect(mockGenerationUpdate).toHaveBeenCalledWith({ + completionStartTime: new Date(startTime + 200), + }); + }); + + test("should handle errors gracefully without throwing", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + // Make startObservation throw + mockStartObservation.mockImplementationOnce(() => { + throw new Error("SDK error"); + }); + + await expect( + traceProxyRequest({ + session: createMockSession(), + response: new Response("ok", { status: 200 }), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }) + ).resolves.toBeUndefined(); + }); + + test("should include correct tags for error responses", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + response: new Response("error", { status: 502 }), + durationMs: 500, + statusCode: 502, + isStreaming: false, + errorMessage: "upstream error", + }); + + expect(mockPropagateAttributes).toHaveBeenCalledWith( + expect.objectContaining({ + tags: expect.arrayContaining(["5xx"]), + }) + ); + }); +}); + +describe("isLangfuseEnabled", () => { + const originalPublicKey = process.env.LANGFUSE_PUBLIC_KEY; + const originalSecretKey = process.env.LANGFUSE_SECRET_KEY; + + afterEach(() => { + // Restore env + if (originalPublicKey !== undefined) { + process.env.LANGFUSE_PUBLIC_KEY = originalPublicKey; + } else { + delete process.env.LANGFUSE_PUBLIC_KEY; + } + if (originalSecretKey !== undefined) { + process.env.LANGFUSE_SECRET_KEY = originalSecretKey; + } else { + delete process.env.LANGFUSE_SECRET_KEY; + } + }); + + test("should return false when env vars are not set", () => { + delete process.env.LANGFUSE_PUBLIC_KEY; + delete process.env.LANGFUSE_SECRET_KEY; + + // Direct function test (not using the mock) + const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY); + expect(isEnabled).toBe(false); + }); + + test("should return true when both keys are set", () => { + process.env.LANGFUSE_PUBLIC_KEY = "pk-lf-test-mock"; + process.env.LANGFUSE_SECRET_KEY = "test-mock-not-real"; + + const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY); + expect(isEnabled).toBe(true); + }); +}); From 3251c2fe9a579b78b59d6e5a71dffd2ac63a5129 Mon Sep 17 00:00:00 2001 From: ding113 Date: Sun, 15 Feb 2026 13:11:28 +0800 Subject: [PATCH 02/10] fix(observability): move Langfuse trace from proxy-handler to response-handler for complete data The previous Langfuse integration fired traces in proxy-handler.ts immediately after dispatch() returned. For streaming responses, the response body hadn't been consumed yet, so responseText, usageMetrics, costUsd, sseEventCount, and errorMessage were all undefined. Move trace invocation into response-handler.ts at the exact points where ALL data has been collected: - Non-streaming standard path (after response body parsed + cost calculated) - Non-streaming Gemini passthrough (after finalizeRequestStats) - Streaming standard path (end of finalizeStream) - Streaming Gemini passthrough (after finalizeRequestStats) - Error/abort paths (inside persistRequestFailure) Also enhance the trace quality: - Generation input: actual request messages instead of summary object - Generation output: actual parsed response body instead of empty/status - Tags: add provider.name and model alongside providerType - Metadata: add model, originalModel, endpoint, sessionId, keyName, requestSequence, sseEventCount, and requestSummary - Root span output: include costUsd - Add truncateForLangfuse() helper (configurable via LANGFUSE_MAX_IO_SIZE) - TraceContext: replace response:Response with responseHeaders:Headers - Import UsageMetrics type from response-handler --- src/app/v1/_lib/proxy-handler.ts | 22 --- src/app/v1/_lib/proxy/response-handler.ts | 120 +++++++++++++- src/lib/langfuse/trace-proxy-request.ts | 118 +++++++++----- tests/unit/langfuse/langfuse-trace.test.ts | 172 +++++++++++++++++++-- 4 files changed, 350 insertions(+), 82 deletions(-) diff --git a/src/app/v1/_lib/proxy-handler.ts b/src/app/v1/_lib/proxy-handler.ts index be610333f..6c041e390 100644 --- a/src/app/v1/_lib/proxy-handler.ts +++ b/src/app/v1/_lib/proxy-handler.ts @@ -82,28 +82,6 @@ export async function handleProxyRequest(c: Context): Promise { const handled = await ProxyResponseHandler.dispatch(session, response); const finalResponse = await attachSessionIdToErrorResponse(session.sessionId, handled); - // Fire Langfuse trace asynchronously (non-blocking) - if (process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY && session) { - const traceSession = session; - const traceStatusCode = handled.status; - const isSSE = (handled.headers.get("content-type") || "").includes("text/event-stream"); - void import("@/lib/langfuse/trace-proxy-request") - .then(({ traceProxyRequest }) => { - void traceProxyRequest({ - session: traceSession, - response: handled, - durationMs: Date.now() - traceSession.startTime, - statusCode: traceStatusCode, - isStreaming: isSSE, - }); - }) - .catch((err) => { - logger.warn("[ProxyHandler] Langfuse trace failed", { - error: err instanceof Error ? err.message : String(err), - }); - }); - } - return finalResponse; } catch (error) { logger.error("Proxy handler error:", error); diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index c2afe9d90..d1837029d 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -39,6 +39,47 @@ export type UsageMetrics = { output_image_tokens?: number; }; +/** + * Fire Langfuse trace asynchronously. Non-blocking, error-tolerant. + */ +function emitLangfuseTrace( + session: ProxySession, + data: { + responseHeaders: Headers; + responseText: string; + usageMetrics: UsageMetrics | null; + costUsd: string | undefined; + statusCode: number; + durationMs: number; + isStreaming: boolean; + sseEventCount?: number; + errorMessage?: string; + } +): void { + if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return; + + void import("@/lib/langfuse/trace-proxy-request") + .then(({ traceProxyRequest }) => { + void traceProxyRequest({ + session, + responseHeaders: data.responseHeaders, + durationMs: data.durationMs, + statusCode: data.statusCode, + isStreaming: data.isStreaming, + responseText: data.responseText, + usageMetrics: data.usageMetrics, + costUsd: data.costUsd, + sseEventCount: data.sseEventCount, + errorMessage: data.errorMessage, + }); + }) + .catch((err) => { + logger.warn("[ResponseHandler] Langfuse trace failed", { + error: err instanceof Error ? err.message : String(err), + }); + }); +} + /** * 清理 Response headers 中的传输相关 header * @@ -520,6 +561,18 @@ export class ProxyResponseHandler { duration, errorMessageForFinalize ); + + emitLangfuseTrace(session, { + responseHeaders: response.headers, + responseText, + usageMetrics: parseUsageFromResponseText(responseText, provider.providerType) + .usageMetrics, + costUsd: undefined, + statusCode, + durationMs: duration, + isStreaming: false, + errorMessage: errorMessageForFinalize, + }); } catch (error) { if (!isClientAbortError(error as Error)) { logger.error( @@ -687,10 +740,9 @@ export class ProxyResponseHandler { await trackCostToRedis(session, usageMetrics); } - // 更新 session 使用量到 Redis(用于实时监控) - if (session.sessionId && usageMetrics) { - // 计算成本(复用相同逻辑) - let costUsdStr: string | undefined; + // Calculate cost (for session tracking and Langfuse tracing) + let costUsdStr: string | undefined; + if (usageMetrics) { try { if (session.request.model) { const priceData = await session.getCachedPriceDataByBillingSource(); @@ -711,7 +763,10 @@ export class ProxyResponseHandler { error: error instanceof Error ? error.message : String(error), }); } + } + // 更新 session 使用量到 Redis(用于实时监控) + if (session.sessionId && usageMetrics) { void SessionManager.updateSessionUsage(session.sessionId, { inputTokens: usageMetrics.input_tokens, outputTokens: usageMetrics.output_tokens, @@ -782,6 +837,16 @@ export class ProxyResponseHandler { providerName: provider.name, statusCode, }); + + emitLangfuseTrace(session, { + responseHeaders: response.headers, + responseText, + usageMetrics, + costUsd: costUsdStr, + statusCode, + durationMs: Date.now() - session.startTime, + isStreaming: false, + }); } catch (error) { // 检测 AbortError 的来源:响应超时 vs 客户端中断 const err = error as Error; @@ -1220,6 +1285,18 @@ export class ProxyResponseHandler { finalized.errorMessage ?? undefined, finalized.providerIdForPersistence ?? undefined ); + + emitLangfuseTrace(session, { + responseHeaders: response.headers, + responseText: allContent, + usageMetrics: parseUsageFromResponseText(allContent, provider.providerType) + .usageMetrics, + costUsd: undefined, + statusCode: finalized.effectiveStatusCode, + durationMs: duration, + isStreaming: true, + errorMessage: finalized.errorMessage ?? undefined, + }); } catch (error) { const err = error instanceof Error ? error : new Error(String(error)); const clientAborted = session.clientAbortSignal?.aborted ?? false; @@ -1588,11 +1665,11 @@ export class ProxyResponseHandler { // 追踪消费到 Redis(用于限流) await trackCostToRedis(session, usageForCost); - // 更新 session 使用量到 Redis(用于实时监控) - if (session.sessionId) { - let costUsdStr: string | undefined; + // Calculate cost (for session tracking and Langfuse tracing) + let costUsdStr: string | undefined; + if (usageForCost) { try { - if (usageForCost && session.request.model) { + if (session.request.model) { const priceData = await session.getCachedPriceDataByBillingSource(); if (priceData) { const cost = calculateRequestCost( @@ -1611,7 +1688,10 @@ export class ProxyResponseHandler { error: error instanceof Error ? error.message : String(error), }); } + } + // 更新 session 使用量到 Redis(用于实时监控) + if (session.sessionId) { const payload: SessionUsageUpdate = { status: effectiveStatusCode >= 200 && effectiveStatusCode < 300 ? "completed" : "error", statusCode: effectiveStatusCode, @@ -1650,6 +1730,18 @@ export class ProxyResponseHandler { providerId: providerIdForPersistence ?? session.provider?.id, // 更新最终供应商ID(重试切换后) context1mApplied: session.getContext1mApplied(), }); + + emitLangfuseTrace(session, { + responseHeaders: response.headers, + responseText: allContent, + usageMetrics: usageForCost, + costUsd: costUsdStr, + statusCode: effectiveStatusCode, + durationMs: duration, + isStreaming: true, + sseEventCount: chunks.length, + errorMessage: streamErrorMessage ?? undefined, + }); }; try { @@ -2919,6 +3011,18 @@ async function persistRequestFailure(options: { }); } } + + // Emit Langfuse trace for error/abort paths + emitLangfuseTrace(session, { + responseHeaders: new Headers(), + responseText: "", + usageMetrics: null, + costUsd: undefined, + statusCode, + durationMs: duration, + isStreaming: phase === "stream", + errorMessage, + }); } /** diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index a7fccde55..e5bd0b87a 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -1,3 +1,4 @@ +import type { UsageMetrics } from "@/app/v1/_lib/proxy/response-handler"; import type { ProxySession } from "@/app/v1/_lib/proxy/session"; import { isLangfuseEnabled } from "@/lib/langfuse/index"; import { logger } from "@/lib/logger"; @@ -40,21 +41,39 @@ function getStatusCategory(statusCode: number): string { return `${Math.floor(statusCode / 100)}xx`; } +const LANGFUSE_MAX_IO_SIZE = Number(process.env.LANGFUSE_MAX_IO_SIZE) || 100_000; + +/** + * Truncate data for Langfuse to avoid excessive payload sizes. + */ +function truncateForLangfuse(data: unknown, maxChars: number = LANGFUSE_MAX_IO_SIZE): unknown { + if (typeof data === "string") { + return data.length > maxChars ? `${data.substring(0, maxChars)}...[truncated]` : data; + } + if (data != null && typeof data === "object") { + const str = JSON.stringify(data); + if (str.length > maxChars) { + return { + _truncated: true, + _length: str.length, + _preview: str.substring(0, Math.min(maxChars, 2000)), + }; + } + return data; + } + return data; +} + export interface TraceContext { session: ProxySession; - response: Response; + responseHeaders: Headers; durationMs: number; statusCode: number; responseText?: string; isStreaming: boolean; sseEventCount?: number; errorMessage?: string; - usageMetrics?: { - input_tokens?: number; - output_tokens?: number; - cache_creation_input_tokens?: number; - cache_read_input_tokens?: number; - } | null; + usageMetrics?: UsageMetrics | null; costUsd?: string; } @@ -70,14 +89,16 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { try { const { startObservation, propagateAttributes } = await import("@langfuse/tracing"); - const { session, response, durationMs, statusCode, isStreaming } = ctx; + const { session, durationMs, statusCode, isStreaming } = ctx; const provider = session.provider; const messageContext = session.messageContext; - // Build tags + // Build tags - include provider name and model const tags: string[] = []; if (provider?.providerType) tags.push(provider.providerType); + if (provider?.name) tags.push(provider.name); if (session.originalFormat) tags.push(session.originalFormat); + if (session.getCurrentModel()) tags.push(session.getCurrentModel()!); tags.push(getStatusCategory(statusCode)); // Build trace-level metadata (propagateAttributes requires all values to be strings) @@ -90,37 +111,45 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { requestSequence: String(session.getRequestSequence()), }; - // Build generation metadata + // Build generation metadata - all request detail fields const generationMetadata: Record = { + // Provider providerId: provider?.id, providerName: provider?.name, providerType: provider?.providerType, providerChain: session.getProviderChain(), - specialSettings: session.getSpecialSettings(), + // Model + model: session.getCurrentModel(), + originalModel: session.getOriginalModel(), modelRedirected: session.isModelRedirected(), - originalModel: session.isModelRedirected() ? session.getOriginalModel() : undefined, - isStreaming, - statusCode, + // Special settings + specialSettings: session.getSpecialSettings(), + // Request context + endpoint: session.getEndpoint(), + method: session.method, + clientFormat: session.originalFormat, + userAgent: session.userAgent, + requestSequence: session.getRequestSequence(), + sessionId: session.sessionId, + keyName: messageContext?.key?.name, + // Timing durationMs, ttfbMs: session.ttfbMs, + // Flags + isStreaming, cacheTtlApplied: session.getCacheTtlResolved(), context1mApplied: session.getContext1mApplied(), + // Error errorMessage: ctx.errorMessage, + // Request summary (quick overview) + requestSummary: buildRequestBodySummary(session), + // SSE + sseEventCount: ctx.sseEventCount, + // Headers (sanitized) requestHeaders: sanitizeHeaders(session.headers), - responseHeaders: sanitizeHeaders(response.headers), - requestBodySummary: buildRequestBodySummary(session), + responseHeaders: sanitizeHeaders(ctx.responseHeaders), }; - // Add response body summary - if (isStreaming) { - generationMetadata.sseEventCount = ctx.sseEventCount; - } else if (ctx.responseText) { - generationMetadata.responseBodySummary = - ctx.responseText.length > 2000 - ? `${ctx.responseText.substring(0, 2000)}...[truncated]` - : ctx.responseText; - } - // Build usage details for Langfuse generation const usageDetails: Record | undefined = ctx.usageMetrics ? { @@ -147,12 +176,19 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { // Create the root trace span const rootSpan = startObservation("proxy-request", { - input: traceMetadata, + input: { + endpoint: session.getEndpoint(), + method: session.method, + model: session.getCurrentModel(), + clientFormat: session.originalFormat, + providerName: provider?.name, + }, output: { statusCode, durationMs, model: session.getCurrentModel(), hasUsage: !!ctx.usageMetrics, + costUsd: ctx.costUsd, }, }); @@ -166,17 +202,23 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { traceName: `${session.method} ${session.getEndpoint() ?? "/"}`, }, async () => { + // Generation input = actual request payload + const generationInput = truncateForLangfuse(session.request.message); + + // Generation output = actual response body + const generationOutput = ctx.responseText + ? truncateForLangfuse(tryParseJsonSafe(ctx.responseText)) + : isStreaming + ? { streaming: true, sseEventCount: ctx.sseEventCount } + : { statusCode }; + // Create the LLM generation observation const generation = rootSpan.startObservation( "llm-call", { model: session.getCurrentModel() ?? undefined, - input: buildRequestBodySummary(session), - output: isStreaming - ? { streaming: true, sseEventCount: ctx.sseEventCount } - : ctx.responseText - ? tryParseJsonSafe(ctx.responseText) - : { statusCode }, + input: generationInput, + output: generationOutput, ...(usageDetails && Object.keys(usageDetails).length > 0 ? { usageDetails } : {}), ...(costDetails ? { costDetails } : {}), metadata: generationMetadata, @@ -205,14 +247,8 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { function tryParseJsonSafe(text: string): unknown { try { - const parsed = JSON.parse(text); - // Truncate large outputs to avoid excessive data - const str = JSON.stringify(parsed); - if (str.length > 4000) { - return { _truncated: true, _length: str.length, _preview: str.substring(0, 2000) }; - } - return parsed; + return JSON.parse(text); } catch { - return text.length > 2000 ? `${text.substring(0, 2000)}...[truncated]` : text; + return text; } } diff --git a/tests/unit/langfuse/langfuse-trace.test.ts b/tests/unit/langfuse/langfuse-trace.test.ts index c55936569..8ba5fcad6 100644 --- a/tests/unit/langfuse/langfuse-trace.test.ts +++ b/tests/unit/langfuse/langfuse-trace.test.ts @@ -114,7 +114,7 @@ describe("traceProxyRequest", () => { await traceProxyRequest({ session: createMockSession(), - response: new Response("ok", { status: 200 }), + responseHeaders: new Headers(), durationMs: 500, statusCode: 200, isStreaming: false, @@ -128,10 +128,11 @@ describe("traceProxyRequest", () => { await traceProxyRequest({ session: createMockSession(), - response: new Response("ok", { status: 200 }), + responseHeaders: new Headers({ "content-type": "application/json" }), durationMs: 500, statusCode: 200, isStreaming: false, + responseText: '{"content": "Hi there"}', }); expect(mockStartObservation).toHaveBeenCalledWith( @@ -141,6 +142,12 @@ describe("traceProxyRequest", () => { endpoint: "/v1/messages", method: "POST", clientFormat: "claude", + providerName: "anthropic-main", + }), + output: expect.objectContaining({ + statusCode: 200, + durationMs: 500, + costUsd: undefined, }), }) ); @@ -157,12 +164,47 @@ describe("traceProxyRequest", () => { expect(mockGenerationEnd).toHaveBeenCalled(); }); + test("should use actual request messages as generation input", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + const session = createMockSession(); + + await traceProxyRequest({ + session, + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + responseText: '{"content": "response"}', + }); + + const generationCall = mockRootSpan.startObservation.mock.calls[0]; + // Generation input should be the actual request message, not a summary + expect(generationCall[1].input).toEqual(session.request.message); + }); + + test("should use actual response body as generation output", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + const responseBody = { content: [{ type: "text", text: "Hello!" }] }; + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + responseText: JSON.stringify(responseBody), + }); + + const generationCall = mockRootSpan.startObservation.mock.calls[0]; + expect(generationCall[1].output).toEqual(responseBody); + }); + test("should redact sensitive headers", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); await traceProxyRequest({ session: createMockSession(), - response: new Response("ok", { status: 200 }), + responseHeaders: new Headers({ "x-api-key": "secret-mock" }), durationMs: 500, statusCode: 200, isStreaming: false, @@ -172,14 +214,15 @@ describe("traceProxyRequest", () => { const metadata = generationCall[1].metadata; expect(metadata.requestHeaders["x-api-key"]).toBe("[REDACTED]"); expect(metadata.requestHeaders["content-type"]).toBe("application/json"); + expect(metadata.responseHeaders["x-api-key"]).toBe("[REDACTED]"); }); - test("should propagate userId and sessionId", async () => { + test("should include provider name and model in tags", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); await traceProxyRequest({ session: createMockSession(), - response: new Response("ok", { status: 200 }), + responseHeaders: new Headers(), durationMs: 500, statusCode: 200, isStreaming: false, @@ -189,7 +232,12 @@ describe("traceProxyRequest", () => { expect.objectContaining({ userId: "7", sessionId: "sess_abc12345_def67890", - tags: expect.arrayContaining(["claude", "2xx"]), + tags: expect.arrayContaining([ + "claude", + "anthropic-main", + "claude-sonnet-4-20250514", + "2xx", + ]), }) ); }); @@ -199,7 +247,7 @@ describe("traceProxyRequest", () => { await traceProxyRequest({ session: createMockSession(), - response: new Response("ok", { status: 200 }), + responseHeaders: new Headers(), durationMs: 500, statusCode: 200, isStreaming: false, @@ -222,6 +270,45 @@ describe("traceProxyRequest", () => { }); }); + test("should include providerChain, specialSettings, and model in metadata", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const providerChain = [ + { + id: 1, + name: "anthropic-main", + providerType: "claude", + reason: "initial_selection", + timestamp: Date.now(), + }, + ]; + + await traceProxyRequest({ + session: createMockSession({ + getSpecialSettings: () => ({ maxThinking: 8192 }), + getProviderChain: () => providerChain, + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const generationCall = mockRootSpan.startObservation.mock.calls[0]; + const metadata = generationCall[1].metadata; + expect(metadata.providerChain).toEqual(providerChain); + expect(metadata.specialSettings).toEqual({ maxThinking: 8192 }); + expect(metadata.model).toBe("claude-sonnet-4-20250514"); + expect(metadata.originalModel).toBe("claude-sonnet-4-20250514"); + expect(metadata.providerName).toBe("anthropic-main"); + expect(metadata.requestSummary).toEqual( + expect.objectContaining({ + model: "claude-sonnet-4-20250514", + messageCount: 1, + }) + ); + }); + test("should handle model redirect metadata", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); @@ -235,7 +322,7 @@ describe("traceProxyRequest", () => { model: "glm-4", }, }), - response: new Response("ok", { status: 200 }), + responseHeaders: new Headers(), durationMs: 500, statusCode: 200, isStreaming: false, @@ -252,7 +339,7 @@ describe("traceProxyRequest", () => { const startTime = Date.now() - 500; await traceProxyRequest({ session: createMockSession({ startTime, ttfbMs: 200 }), - response: new Response("ok", { status: 200 }), + responseHeaders: new Headers(), durationMs: 500, statusCode: 200, isStreaming: false, @@ -274,7 +361,7 @@ describe("traceProxyRequest", () => { await expect( traceProxyRequest({ session: createMockSession(), - response: new Response("ok", { status: 200 }), + responseHeaders: new Headers(), durationMs: 500, statusCode: 200, isStreaming: false, @@ -287,7 +374,7 @@ describe("traceProxyRequest", () => { await traceProxyRequest({ session: createMockSession(), - response: new Response("error", { status: 502 }), + responseHeaders: new Headers(), durationMs: 500, statusCode: 502, isStreaming: false, @@ -300,6 +387,69 @@ describe("traceProxyRequest", () => { }) ); }); + + test("should truncate large input/output for Langfuse", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + // Generate a large response text (> default 100KB limit) + const largeContent = "x".repeat(200_000); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + responseText: largeContent, + }); + + const generationCall = mockRootSpan.startObservation.mock.calls[0]; + const output = generationCall[1].output as string; + // Non-JSON text should be truncated + expect(output.length).toBeLessThan(200_000); + expect(output).toContain("...[truncated]"); + }); + + test("should show streaming output with sseEventCount when no responseText", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: true, + sseEventCount: 42, + }); + + const generationCall = mockRootSpan.startObservation.mock.calls[0]; + expect(generationCall[1].output).toEqual({ + streaming: true, + sseEventCount: 42, + }); + }); + + test("should include costUsd in root span output", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + costUsd: "0.05", + }); + + expect(mockStartObservation).toHaveBeenCalledWith( + "proxy-request", + expect.objectContaining({ + output: expect.objectContaining({ + costUsd: "0.05", + }), + }) + ); + }); }); describe("isLangfuseEnabled", () => { From 69fc61b975a5d9c2f95f565f89a022d6183e6869 Mon Sep 17 00:00:00 2001 From: ding113 Date: Sun, 15 Feb 2026 13:29:16 +0800 Subject: [PATCH 03/10] fix(langfuse): set trace-level input/output via updateTrace propagateAttributes() does not support input/output fields, and the root span's input/output do not auto-inherit to the trace level. This caused Langfuse UI to show Input: undefined and Output: undefined. Add explicit rootSpan.updateTrace() call to set trace-level input/output per Langfuse SDK documentation (Solution B: set input/output directly on the trace). --- src/lib/langfuse/trace-proxy-request.ts | 18 +++++++++++++ tests/unit/langfuse/langfuse-trace.test.ts | 30 ++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index e5bd0b87a..37ff0d140 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -237,6 +237,24 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { } ); + // Explicitly set trace-level input/output (propagateAttributes does not support these) + rootSpan.updateTrace({ + input: { + endpoint: session.getEndpoint(), + method: session.method, + model: session.getCurrentModel(), + clientFormat: session.originalFormat, + providerName: provider?.name, + }, + output: { + statusCode, + durationMs, + model: session.getCurrentModel(), + hasUsage: !!ctx.usageMetrics, + costUsd: ctx.costUsd, + }, + }); + rootSpan.end(); } catch (error) { logger.warn("[Langfuse] Failed to trace proxy request", { diff --git a/tests/unit/langfuse/langfuse-trace.test.ts b/tests/unit/langfuse/langfuse-trace.test.ts index 8ba5fcad6..4810c9f10 100644 --- a/tests/unit/langfuse/langfuse-trace.test.ts +++ b/tests/unit/langfuse/langfuse-trace.test.ts @@ -15,8 +15,11 @@ const mockGeneration: any = { end: mockGenerationEnd, }; +const mockUpdateTrace = vi.fn(); + const mockRootSpan = { startObservation: vi.fn().mockReturnValue(mockGeneration), + updateTrace: mockUpdateTrace, end: mockSpanEnd, }; @@ -450,6 +453,33 @@ describe("traceProxyRequest", () => { }) ); }); + test("should set trace-level input/output via updateTrace", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + costUsd: "0.05", + }); + + expect(mockUpdateTrace).toHaveBeenCalledWith({ + input: expect.objectContaining({ + endpoint: "/v1/messages", + method: "POST", + model: "claude-sonnet-4-20250514", + clientFormat: "claude", + providerName: "anthropic-main", + }), + output: expect.objectContaining({ + statusCode: 200, + durationMs: 500, + costUsd: "0.05", + }), + }); + }); }); describe("isLangfuseEnabled", () => { From 5e932608e5c1e2b673fe7066b407c4fad69960de Mon Sep 17 00:00:00 2001 From: ding113 Date: Sun, 15 Feb 2026 14:04:39 +0800 Subject: [PATCH 04/10] fix(langfuse): correct observation duration, cost key, and usage type names - Pass actual startTime/endTime to startObservation and end() so root span and generation reflect real request duration instead of ~0ms - Change costDetails key from 'totalUsd' to 'total' per Langfuse SDK convention (values are already in USD) - Change usageDetails keys from camelCase (cacheRead, cacheCreation) to snake_case (cache_read_input_tokens, cache_creation_input_tokens) so Langfuse UI can categorize them as input usage types --- src/lib/langfuse/trace-proxy-request.ts | 53 +++++++++++++--------- tests/unit/langfuse/langfuse-trace.test.ts | 52 +++++++++++++++++++-- 2 files changed, 79 insertions(+), 26 deletions(-) diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index 37ff0d140..1db706730 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -93,6 +93,10 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { const provider = session.provider; const messageContext = session.messageContext; + // Compute actual request timing from session data + const requestStartTime = new Date(session.startTime); + const requestEndTime = new Date(session.startTime + durationMs); + // Build tags - include provider name and model const tags: string[] = []; if (provider?.providerType) tags.push(provider.providerType); @@ -160,10 +164,10 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { ? { output: ctx.usageMetrics.output_tokens } : {}), ...(ctx.usageMetrics.cache_read_input_tokens != null - ? { cacheRead: ctx.usageMetrics.cache_read_input_tokens } + ? { cache_read_input_tokens: ctx.usageMetrics.cache_read_input_tokens } : {}), ...(ctx.usageMetrics.cache_creation_input_tokens != null - ? { cacheCreation: ctx.usageMetrics.cache_creation_input_tokens } + ? { cache_creation_input_tokens: ctx.usageMetrics.cache_creation_input_tokens } : {}), } : undefined; @@ -171,26 +175,32 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { // Build cost details const costDetails: Record | undefined = ctx.costUsd && Number.parseFloat(ctx.costUsd) > 0 - ? { totalUsd: Number.parseFloat(ctx.costUsd) } + ? { total: Number.parseFloat(ctx.costUsd) } : undefined; // Create the root trace span - const rootSpan = startObservation("proxy-request", { - input: { - endpoint: session.getEndpoint(), - method: session.method, - model: session.getCurrentModel(), - clientFormat: session.originalFormat, - providerName: provider?.name, - }, - output: { - statusCode, - durationMs, - model: session.getCurrentModel(), - hasUsage: !!ctx.usageMetrics, - costUsd: ctx.costUsd, + const rootSpan = startObservation( + "proxy-request", + { + input: { + endpoint: session.getEndpoint(), + method: session.method, + model: session.getCurrentModel(), + clientFormat: session.originalFormat, + providerName: provider?.name, + }, + output: { + statusCode, + durationMs, + model: session.getCurrentModel(), + hasUsage: !!ctx.usageMetrics, + costUsd: ctx.costUsd, + }, }, - }); + { + startTime: requestStartTime, + } + ); // Propagate trace attributes await propagateAttributes( @@ -223,7 +233,8 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { ...(costDetails ? { costDetails } : {}), metadata: generationMetadata, }, - { asType: "generation" } + // SDK runtime supports startTime on child observations but types don't expose it + { asType: "generation", startTime: requestStartTime } as { asType: "generation" } ); // Set TTFB as completionStartTime @@ -233,7 +244,7 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { }); } - generation.end(); + generation.end(requestEndTime); } ); @@ -255,7 +266,7 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { }, }); - rootSpan.end(); + rootSpan.end(requestEndTime); } catch (error) { logger.warn("[Langfuse] Failed to trace proxy request", { error: error instanceof Error ? error.message : String(error), diff --git a/tests/unit/langfuse/langfuse-trace.test.ts b/tests/unit/langfuse/langfuse-trace.test.ts index 4810c9f10..1d69ae6e2 100644 --- a/tests/unit/langfuse/langfuse-trace.test.ts +++ b/tests/unit/langfuse/langfuse-trace.test.ts @@ -152,6 +152,9 @@ describe("traceProxyRequest", () => { durationMs: 500, costUsd: undefined, }), + }), + expect.objectContaining({ + startTime: expect.any(Date), }) ); @@ -160,11 +163,14 @@ describe("traceProxyRequest", () => { expect.objectContaining({ model: "claude-sonnet-4-20250514", }), - { asType: "generation" } + expect.objectContaining({ + asType: "generation", + startTime: expect.any(Date), + }) ); - expect(mockSpanEnd).toHaveBeenCalled(); - expect(mockGenerationEnd).toHaveBeenCalled(); + expect(mockSpanEnd).toHaveBeenCalledWith(expect.any(Date)); + expect(mockGenerationEnd).toHaveBeenCalledWith(expect.any(Date)); }); test("should use actual request messages as generation input", async () => { @@ -266,10 +272,10 @@ describe("traceProxyRequest", () => { expect(generationCall[1].usageDetails).toEqual({ input: 100, output: 50, - cacheRead: 20, + cache_read_input_tokens: 20, }); expect(generationCall[1].costDetails).toEqual({ - totalUsd: 0.0015, + total: 0.0015, }); }); @@ -353,6 +359,39 @@ describe("traceProxyRequest", () => { }); }); + test("should pass correct startTime and endTime to observations", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + const durationMs = 5000; + + await traceProxyRequest({ + session: createMockSession({ startTime }), + responseHeaders: new Headers(), + durationMs, + statusCode: 200, + isStreaming: false, + }); + + const expectedStart = new Date(startTime); + const expectedEnd = new Date(startTime + durationMs); + + // Root span gets startTime in options (3rd arg) + expect(mockStartObservation).toHaveBeenCalledWith("proxy-request", expect.any(Object), { + startTime: expectedStart, + }); + + // Generation gets startTime in options (3rd arg) + expect(mockRootSpan.startObservation).toHaveBeenCalledWith("llm-call", expect.any(Object), { + asType: "generation", + startTime: expectedStart, + }); + + // Both end() calls receive the computed endTime + expect(mockGenerationEnd).toHaveBeenCalledWith(expectedEnd); + expect(mockSpanEnd).toHaveBeenCalledWith(expectedEnd); + }); + test("should handle errors gracefully without throwing", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); @@ -450,6 +489,9 @@ describe("traceProxyRequest", () => { output: expect.objectContaining({ costUsd: "0.05", }), + }), + expect.objectContaining({ + startTime: expect.any(Date), }) ); }); From debdd692374126bbd16b172ce3f32d2d279452fd Mon Sep 17 00:00:00 2001 From: ding113 Date: Sun, 15 Feb 2026 14:33:06 +0800 Subject: [PATCH 05/10] fix(langfuse): send raw cost without multiplier and use username as userId - Calculate raw cost (multiplier=1.0) separately for Langfuse while keeping multiplied cost for session billing and rate limiting - Change userId from numeric ID to user.name for readable Langfuse traces - Key name already available in trace metadata as keyName --- src/app/v1/_lib/proxy/response-handler.ts | 38 +++++++++++++++++++--- src/lib/langfuse/trace-proxy-request.ts | 2 +- tests/unit/langfuse/langfuse-trace.test.ts | 2 +- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index d1837029d..60faeb55e 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -740,8 +740,9 @@ export class ProxyResponseHandler { await trackCostToRedis(session, usageMetrics); } - // Calculate cost (for session tracking and Langfuse tracing) + // Calculate cost for session tracking (with multiplier) and Langfuse (raw) let costUsdStr: string | undefined; + let rawCostUsdStr: string | undefined; if (usageMetrics) { try { if (session.request.model) { @@ -756,6 +757,20 @@ export class ProxyResponseHandler { if (cost.gt(0)) { costUsdStr = cost.toString(); } + // Raw cost without multiplier for Langfuse + if (provider.costMultiplier !== 1) { + const rawCost = calculateRequestCost( + usageMetrics, + priceData, + 1.0, + session.getContext1mApplied() + ); + if (rawCost.gt(0)) { + rawCostUsdStr = rawCost.toString(); + } + } else { + rawCostUsdStr = costUsdStr; + } } } } catch (error) { @@ -842,7 +857,7 @@ export class ProxyResponseHandler { responseHeaders: response.headers, responseText, usageMetrics, - costUsd: costUsdStr, + costUsd: rawCostUsdStr, statusCode, durationMs: Date.now() - session.startTime, isStreaming: false, @@ -1665,8 +1680,9 @@ export class ProxyResponseHandler { // 追踪消费到 Redis(用于限流) await trackCostToRedis(session, usageForCost); - // Calculate cost (for session tracking and Langfuse tracing) + // Calculate cost for session tracking (with multiplier) and Langfuse (raw) let costUsdStr: string | undefined; + let rawCostUsdStr: string | undefined; if (usageForCost) { try { if (session.request.model) { @@ -1681,6 +1697,20 @@ export class ProxyResponseHandler { if (cost.gt(0)) { costUsdStr = cost.toString(); } + // Raw cost without multiplier for Langfuse + if (provider.costMultiplier !== 1) { + const rawCost = calculateRequestCost( + usageForCost, + priceData, + 1.0, + session.getContext1mApplied() + ); + if (rawCost.gt(0)) { + rawCostUsdStr = rawCost.toString(); + } + } else { + rawCostUsdStr = costUsdStr; + } } } } catch (error) { @@ -1735,7 +1765,7 @@ export class ProxyResponseHandler { responseHeaders: response.headers, responseText: allContent, usageMetrics: usageForCost, - costUsd: costUsdStr, + costUsd: rawCostUsdStr, statusCode: effectiveStatusCode, durationMs: duration, isStreaming: true, diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index 1db706730..c9ff3041e 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -205,7 +205,7 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { // Propagate trace attributes await propagateAttributes( { - userId: messageContext?.user?.id ? String(messageContext.user.id) : undefined, + userId: messageContext?.user?.name ?? undefined, sessionId: session.sessionId ?? undefined, tags, metadata: traceMetadata, diff --git a/tests/unit/langfuse/langfuse-trace.test.ts b/tests/unit/langfuse/langfuse-trace.test.ts index 1d69ae6e2..09663be1a 100644 --- a/tests/unit/langfuse/langfuse-trace.test.ts +++ b/tests/unit/langfuse/langfuse-trace.test.ts @@ -239,7 +239,7 @@ describe("traceProxyRequest", () => { expect(mockPropagateAttributes).toHaveBeenCalledWith( expect.objectContaining({ - userId: "7", + userId: "testuser", sessionId: "sess_abc12345_def67890", tags: expect.arrayContaining([ "claude", From fa180e58f499a4b0cbabaed29616fcc35ca6a684 Mon Sep 17 00:00:00 2001 From: ding113 Date: Sun, 15 Feb 2026 15:55:26 +0800 Subject: [PATCH 06/10] feat(langfuse): add multi-span hierarchy with guard-pipeline, provider-attempt events, and timing breakdown Enrich Langfuse traces from a flat 2-observation structure to a proper span hierarchy that reveals where request time is spent: - Add forwardStartTime to ProxySession to mark when guard pipeline ends - Create guard-pipeline child span (startTime -> forwardStartTime) - Emit provider-attempt events for each failed chain item with WARNING/ERROR level classification - Compute timingBreakdown (guardPipelineMs, upstreamTotalMs, ttfbFromForwardMs, tokenGenerationMs, failedAttempts) with Math.max(0) defensive guards against clock jitter - Set LLM generation startTime to forwardStartTime for accurate upstream call duration - Update test mocks to route by observation name, add 7 new tests --- src/app/v1/_lib/proxy-handler.ts | 1 + src/app/v1/_lib/proxy/session.ts | 13 + src/lib/langfuse/trace-proxy-request.ts | 87 +++++- tests/unit/langfuse/langfuse-trace.test.ts | 342 ++++++++++++++++++--- 4 files changed, 404 insertions(+), 39 deletions(-) diff --git a/src/app/v1/_lib/proxy-handler.ts b/src/app/v1/_lib/proxy-handler.ts index 6c041e390..5f2b90b4e 100644 --- a/src/app/v1/_lib/proxy-handler.ts +++ b/src/app/v1/_lib/proxy-handler.ts @@ -78,6 +78,7 @@ export async function handleProxyRequest(c: Context): Promise { }); } + session.recordForwardStart(); const response = await ProxyForwarder.send(session); const handled = await ProxyResponseHandler.dispatch(session, response); const finalResponse = await attachSessionIdToErrorResponse(session.sessionId, handled); diff --git a/src/app/v1/_lib/proxy/session.ts b/src/app/v1/_lib/proxy/session.ts index a163c772d..282dd61ef 100644 --- a/src/app/v1/_lib/proxy/session.ts +++ b/src/app/v1/_lib/proxy/session.ts @@ -67,6 +67,9 @@ export class ProxySession { // Time To First Byte (ms). Streaming: first chunk. Non-stream: equals durationMs. ttfbMs: number | null = null; + // Timestamp when guard pipeline finished and forwarding started (epoch ms). + forwardStartTime: number | null = null; + // Session ID(用于会话粘性和并发限流) sessionId: string | null; @@ -313,6 +316,16 @@ export class ProxySession { return value; } + /** + * Record the timestamp when guard pipeline finished and upstream forwarding begins. + * Called once; subsequent calls are no-ops. + */ + recordForwardStart(): void { + if (this.forwardStartTime === null) { + this.forwardStartTime = Date.now(); + } + } + /** * 设置 session ID */ diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index c9ff3041e..10a8047f3 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -43,6 +43,27 @@ function getStatusCategory(statusCode: number): string { const LANGFUSE_MAX_IO_SIZE = Number(process.env.LANGFUSE_MAX_IO_SIZE) || 100_000; +const SUCCESS_REASONS = new Set([ + "request_success", + "retry_success", + "initial_selection", + "session_reuse", +]); + +function isSuccessReason(reason: string | undefined): boolean { + return !!reason && SUCCESS_REASONS.has(reason); +} + +const ERROR_REASONS = new Set([ + "system_error", + "vendor_type_all_timeout", + "endpoint_pool_exhausted", +]); + +function isErrorReason(reason: string | undefined): boolean { + return !!reason && ERROR_REASONS.has(reason); +} + /** * Truncate data for Langfuse to avoid excessive payload sizes. */ @@ -97,6 +118,25 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { const requestStartTime = new Date(session.startTime); const requestEndTime = new Date(session.startTime + durationMs); + // Compute timing breakdown from forwardStartTime + const forwardStartDate = session.forwardStartTime ? new Date(session.forwardStartTime) : null; + const guardPipelineMs = session.forwardStartTime + ? session.forwardStartTime - session.startTime + : null; + + const timingBreakdown = { + guardPipelineMs, + upstreamTotalMs: + guardPipelineMs != null ? Math.max(0, durationMs - guardPipelineMs) : durationMs, + ttfbFromForwardMs: + guardPipelineMs != null && session.ttfbMs != null + ? Math.max(0, session.ttfbMs - guardPipelineMs) + : null, + tokenGenerationMs: session.ttfbMs != null ? Math.max(0, durationMs - session.ttfbMs) : null, + failedAttempts: session.getProviderChain().filter((i) => !isSuccessReason(i.reason)).length, + providersAttempted: new Set(session.getProviderChain().map((i) => i.id)).size, + }; + // Build tags - include provider name and model const tags: string[] = []; if (provider?.providerType) tags.push(provider.providerType); @@ -139,6 +179,7 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { // Timing durationMs, ttfbMs: session.ttfbMs, + timingBreakdown, // Flags isStreaming, cacheTtlApplied: session.getCacheTtlResolved(), @@ -195,6 +236,7 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { model: session.getCurrentModel(), hasUsage: !!ctx.usageMetrics, costUsd: ctx.costUsd, + timingBreakdown, }, }, { @@ -212,6 +254,49 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { traceName: `${session.method} ${session.getEndpoint() ?? "/"}`, }, async () => { + // 1. Guard pipeline span (if forwardStartTime was recorded) + if (forwardStartDate) { + const guardSpan = rootSpan.startObservation( + "guard-pipeline", + { + output: { durationMs: guardPipelineMs, passed: true }, + }, + { startTime: requestStartTime } as Record + ); + guardSpan.end(forwardStartDate); + } + + // 2. Provider attempt events (one per failed chain item) + for (const item of session.getProviderChain()) { + if (!isSuccessReason(item.reason)) { + const eventObs = rootSpan.startObservation( + "provider-attempt", + { + level: isErrorReason(item.reason) ? "ERROR" : "WARNING", + input: { + providerId: item.id, + providerName: item.name, + attempt: item.attemptNumber, + }, + output: { + reason: item.reason, + errorMessage: item.errorMessage, + statusCode: item.statusCode, + }, + metadata: { ...item }, + }, + { + asType: "event", + startTime: new Date(item.timestamp ?? session.startTime), + } as { asType: "event" } + ); + eventObs.end(); + } + } + + // 3. LLM generation (startTime = forwardStartTime when available) + const generationStartTime = forwardStartDate ?? requestStartTime; + // Generation input = actual request payload const generationInput = truncateForLangfuse(session.request.message); @@ -234,7 +319,7 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { metadata: generationMetadata, }, // SDK runtime supports startTime on child observations but types don't expose it - { asType: "generation", startTime: requestStartTime } as { asType: "generation" } + { asType: "generation", startTime: generationStartTime } as { asType: "generation" } ); // Set TTFB as completionStartTime diff --git a/tests/unit/langfuse/langfuse-trace.test.ts b/tests/unit/langfuse/langfuse-trace.test.ts index 09663be1a..bb72c3f71 100644 --- a/tests/unit/langfuse/langfuse-trace.test.ts +++ b/tests/unit/langfuse/langfuse-trace.test.ts @@ -6,6 +6,8 @@ const mockPropagateAttributes = vi.fn(); const mockSpanEnd = vi.fn(); const mockGenerationEnd = vi.fn(); const mockGenerationUpdate = vi.fn(); +const mockGuardSpanEnd = vi.fn(); +const mockEventEnd = vi.fn(); const mockGeneration: any = { update: (...args: unknown[]) => { @@ -15,14 +17,31 @@ const mockGeneration: any = { end: mockGenerationEnd, }; +const mockGuardSpan: any = { + end: mockGuardSpanEnd, +}; + +const mockEventObs: any = { + end: mockEventEnd, +}; + const mockUpdateTrace = vi.fn(); const mockRootSpan = { - startObservation: vi.fn().mockReturnValue(mockGeneration), + startObservation: vi.fn(), updateTrace: mockUpdateTrace, end: mockSpanEnd, }; +// Default: route by observation name +function setupDefaultStartObservation() { + mockRootSpan.startObservation.mockImplementation((name: string) => { + if (name === "guard-pipeline") return mockGuardSpan; + if (name === "provider-attempt") return mockEventObs; + return mockGeneration; // "llm-call" + }); +} + vi.mock("@langfuse/tracing", () => ({ startObservation: (...args: unknown[]) => { mockStartObservation(...args); @@ -49,8 +68,9 @@ vi.mock("@/lib/langfuse/index", () => ({ })); function createMockSession(overrides: Record = {}) { + const startTime = (overrides.startTime as number) ?? Date.now() - 500; return { - startTime: Date.now() - 500, + startTime, method: "POST", headers: new Headers({ "content-type": "application/json", @@ -81,6 +101,7 @@ function createMockSession(overrides: Record = {}) { key: { name: "default-key" }, }, ttfbMs: 200, + forwardStartTime: startTime + 5, getEndpoint: () => "/v1/messages", getRequestSequence: () => 3, getMessagesLength: () => 1, @@ -93,7 +114,7 @@ function createMockSession(overrides: Record = {}) { name: "anthropic-main", providerType: "claude", reason: "initial_selection", - timestamp: Date.now(), + timestamp: startTime + 2, }, ], getSpecialSettings: () => null, @@ -107,8 +128,7 @@ describe("traceProxyRequest", () => { beforeEach(() => { vi.clearAllMocks(); langfuseEnabled = true; - // Re-setup return values after clearAllMocks - mockRootSpan.startObservation.mockReturnValue(mockGeneration); + setupDefaultStartObservation(); }); test("should not trace when Langfuse is disabled", async () => { @@ -151,6 +171,7 @@ describe("traceProxyRequest", () => { statusCode: 200, durationMs: 500, costUsd: undefined, + timingBreakdown: expect.any(Object), }), }), expect.objectContaining({ @@ -158,16 +179,10 @@ describe("traceProxyRequest", () => { }) ); - expect(mockRootSpan.startObservation).toHaveBeenCalledWith( - "llm-call", - expect.objectContaining({ - model: "claude-sonnet-4-20250514", - }), - expect.objectContaining({ - asType: "generation", - startTime: expect.any(Date), - }) - ); + // Should have 3 child observations: guard-pipeline, llm-call (no failed providers in default mock) + const callNames = mockRootSpan.startObservation.mock.calls.map((c: unknown[]) => c[0]); + expect(callNames).toContain("guard-pipeline"); + expect(callNames).toContain("llm-call"); expect(mockSpanEnd).toHaveBeenCalledWith(expect.any(Date)); expect(mockGenerationEnd).toHaveBeenCalledWith(expect.any(Date)); @@ -186,9 +201,12 @@ describe("traceProxyRequest", () => { responseText: '{"content": "response"}', }); - const generationCall = mockRootSpan.startObservation.mock.calls[0]; - // Generation input should be the actual request message, not a summary - expect(generationCall[1].input).toEqual(session.request.message); + // Find the llm-call invocation + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall).toBeDefined(); + expect(llmCall[1].input).toEqual(session.request.message); }); test("should use actual response body as generation output", async () => { @@ -204,8 +222,10 @@ describe("traceProxyRequest", () => { responseText: JSON.stringify(responseBody), }); - const generationCall = mockRootSpan.startObservation.mock.calls[0]; - expect(generationCall[1].output).toEqual(responseBody); + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].output).toEqual(responseBody); }); test("should redact sensitive headers", async () => { @@ -219,8 +239,10 @@ describe("traceProxyRequest", () => { isStreaming: false, }); - const generationCall = mockRootSpan.startObservation.mock.calls[0]; - const metadata = generationCall[1].metadata; + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + const metadata = llmCall[1].metadata; expect(metadata.requestHeaders["x-api-key"]).toBe("[REDACTED]"); expect(metadata.requestHeaders["content-type"]).toBe("application/json"); expect(metadata.responseHeaders["x-api-key"]).toBe("[REDACTED]"); @@ -268,13 +290,15 @@ describe("traceProxyRequest", () => { costUsd: "0.0015", }); - const generationCall = mockRootSpan.startObservation.mock.calls[0]; - expect(generationCall[1].usageDetails).toEqual({ + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].usageDetails).toEqual({ input: 100, output: 50, cache_read_input_tokens: 20, }); - expect(generationCall[1].costDetails).toEqual({ + expect(llmCall[1].costDetails).toEqual({ total: 0.0015, }); }); @@ -303,8 +327,10 @@ describe("traceProxyRequest", () => { isStreaming: false, }); - const generationCall = mockRootSpan.startObservation.mock.calls[0]; - const metadata = generationCall[1].metadata; + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + const metadata = llmCall[1].metadata; expect(metadata.providerChain).toEqual(providerChain); expect(metadata.specialSettings).toEqual({ maxThinking: 8192 }); expect(metadata.model).toBe("claude-sonnet-4-20250514"); @@ -337,9 +363,11 @@ describe("traceProxyRequest", () => { isStreaming: false, }); - const generationCall = mockRootSpan.startObservation.mock.calls[0]; - expect(generationCall[1].metadata.modelRedirected).toBe(true); - expect(generationCall[1].metadata.originalModel).toBe("claude-sonnet-4-20250514"); + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].metadata.modelRedirected).toBe(true); + expect(llmCall[1].metadata.originalModel).toBe("claude-sonnet-4-20250514"); }); test("should set completionStartTime from ttfbMs", async () => { @@ -366,7 +394,7 @@ describe("traceProxyRequest", () => { const durationMs = 5000; await traceProxyRequest({ - session: createMockSession({ startTime }), + session: createMockSession({ startTime, forwardStartTime: startTime + 5 }), responseHeaders: new Headers(), durationMs, statusCode: 200, @@ -375,16 +403,20 @@ describe("traceProxyRequest", () => { const expectedStart = new Date(startTime); const expectedEnd = new Date(startTime + durationMs); + const expectedForwardStart = new Date(startTime + 5); // Root span gets startTime in options (3rd arg) expect(mockStartObservation).toHaveBeenCalledWith("proxy-request", expect.any(Object), { startTime: expectedStart, }); - // Generation gets startTime in options (3rd arg) - expect(mockRootSpan.startObservation).toHaveBeenCalledWith("llm-call", expect.any(Object), { + // Generation gets forwardStartTime in options (3rd arg) + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[2]).toEqual({ asType: "generation", - startTime: expectedStart, + startTime: expectedForwardStart, }); // Both end() calls receive the computed endTime @@ -445,8 +477,10 @@ describe("traceProxyRequest", () => { responseText: largeContent, }); - const generationCall = mockRootSpan.startObservation.mock.calls[0]; - const output = generationCall[1].output as string; + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + const output = llmCall[1].output as string; // Non-JSON text should be truncated expect(output.length).toBeLessThan(200_000); expect(output).toContain("...[truncated]"); @@ -464,8 +498,10 @@ describe("traceProxyRequest", () => { sseEventCount: 42, }); - const generationCall = mockRootSpan.startObservation.mock.calls[0]; - expect(generationCall[1].output).toEqual({ + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].output).toEqual({ streaming: true, sseEventCount: 42, }); @@ -522,6 +558,236 @@ describe("traceProxyRequest", () => { }), }); }); + + // --- New tests for multi-span hierarchy --- + + test("should create guard-pipeline span with correct timing", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + const forwardStartTime = startTime + 8; // 8ms guard pipeline + + await traceProxyRequest({ + session: createMockSession({ startTime, forwardStartTime }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const guardCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "guard-pipeline" + ); + expect(guardCall).toBeDefined(); + expect(guardCall[1]).toEqual({ + output: { durationMs: 8, passed: true }, + }); + expect(guardCall[2]).toEqual({ startTime: new Date(startTime) }); + + // Guard span should end at forwardStartTime + expect(mockGuardSpanEnd).toHaveBeenCalledWith(new Date(forwardStartTime)); + }); + + test("should skip guard-pipeline span when forwardStartTime is null", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession({ forwardStartTime: null }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const guardCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "guard-pipeline" + ); + expect(guardCall).toBeUndefined(); + expect(mockGuardSpanEnd).not.toHaveBeenCalled(); + }); + + test("should create provider-attempt events for failed chain items", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + const failTimestamp = startTime + 100; + + await traceProxyRequest({ + session: createMockSession({ + startTime, + getProviderChain: () => [ + { + id: 1, + name: "provider-a", + providerType: "claude", + reason: "retry_failed", + errorMessage: "502 Bad Gateway", + statusCode: 502, + attemptNumber: 1, + timestamp: failTimestamp, + }, + { + id: 2, + name: "provider-b", + providerType: "claude", + reason: "system_error", + errorMessage: "ECONNREFUSED", + timestamp: failTimestamp + 50, + }, + { + id: 3, + name: "provider-c", + providerType: "claude", + reason: "request_success", + timestamp: failTimestamp + 200, + }, + ], + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const eventCalls = mockRootSpan.startObservation.mock.calls.filter( + (c: unknown[]) => c[0] === "provider-attempt" + ); + // 2 failed items (retry_failed + system_error), success is skipped + expect(eventCalls).toHaveLength(2); + + // First event: retry_failed -> WARNING level + expect(eventCalls[0][1]).toEqual( + expect.objectContaining({ + level: "WARNING", + input: expect.objectContaining({ + providerId: 1, + providerName: "provider-a", + attempt: 1, + }), + output: expect.objectContaining({ + reason: "retry_failed", + errorMessage: "502 Bad Gateway", + statusCode: 502, + }), + }) + ); + expect(eventCalls[0][2]).toEqual({ + asType: "event", + startTime: new Date(failTimestamp), + }); + + // Second event: system_error -> ERROR level + expect(eventCalls[1][1].level).toBe("ERROR"); + expect(eventCalls[1][1].output.reason).toBe("system_error"); + }); + + test("should set generation startTime to forwardStartTime", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + const forwardStartTime = startTime + 10; + + await traceProxyRequest({ + session: createMockSession({ startTime, forwardStartTime }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[2]).toEqual({ + asType: "generation", + startTime: new Date(forwardStartTime), + }); + }); + + test("should fall back to requestStartTime when forwardStartTime is null", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + + await traceProxyRequest({ + session: createMockSession({ startTime, forwardStartTime: null }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[2]).toEqual({ + asType: "generation", + startTime: new Date(startTime), + }); + }); + + test("should include timingBreakdown in trace output and generation metadata", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = 1700000000000; + const forwardStartTime = startTime + 5; + + await traceProxyRequest({ + session: createMockSession({ + startTime, + forwardStartTime, + ttfbMs: 105, + getProviderChain: () => [ + { id: 1, name: "p1", reason: "retry_failed", timestamp: startTime + 50 }, + { id: 2, name: "p2", reason: "request_success", timestamp: startTime + 100 }, + ], + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + // Root span output should have timingBreakdown + const rootCall = mockStartObservation.mock.calls[0]; + const rootOutput = rootCall[1].output; + expect(rootOutput.timingBreakdown).toEqual({ + guardPipelineMs: 5, + upstreamTotalMs: 495, + ttfbFromForwardMs: 100, // ttfbMs(105) - guardPipelineMs(5) + tokenGenerationMs: 395, // durationMs(500) - ttfbMs(105) + failedAttempts: 1, // only retry_failed is non-success + providersAttempted: 2, // 2 unique provider ids + }); + + // Generation metadata should also have timingBreakdown + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].metadata.timingBreakdown).toEqual(rootOutput.timingBreakdown); + }); + + test("should not create provider-attempt events when all providers succeeded", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession({ + getProviderChain: () => [ + { id: 1, name: "p1", reason: "initial_selection", timestamp: Date.now() }, + { id: 1, name: "p1", reason: "request_success", timestamp: Date.now() }, + ], + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const eventCalls = mockRootSpan.startObservation.mock.calls.filter( + (c: unknown[]) => c[0] === "provider-attempt" + ); + expect(eventCalls).toHaveLength(0); + }); }); describe("isLangfuseEnabled", () => { From a910f32cd40fd17e4a00db7e3c9806c4d536c662 Mon Sep 17 00:00:00 2001 From: ding113 Date: Sun, 15 Feb 2026 17:10:19 +0800 Subject: [PATCH 07/10] fix(langfuse): validate LANGFUSE_MAX_IO_SIZE via env schema and fix test mock - Add LANGFUSE_MAX_IO_SIZE to Zod env schema with int/min/max/default validation, replacing raw Number() parse that risked NaN on invalid input - Use lazy getEnvConfig() accessor instead of module-level constant - Add missing recordForwardStart() to proxy-handler test mock --- src/lib/config/env.schema.ts | 1 + src/lib/langfuse/trace-proxy-request.ts | 7 +++++-- tests/unit/proxy/proxy-handler-session-id-error.test.ts | 1 + 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/lib/config/env.schema.ts b/src/lib/config/env.schema.ts index b7dacd738..344efbe7d 100644 --- a/src/lib/config/env.schema.ts +++ b/src/lib/config/env.schema.ts @@ -134,6 +134,7 @@ export const EnvSchema = z.object({ LANGFUSE_BASE_URL: z.string().default("https://cloud.langfuse.com"), LANGFUSE_SAMPLE_RATE: z.coerce.number().min(0).max(1).default(1.0), LANGFUSE_DEBUG: z.string().default("false").transform(booleanTransform), + LANGFUSE_MAX_IO_SIZE: z.coerce.number().int().min(1).max(10_000_000).default(100_000), }); /** diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index 10a8047f3..e6236e721 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -1,5 +1,6 @@ import type { UsageMetrics } from "@/app/v1/_lib/proxy/response-handler"; import type { ProxySession } from "@/app/v1/_lib/proxy/session"; +import { getEnvConfig } from "@/lib/config/env.schema"; import { isLangfuseEnabled } from "@/lib/langfuse/index"; import { logger } from "@/lib/logger"; @@ -41,7 +42,9 @@ function getStatusCategory(statusCode: number): string { return `${Math.floor(statusCode / 100)}xx`; } -const LANGFUSE_MAX_IO_SIZE = Number(process.env.LANGFUSE_MAX_IO_SIZE) || 100_000; +function getLangfuseMaxIoSize(): number { + return getEnvConfig().LANGFUSE_MAX_IO_SIZE; +} const SUCCESS_REASONS = new Set([ "request_success", @@ -67,7 +70,7 @@ function isErrorReason(reason: string | undefined): boolean { /** * Truncate data for Langfuse to avoid excessive payload sizes. */ -function truncateForLangfuse(data: unknown, maxChars: number = LANGFUSE_MAX_IO_SIZE): unknown { +function truncateForLangfuse(data: unknown, maxChars: number = getLangfuseMaxIoSize()): unknown { if (typeof data === "string") { return data.length > maxChars ? `${data.substring(0, maxChars)}...[truncated]` : data; } diff --git a/tests/unit/proxy/proxy-handler-session-id-error.test.ts b/tests/unit/proxy/proxy-handler-session-id-error.test.ts index d336e0dc4..18062cc79 100644 --- a/tests/unit/proxy/proxy-handler-session-id-error.test.ts +++ b/tests/unit/proxy/proxy-handler-session-id-error.test.ts @@ -13,6 +13,7 @@ const h = vi.hoisted(() => ({ }, isCountTokensRequest: () => false, setOriginalFormat: () => {}, + recordForwardStart: () => {}, messageContext: null, provider: null, } as any, From cd156007f7a25663badfe3010084094934726f74 Mon Sep 17 00:00:00 2001 From: ding113 Date: Sun, 15 Feb 2026 21:49:01 +0800 Subject: [PATCH 08/10] feat(langfuse): actual request/response bodies, observation level, and cost breakdown - Root span input/output now shows actual forwarded request body and response body instead of metadata summaries (summaries moved to root span metadata) - Add observation level on root span: DEFAULT (200), WARNING (retried), ERROR (non-200/499) - Add CostBreakdown type with input/output/cache_creation/cache_read/total categories, passed through to Langfuse costDetails - Capture forwardedRequestBody in ProxySession from forwarder (both Gemini and standard paths) - updateTrace uses actual bodies for trace-level input/output --- src/app/v1/_lib/proxy/forwarder.ts | 2 + src/app/v1/_lib/proxy/response-handler.ts | 29 +- src/app/v1/_lib/proxy/session.ts | 3 + src/lib/langfuse/trace-proxy-request.ts | 85 +++--- src/lib/utils/cost-calculation.ts | 208 ++++++++++++++ tests/unit/langfuse/langfuse-trace.test.ts | 259 +++++++++++++++--- .../lib/cost-calculation-breakdown.test.ts | 159 +++++++++++ 7 files changed, 666 insertions(+), 79 deletions(-) create mode 100644 tests/unit/lib/cost-calculation-breakdown.test.ts diff --git a/src/app/v1/_lib/proxy/forwarder.ts b/src/app/v1/_lib/proxy/forwarder.ts index 3759a8570..1abf4019c 100644 --- a/src/app/v1/_lib/proxy/forwarder.ts +++ b/src/app/v1/_lib/proxy/forwarder.ts @@ -1648,6 +1648,7 @@ export class ProxyForwarder { const bodyString = JSON.stringify(bodyToSerialize); requestBody = bodyString; + session.forwardedRequestBody = bodyString; } // 检测流式请求:Gemini 支持两种方式 @@ -1974,6 +1975,7 @@ export class ProxyForwarder { const bodyString = JSON.stringify(messageToSend); requestBody = bodyString; + session.forwardedRequestBody = bodyString; try { const parsed = JSON.parse(bodyString); diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 60faeb55e..3dc16dc1b 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -8,7 +8,8 @@ import { RateLimitService } from "@/lib/rate-limit"; import type { LeaseWindowType } from "@/lib/rate-limit/lease"; import { SessionManager } from "@/lib/session-manager"; import { SessionTracker } from "@/lib/session-tracker"; -import { calculateRequestCost } from "@/lib/utils/cost-calculation"; +import type { CostBreakdown } from "@/lib/utils/cost-calculation"; +import { calculateRequestCost, calculateRequestCostBreakdown } from "@/lib/utils/cost-calculation"; import { hasValidPriceData } from "@/lib/utils/price-data"; import { isSSEText, parseSSEData } from "@/lib/utils/sse"; import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection"; @@ -49,6 +50,7 @@ function emitLangfuseTrace( responseText: string; usageMetrics: UsageMetrics | null; costUsd: string | undefined; + costBreakdown?: CostBreakdown; statusCode: number; durationMs: number; isStreaming: boolean; @@ -69,6 +71,7 @@ function emitLangfuseTrace( responseText: data.responseText, usageMetrics: data.usageMetrics, costUsd: data.costUsd, + costBreakdown: data.costBreakdown, sseEventCount: data.sseEventCount, errorMessage: data.errorMessage, }); @@ -743,6 +746,7 @@ export class ProxyResponseHandler { // Calculate cost for session tracking (with multiplier) and Langfuse (raw) let costUsdStr: string | undefined; let rawCostUsdStr: string | undefined; + let costBreakdown: CostBreakdown | undefined; if (usageMetrics) { try { if (session.request.model) { @@ -771,6 +775,16 @@ export class ProxyResponseHandler { } else { rawCostUsdStr = costUsdStr; } + // Cost breakdown for Langfuse (raw, no multiplier) + try { + costBreakdown = calculateRequestCostBreakdown( + usageMetrics, + priceData, + session.getContext1mApplied() + ); + } catch { + /* non-critical */ + } } } } catch (error) { @@ -858,6 +872,7 @@ export class ProxyResponseHandler { responseText, usageMetrics, costUsd: rawCostUsdStr, + costBreakdown, statusCode, durationMs: Date.now() - session.startTime, isStreaming: false, @@ -1683,6 +1698,7 @@ export class ProxyResponseHandler { // Calculate cost for session tracking (with multiplier) and Langfuse (raw) let costUsdStr: string | undefined; let rawCostUsdStr: string | undefined; + let costBreakdown: CostBreakdown | undefined; if (usageForCost) { try { if (session.request.model) { @@ -1711,6 +1727,16 @@ export class ProxyResponseHandler { } else { rawCostUsdStr = costUsdStr; } + // Cost breakdown for Langfuse (raw, no multiplier) + try { + costBreakdown = calculateRequestCostBreakdown( + usageForCost, + priceData, + session.getContext1mApplied() + ); + } catch { + /* non-critical */ + } } } } catch (error) { @@ -1766,6 +1792,7 @@ export class ProxyResponseHandler { responseText: allContent, usageMetrics: usageForCost, costUsd: rawCostUsdStr, + costBreakdown, statusCode: effectiveStatusCode, durationMs: duration, isStreaming: true, diff --git a/src/app/v1/_lib/proxy/session.ts b/src/app/v1/_lib/proxy/session.ts index 282dd61ef..abae33872 100644 --- a/src/app/v1/_lib/proxy/session.ts +++ b/src/app/v1/_lib/proxy/session.ts @@ -70,6 +70,9 @@ export class ProxySession { // Timestamp when guard pipeline finished and forwarding started (epoch ms). forwardStartTime: number | null = null; + // Actual serialized request body sent to upstream (after all preprocessing). + forwardedRequestBody: string | null = null; + // Session ID(用于会话粘性和并发限流) sessionId: string | null; diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index e6236e721..b805f5b1b 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -3,6 +3,7 @@ import type { ProxySession } from "@/app/v1/_lib/proxy/session"; import { getEnvConfig } from "@/lib/config/env.schema"; import { isLangfuseEnabled } from "@/lib/langfuse/index"; import { logger } from "@/lib/logger"; +import type { CostBreakdown } from "@/lib/utils/cost-calculation"; // Auth-sensitive header names to redact const REDACTED_HEADERS = new Set([ @@ -88,6 +89,8 @@ function truncateForLangfuse(data: unknown, maxChars: number = getLangfuseMaxIoS return data; } +type ObservationLevel = "DEBUG" | "DEFAULT" | "WARNING" | "ERROR"; + export interface TraceContext { session: ProxySession; responseHeaders: Headers; @@ -99,6 +102,7 @@ export interface TraceContext { errorMessage?: string; usageMetrics?: UsageMetrics | null; costUsd?: string; + costBreakdown?: CostBreakdown; } /** @@ -140,6 +144,43 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { providersAttempted: new Set(session.getProviderChain().map((i) => i.id)).size, }; + // Compute observation level for root span + let rootSpanLevel: ObservationLevel = "DEFAULT"; + if (statusCode < 200 || statusCode >= 300) { + rootSpanLevel = "ERROR"; + } else { + const failedAttempts = session + .getProviderChain() + .filter((i) => !isSuccessReason(i.reason)).length; + if (failedAttempts >= 1) rootSpanLevel = "WARNING"; + } + + // Actual request body (forwarded to upstream after all preprocessing) + const actualRequestBody = session.forwardedRequestBody + ? truncateForLangfuse(tryParseJsonSafe(session.forwardedRequestBody)) + : truncateForLangfuse(session.request.message); + + // Actual response body + const actualResponseBody = ctx.responseText + ? truncateForLangfuse(tryParseJsonSafe(ctx.responseText)) + : isStreaming + ? { streaming: true, sseEventCount: ctx.sseEventCount } + : { statusCode }; + + // Root span metadata (former input/output summaries moved here) + const rootSpanMetadata: Record = { + endpoint: session.getEndpoint(), + method: session.method, + model: session.getCurrentModel(), + clientFormat: session.originalFormat, + providerName: provider?.name, + statusCode, + durationMs, + hasUsage: !!ctx.usageMetrics, + costUsd: ctx.costUsd, + timingBreakdown, + }; + // Build tags - include provider name and model const tags: string[] = []; if (provider?.providerType) tags.push(provider.providerType); @@ -216,31 +257,21 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { } : undefined; - // Build cost details - const costDetails: Record | undefined = - ctx.costUsd && Number.parseFloat(ctx.costUsd) > 0 + // Build cost details (prefer breakdown, fallback to total-only) + const costDetails: Record | undefined = ctx.costBreakdown + ? { ...ctx.costBreakdown } + : ctx.costUsd && Number.parseFloat(ctx.costUsd) > 0 ? { total: Number.parseFloat(ctx.costUsd) } : undefined; - // Create the root trace span + // Create the root trace span with actual bodies, level, and metadata const rootSpan = startObservation( "proxy-request", { - input: { - endpoint: session.getEndpoint(), - method: session.method, - model: session.getCurrentModel(), - clientFormat: session.originalFormat, - providerName: provider?.name, - }, - output: { - statusCode, - durationMs, - model: session.getCurrentModel(), - hasUsage: !!ctx.usageMetrics, - costUsd: ctx.costUsd, - timingBreakdown, - }, + input: actualRequestBody, + output: actualResponseBody, + level: rootSpanLevel, + metadata: rootSpanMetadata, }, { startTime: requestStartTime, @@ -338,20 +369,8 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { // Explicitly set trace-level input/output (propagateAttributes does not support these) rootSpan.updateTrace({ - input: { - endpoint: session.getEndpoint(), - method: session.method, - model: session.getCurrentModel(), - clientFormat: session.originalFormat, - providerName: provider?.name, - }, - output: { - statusCode, - durationMs, - model: session.getCurrentModel(), - hasUsage: !!ctx.usageMetrics, - costUsd: ctx.costUsd, - }, + input: actualRequestBody, + output: actualResponseBody, }); rootSpan.end(requestEndTime); diff --git a/src/lib/utils/cost-calculation.ts b/src/lib/utils/cost-calculation.ts index 1212a1f99..0be83453d 100644 --- a/src/lib/utils/cost-calculation.ts +++ b/src/lib/utils/cost-calculation.ts @@ -98,6 +98,214 @@ function calculateTieredCostWithSeparatePrices( return baseCost.add(premiumCost); } +export interface CostBreakdown { + input: number; + output: number; + cache_creation: number; + cache_read: number; + total: number; +} + +/** + * Calculate cost breakdown by category (always raw cost, multiplier=1.0). + * Returns per-category costs as plain numbers. + */ +export function calculateRequestCostBreakdown( + usage: UsageMetrics, + priceData: ModelPriceData, + context1mApplied: boolean = false +): CostBreakdown { + let inputBucket = new Decimal(0); + let outputBucket = new Decimal(0); + let cacheCreationBucket = new Decimal(0); + let cacheReadBucket = new Decimal(0); + + const inputCostPerToken = priceData.input_cost_per_token; + const outputCostPerToken = priceData.output_cost_per_token; + const inputCostPerRequest = priceData.input_cost_per_request; + + // Per-request cost -> input bucket + if ( + typeof inputCostPerRequest === "number" && + Number.isFinite(inputCostPerRequest) && + inputCostPerRequest >= 0 + ) { + const requestCost = toDecimal(inputCostPerRequest); + if (requestCost) { + inputBucket = inputBucket.add(requestCost); + } + } + + const cacheCreation5mCost = + priceData.cache_creation_input_token_cost ?? + (inputCostPerToken != null ? inputCostPerToken * 1.25 : undefined); + + const cacheCreation1hCost = + priceData.cache_creation_input_token_cost_above_1hr ?? + (inputCostPerToken != null ? inputCostPerToken * 2 : undefined) ?? + cacheCreation5mCost; + + const cacheReadCost = + priceData.cache_read_input_token_cost ?? + (inputCostPerToken != null + ? inputCostPerToken * 0.1 + : outputCostPerToken != null + ? outputCostPerToken * 0.1 + : undefined); + + // Derive cache creation tokens by TTL + let cache5mTokens = usage.cache_creation_5m_input_tokens; + let cache1hTokens = usage.cache_creation_1h_input_tokens; + + if (typeof usage.cache_creation_input_tokens === "number") { + const remaining = + usage.cache_creation_input_tokens - (cache5mTokens ?? 0) - (cache1hTokens ?? 0); + + if (remaining > 0) { + const target = usage.cache_ttl === "1h" ? "1h" : "5m"; + if (target === "1h") { + cache1hTokens = (cache1hTokens ?? 0) + remaining; + } else { + cache5mTokens = (cache5mTokens ?? 0) + remaining; + } + } + } + + const inputAbove200k = priceData.input_cost_per_token_above_200k_tokens; + const outputAbove200k = priceData.output_cost_per_token_above_200k_tokens; + + // Input tokens -> input bucket + if (context1mApplied && inputCostPerToken != null && usage.input_tokens != null) { + inputBucket = inputBucket.add( + calculateTieredCost( + usage.input_tokens, + inputCostPerToken, + CONTEXT_1M_INPUT_PREMIUM_MULTIPLIER + ) + ); + } else if (inputAbove200k != null && inputCostPerToken != null && usage.input_tokens != null) { + inputBucket = inputBucket.add( + calculateTieredCostWithSeparatePrices(usage.input_tokens, inputCostPerToken, inputAbove200k) + ); + } else { + inputBucket = inputBucket.add(multiplyCost(usage.input_tokens, inputCostPerToken)); + } + + // Output tokens -> output bucket + if (context1mApplied && outputCostPerToken != null && usage.output_tokens != null) { + outputBucket = outputBucket.add( + calculateTieredCost( + usage.output_tokens, + outputCostPerToken, + CONTEXT_1M_OUTPUT_PREMIUM_MULTIPLIER + ) + ); + } else if (outputAbove200k != null && outputCostPerToken != null && usage.output_tokens != null) { + outputBucket = outputBucket.add( + calculateTieredCostWithSeparatePrices( + usage.output_tokens, + outputCostPerToken, + outputAbove200k + ) + ); + } else { + outputBucket = outputBucket.add(multiplyCost(usage.output_tokens, outputCostPerToken)); + } + + // Cache costs + const cacheCreationAbove200k = priceData.cache_creation_input_token_cost_above_200k_tokens; + const cacheReadAbove200k = priceData.cache_read_input_token_cost_above_200k_tokens; + const hasRealCacheCreationBase = priceData.cache_creation_input_token_cost != null; + const hasRealCacheReadBase = priceData.cache_read_input_token_cost != null; + + // Cache creation 5m -> cache_creation bucket + if (context1mApplied && cacheCreation5mCost != null && cache5mTokens != null) { + cacheCreationBucket = cacheCreationBucket.add( + calculateTieredCost(cache5mTokens, cacheCreation5mCost, CONTEXT_1M_INPUT_PREMIUM_MULTIPLIER) + ); + } else if ( + hasRealCacheCreationBase && + cacheCreationAbove200k != null && + cacheCreation5mCost != null && + cache5mTokens != null + ) { + cacheCreationBucket = cacheCreationBucket.add( + calculateTieredCostWithSeparatePrices( + cache5mTokens, + cacheCreation5mCost, + cacheCreationAbove200k + ) + ); + } else { + cacheCreationBucket = cacheCreationBucket.add(multiplyCost(cache5mTokens, cacheCreation5mCost)); + } + + // Cache creation 1h -> cache_creation bucket + if (context1mApplied && cacheCreation1hCost != null && cache1hTokens != null) { + cacheCreationBucket = cacheCreationBucket.add( + calculateTieredCost(cache1hTokens, cacheCreation1hCost, CONTEXT_1M_INPUT_PREMIUM_MULTIPLIER) + ); + } else if ( + hasRealCacheCreationBase && + cacheCreationAbove200k != null && + cacheCreation1hCost != null && + cache1hTokens != null + ) { + cacheCreationBucket = cacheCreationBucket.add( + calculateTieredCostWithSeparatePrices( + cache1hTokens, + cacheCreation1hCost, + cacheCreationAbove200k + ) + ); + } else { + cacheCreationBucket = cacheCreationBucket.add(multiplyCost(cache1hTokens, cacheCreation1hCost)); + } + + // Cache read -> cache_read bucket + if ( + hasRealCacheReadBase && + cacheReadAbove200k != null && + cacheReadCost != null && + usage.cache_read_input_tokens != null + ) { + cacheReadBucket = cacheReadBucket.add( + calculateTieredCostWithSeparatePrices( + usage.cache_read_input_tokens, + cacheReadCost, + cacheReadAbove200k + ) + ); + } else { + cacheReadBucket = cacheReadBucket.add( + multiplyCost(usage.cache_read_input_tokens, cacheReadCost) + ); + } + + // Image tokens -> respective buckets + if (usage.output_image_tokens != null && usage.output_image_tokens > 0) { + const imageCostPerToken = + priceData.output_cost_per_image_token ?? priceData.output_cost_per_token; + outputBucket = outputBucket.add(multiplyCost(usage.output_image_tokens, imageCostPerToken)); + } + + if (usage.input_image_tokens != null && usage.input_image_tokens > 0) { + const imageCostPerToken = + priceData.input_cost_per_image_token ?? priceData.input_cost_per_token; + inputBucket = inputBucket.add(multiplyCost(usage.input_image_tokens, imageCostPerToken)); + } + + const total = inputBucket.add(outputBucket).add(cacheCreationBucket).add(cacheReadBucket); + + return { + input: inputBucket.toDecimalPlaces(COST_SCALE).toNumber(), + output: outputBucket.toDecimalPlaces(COST_SCALE).toNumber(), + cache_creation: cacheCreationBucket.toDecimalPlaces(COST_SCALE).toNumber(), + cache_read: cacheReadBucket.toDecimalPlaces(COST_SCALE).toNumber(), + total: total.toDecimalPlaces(COST_SCALE).toNumber(), + }; +} + /** * 计算单次请求的费用 * @param usage - token使用量 diff --git a/tests/unit/langfuse/langfuse-trace.test.ts b/tests/unit/langfuse/langfuse-trace.test.ts index bb72c3f71..8c4704a8e 100644 --- a/tests/unit/langfuse/langfuse-trace.test.ts +++ b/tests/unit/langfuse/langfuse-trace.test.ts @@ -102,6 +102,7 @@ function createMockSession(overrides: Record = {}) { }, ttfbMs: 200, forwardStartTime: startTime + 5, + forwardedRequestBody: null, getEndpoint: () => "/v1/messages", getRequestSequence: () => 3, getMessagesLength: () => 1, @@ -146,8 +147,9 @@ describe("traceProxyRequest", () => { expect(mockStartObservation).not.toHaveBeenCalled(); }); - test("should trace when Langfuse is enabled", async () => { + test("should trace when Langfuse is enabled with actual bodies", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + const responseBody = { content: "Hi there" }; await traceProxyRequest({ session: createMockSession(), @@ -155,31 +157,34 @@ describe("traceProxyRequest", () => { durationMs: 500, statusCode: 200, isStreaming: false, - responseText: '{"content": "Hi there"}', + responseText: JSON.stringify(responseBody), }); - expect(mockStartObservation).toHaveBeenCalledWith( - "proxy-request", + // Root span should have actual request body as input (not summary) + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[0]).toBe("proxy-request"); + // Input should be the actual request message (since forwardedRequestBody is null) + expect(rootCall[1].input).toEqual( expect.objectContaining({ - input: expect.objectContaining({ - endpoint: "/v1/messages", - method: "POST", - clientFormat: "claude", - providerName: "anthropic-main", - }), - output: expect.objectContaining({ - statusCode: 200, - durationMs: 500, - costUsd: undefined, - timingBreakdown: expect.any(Object), - }), - }), + model: "claude-sonnet-4-20250514", + messages: expect.any(Array), + }) + ); + // Output should be actual response body + expect(rootCall[1].output).toEqual(responseBody); + // Should have level + expect(rootCall[1].level).toBe("DEFAULT"); + // Should have metadata with former summaries + expect(rootCall[1].metadata).toEqual( expect.objectContaining({ - startTime: expect.any(Date), + endpoint: "/v1/messages", + method: "POST", + statusCode: 200, + durationMs: 500, }) ); - // Should have 3 child observations: guard-pipeline, llm-call (no failed providers in default mock) + // Should have child observations const callNames = mockRootSpan.startObservation.mock.calls.map((c: unknown[]) => c[0]); expect(callNames).toContain("guard-pipeline"); expect(callNames).toContain("llm-call"); @@ -507,7 +512,7 @@ describe("traceProxyRequest", () => { }); }); - test("should include costUsd in root span output", async () => { + test("should include costUsd in root span metadata", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); await traceProxyRequest({ @@ -519,20 +524,17 @@ describe("traceProxyRequest", () => { costUsd: "0.05", }); - expect(mockStartObservation).toHaveBeenCalledWith( - "proxy-request", - expect.objectContaining({ - output: expect.objectContaining({ - costUsd: "0.05", - }), - }), + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].metadata).toEqual( expect.objectContaining({ - startTime: expect.any(Date), + costUsd: "0.05", }) ); }); - test("should set trace-level input/output via updateTrace", async () => { + + test("should set trace-level input/output via updateTrace with actual bodies", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + const responseBody = { result: "ok" }; await traceProxyRequest({ session: createMockSession(), @@ -540,22 +542,16 @@ describe("traceProxyRequest", () => { durationMs: 500, statusCode: 200, isStreaming: false, + responseText: JSON.stringify(responseBody), costUsd: "0.05", }); expect(mockUpdateTrace).toHaveBeenCalledWith({ input: expect.objectContaining({ - endpoint: "/v1/messages", - method: "POST", model: "claude-sonnet-4-20250514", - clientFormat: "claude", - providerName: "anthropic-main", - }), - output: expect.objectContaining({ - statusCode: 200, - durationMs: 500, - costUsd: "0.05", + messages: expect.any(Array), }), + output: responseBody, }); }); @@ -726,7 +722,7 @@ describe("traceProxyRequest", () => { }); }); - test("should include timingBreakdown in trace output and generation metadata", async () => { + test("should include timingBreakdown in root span metadata and generation metadata", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); const startTime = 1700000000000; @@ -748,23 +744,24 @@ describe("traceProxyRequest", () => { isStreaming: false, }); - // Root span output should have timingBreakdown - const rootCall = mockStartObservation.mock.calls[0]; - const rootOutput = rootCall[1].output; - expect(rootOutput.timingBreakdown).toEqual({ + const expectedTimingBreakdown = { guardPipelineMs: 5, upstreamTotalMs: 495, ttfbFromForwardMs: 100, // ttfbMs(105) - guardPipelineMs(5) tokenGenerationMs: 395, // durationMs(500) - ttfbMs(105) failedAttempts: 1, // only retry_failed is non-success providersAttempted: 2, // 2 unique provider ids - }); + }; + + // Root span metadata should have timingBreakdown + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].metadata.timingBreakdown).toEqual(expectedTimingBreakdown); // Generation metadata should also have timingBreakdown const llmCall = mockRootSpan.startObservation.mock.calls.find( (c: unknown[]) => c[0] === "llm-call" ); - expect(llmCall[1].metadata.timingBreakdown).toEqual(rootOutput.timingBreakdown); + expect(llmCall[1].metadata.timingBreakdown).toEqual(expectedTimingBreakdown); }); test("should not create provider-attempt events when all providers succeeded", async () => { @@ -788,6 +785,178 @@ describe("traceProxyRequest", () => { ); expect(eventCalls).toHaveLength(0); }); + + // --- New tests for input/output, level, and cost breakdown --- + + test("should use forwardedRequestBody as trace input when available", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const forwardedBody = JSON.stringify({ + model: "claude-sonnet-4-20250514", + messages: [{ role: "user", content: "Preprocessed Hello" }], + stream: true, + }); + + await traceProxyRequest({ + session: createMockSession({ + forwardedRequestBody: forwardedBody, + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + responseText: '{"ok": true}', + }); + + // Root span input should be the forwarded body (parsed JSON) + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].input).toEqual(JSON.parse(forwardedBody)); + + // updateTrace should also use forwarded body + expect(mockUpdateTrace).toHaveBeenCalledWith({ + input: JSON.parse(forwardedBody), + output: { ok: true }, + }); + }); + + test("should set root span level to DEFAULT for successful request", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].level).toBe("DEFAULT"); + }); + + test("should set root span level to WARNING when retries occurred", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const startTime = Date.now() - 500; + await traceProxyRequest({ + session: createMockSession({ + startTime, + getProviderChain: () => [ + { id: 1, name: "p1", reason: "retry_failed", timestamp: startTime + 50 }, + { id: 2, name: "p2", reason: "request_success", timestamp: startTime + 200 }, + ], + }), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + }); + + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].level).toBe("WARNING"); + }); + + test("should set root span level to ERROR for non-200 status", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 502, + isStreaming: false, + }); + + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].level).toBe("ERROR"); + }); + + test("should set root span level to ERROR for 499 client abort", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 499, + isStreaming: false, + }); + + const rootCall = mockStartObservation.mock.calls[0]; + expect(rootCall[1].level).toBe("ERROR"); + }); + + test("should include cost breakdown in costDetails when provided", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + const costBreakdown = { + input: 0.001, + output: 0.002, + cache_creation: 0.0005, + cache_read: 0.0001, + total: 0.0036, + }; + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + costUsd: "0.0036", + costBreakdown, + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].costDetails).toEqual(costBreakdown); + }); + + test("should fallback to total-only costDetails when no breakdown", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + costUsd: "0.05", + }); + + const llmCall = mockRootSpan.startObservation.mock.calls.find( + (c: unknown[]) => c[0] === "llm-call" + ); + expect(llmCall[1].costDetails).toEqual({ total: 0.05 }); + }); + + test("should include former summaries in root span metadata", async () => { + const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); + + await traceProxyRequest({ + session: createMockSession(), + responseHeaders: new Headers(), + durationMs: 500, + statusCode: 200, + isStreaming: false, + costUsd: "0.05", + }); + + const rootCall = mockStartObservation.mock.calls[0]; + const metadata = rootCall[1].metadata; + // Former input summary fields + expect(metadata.endpoint).toBe("/v1/messages"); + expect(metadata.method).toBe("POST"); + expect(metadata.model).toBe("claude-sonnet-4-20250514"); + expect(metadata.clientFormat).toBe("claude"); + expect(metadata.providerName).toBe("anthropic-main"); + // Former output summary fields + expect(metadata.statusCode).toBe(200); + expect(metadata.durationMs).toBe(500); + expect(metadata.costUsd).toBe("0.05"); + expect(metadata.timingBreakdown).toBeDefined(); + }); }); describe("isLangfuseEnabled", () => { diff --git a/tests/unit/lib/cost-calculation-breakdown.test.ts b/tests/unit/lib/cost-calculation-breakdown.test.ts new file mode 100644 index 000000000..b8589ffb9 --- /dev/null +++ b/tests/unit/lib/cost-calculation-breakdown.test.ts @@ -0,0 +1,159 @@ +import { describe, expect, test } from "vitest"; +import { calculateRequestCostBreakdown, type CostBreakdown } from "@/lib/utils/cost-calculation"; +import type { ModelPriceData } from "@/types/model-price"; + +function makePriceData(overrides: Partial = {}): ModelPriceData { + return { + input_cost_per_token: 0.000003, // $3/MTok + output_cost_per_token: 0.000015, // $15/MTok + cache_creation_input_token_cost: 0.00000375, // 1.25x input + cache_read_input_token_cost: 0.0000003, // 0.1x input + ...overrides, + }; +} + +describe("calculateRequestCostBreakdown", () => { + test("basic input + output tokens", () => { + const result = calculateRequestCostBreakdown( + { input_tokens: 1000, output_tokens: 500 }, + makePriceData() + ); + + expect(result.input).toBeCloseTo(0.003, 6); // 1000 * 0.000003 + expect(result.output).toBeCloseTo(0.0075, 6); // 500 * 0.000015 + expect(result.cache_creation).toBe(0); + expect(result.cache_read).toBe(0); + expect(result.total).toBeCloseTo(0.0105, 6); + }); + + test("cache creation (5m + 1h) + cache read", () => { + const result = calculateRequestCostBreakdown( + { + input_tokens: 100, + output_tokens: 50, + cache_creation_5m_input_tokens: 200, + cache_creation_1h_input_tokens: 300, + cache_read_input_tokens: 1000, + }, + makePriceData({ + cache_creation_input_token_cost_above_1hr: 0.000006, // 2x input + }) + ); + + // cache_creation = 200 * 0.00000375 + 300 * 0.000006 + expect(result.cache_creation).toBeCloseTo(0.00255, 6); + // cache_read = 1000 * 0.0000003 + expect(result.cache_read).toBeCloseTo(0.0003, 6); + expect(result.total).toBeCloseTo( + result.input + result.output + result.cache_creation + result.cache_read, + 10 + ); + }); + + test("image tokens go to input/output buckets", () => { + const result = calculateRequestCostBreakdown( + { + input_tokens: 100, + output_tokens: 50, + input_image_tokens: 500, + output_image_tokens: 200, + }, + makePriceData({ + input_cost_per_image_token: 0.00001, + output_cost_per_image_token: 0.00005, + }) + ); + + // input = 100 * 0.000003 + 500 * 0.00001 + expect(result.input).toBeCloseTo(0.0053, 6); + // output = 50 * 0.000015 + 200 * 0.00005 + expect(result.output).toBeCloseTo(0.01075, 6); + }); + + test("tiered pricing with context1mApplied", () => { + const result = calculateRequestCostBreakdown( + { + input_tokens: 300000, // crosses 200k threshold + output_tokens: 100, + }, + makePriceData(), + true // context1mApplied + ); + + // input: 200000 * 0.000003 + 100000 * 0.000003 * 2.0 = 0.6 + 0.6 = 1.2 + expect(result.input).toBeCloseTo(1.2, 4); + // output: 100 tokens, below 200k threshold + expect(result.output).toBeCloseTo(0.0015, 6); + }); + + test("200k tier pricing (Gemini style)", () => { + const result = calculateRequestCostBreakdown( + { + input_tokens: 300000, // crosses 200k threshold + output_tokens: 100, + }, + makePriceData({ + input_cost_per_token_above_200k_tokens: 0.000006, // 2x base for >200k + }) + ); + + // input: 200000 * 0.000003 + 100000 * 0.000006 = 0.6 + 0.6 = 1.2 + expect(result.input).toBeCloseTo(1.2, 4); + }); + + test("categories sum to total", () => { + const result = calculateRequestCostBreakdown( + { + input_tokens: 5000, + output_tokens: 2000, + cache_creation_input_tokens: 1000, + cache_read_input_tokens: 3000, + }, + makePriceData() + ); + + const sum = result.input + result.output + result.cache_creation + result.cache_read; + expect(result.total).toBeCloseTo(sum, 10); + }); + + test("zero usage returns all zeros", () => { + const result = calculateRequestCostBreakdown({}, makePriceData()); + + expect(result).toEqual({ + input: 0, + output: 0, + cache_creation: 0, + cache_read: 0, + total: 0, + }); + }); + + test("per-request cost goes to input bucket", () => { + const result = calculateRequestCostBreakdown( + { input_tokens: 0 }, + makePriceData({ input_cost_per_request: 0.01 }) + ); + + expect(result.input).toBeCloseTo(0.01, 6); + expect(result.total).toBeCloseTo(0.01, 6); + }); + + test("cache_creation_input_tokens distributed by cache_ttl", () => { + // When only cache_creation_input_tokens is set (no 5m/1h split), + // it should be assigned based on cache_ttl + const result = calculateRequestCostBreakdown( + { + input_tokens: 0, + output_tokens: 0, + cache_creation_input_tokens: 1000, + cache_ttl: "1h", + }, + makePriceData({ + cache_creation_input_token_cost_above_1hr: 0.000006, + }) + ); + + // 1000 tokens should go to 1h tier at 0.000006 + expect(result.cache_creation).toBeCloseTo(0.006, 6); + }); +}); From a04356ec33fc666378795ef6b18a03e78124dd38 Mon Sep 17 00:00:00 2001 From: ding113 Date: Sun, 15 Feb 2026 22:24:46 +0800 Subject: [PATCH 09/10] refactor(langfuse): remove truncation and header redaction for raw observability - Remove truncateForLangfuse, sanitizeHeaders, REDACTED_HEADERS, and LANGFUSE_MAX_IO_SIZE env config -- observability platforms should store raw data without lossy transformations - Replace sanitizeHeaders with headersToRecord (pass-through) - Unify generation input to use actualRequestBody (forwardedRequestBody when available) instead of session.request.message, ensuring the complete forwarded request body is recorded everywhere --- src/lib/config/env.schema.ts | 1 - src/lib/langfuse/trace-proxy-request.ts | 74 ++++++---------------- tests/unit/langfuse/langfuse-trace.test.ts | 16 ++--- 3 files changed, 26 insertions(+), 65 deletions(-) diff --git a/src/lib/config/env.schema.ts b/src/lib/config/env.schema.ts index 344efbe7d..b7dacd738 100644 --- a/src/lib/config/env.schema.ts +++ b/src/lib/config/env.schema.ts @@ -134,7 +134,6 @@ export const EnvSchema = z.object({ LANGFUSE_BASE_URL: z.string().default("https://cloud.langfuse.com"), LANGFUSE_SAMPLE_RATE: z.coerce.number().min(0).max(1).default(1.0), LANGFUSE_DEBUG: z.string().default("false").transform(booleanTransform), - LANGFUSE_MAX_IO_SIZE: z.coerce.number().int().min(1).max(10_000_000).default(100_000), }); /** diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index b805f5b1b..3ccd1e748 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -1,28 +1,9 @@ import type { UsageMetrics } from "@/app/v1/_lib/proxy/response-handler"; import type { ProxySession } from "@/app/v1/_lib/proxy/session"; -import { getEnvConfig } from "@/lib/config/env.schema"; import { isLangfuseEnabled } from "@/lib/langfuse/index"; import { logger } from "@/lib/logger"; import type { CostBreakdown } from "@/lib/utils/cost-calculation"; -// Auth-sensitive header names to redact -const REDACTED_HEADERS = new Set([ - "x-api-key", - "authorization", - "x-goog-api-key", - "anthropic-api-key", - "cookie", - "set-cookie", -]); - -function sanitizeHeaders(headers: Headers): Record { - const result: Record = {}; - headers.forEach((value, key) => { - result[key] = REDACTED_HEADERS.has(key.toLowerCase()) ? "[REDACTED]" : value; - }); - return result; -} - function buildRequestBodySummary(session: ProxySession): Record { const msg = session.request.message as Record; return { @@ -43,8 +24,12 @@ function getStatusCategory(statusCode: number): string { return `${Math.floor(statusCode / 100)}xx`; } -function getLangfuseMaxIoSize(): number { - return getEnvConfig().LANGFUSE_MAX_IO_SIZE; +function headersToRecord(headers: Headers): Record { + const result: Record = {}; + headers.forEach((value, key) => { + result[key] = value; + }); + return result; } const SUCCESS_REASONS = new Set([ @@ -68,27 +53,6 @@ function isErrorReason(reason: string | undefined): boolean { return !!reason && ERROR_REASONS.has(reason); } -/** - * Truncate data for Langfuse to avoid excessive payload sizes. - */ -function truncateForLangfuse(data: unknown, maxChars: number = getLangfuseMaxIoSize()): unknown { - if (typeof data === "string") { - return data.length > maxChars ? `${data.substring(0, maxChars)}...[truncated]` : data; - } - if (data != null && typeof data === "object") { - const str = JSON.stringify(data); - if (str.length > maxChars) { - return { - _truncated: true, - _length: str.length, - _preview: str.substring(0, Math.min(maxChars, 2000)), - }; - } - return data; - } - return data; -} - type ObservationLevel = "DEBUG" | "DEFAULT" | "WARNING" | "ERROR"; export interface TraceContext { @@ -155,14 +119,14 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { if (failedAttempts >= 1) rootSpanLevel = "WARNING"; } - // Actual request body (forwarded to upstream after all preprocessing) + // Actual request body (forwarded to upstream after all preprocessing) - no truncation const actualRequestBody = session.forwardedRequestBody - ? truncateForLangfuse(tryParseJsonSafe(session.forwardedRequestBody)) - : truncateForLangfuse(session.request.message); + ? tryParseJsonSafe(session.forwardedRequestBody) + : session.request.message; - // Actual response body + // Actual response body - no truncation const actualResponseBody = ctx.responseText - ? truncateForLangfuse(tryParseJsonSafe(ctx.responseText)) + ? tryParseJsonSafe(ctx.responseText) : isStreaming ? { streaming: true, sseEventCount: ctx.sseEventCount } : { statusCode }; @@ -199,7 +163,7 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { requestSequence: String(session.getRequestSequence()), }; - // Build generation metadata - all request detail fields + // Build generation metadata - all request detail fields, raw headers (no redaction) const generationMetadata: Record = { // Provider providerId: provider?.id, @@ -234,9 +198,9 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { requestSummary: buildRequestBodySummary(session), // SSE sseEventCount: ctx.sseEventCount, - // Headers (sanitized) - requestHeaders: sanitizeHeaders(session.headers), - responseHeaders: sanitizeHeaders(ctx.responseHeaders), + // Headers (raw, no redaction) + requestHeaders: headersToRecord(session.headers), + responseHeaders: headersToRecord(ctx.responseHeaders), }; // Build usage details for Langfuse generation @@ -331,12 +295,10 @@ export async function traceProxyRequest(ctx: TraceContext): Promise { // 3. LLM generation (startTime = forwardStartTime when available) const generationStartTime = forwardStartDate ?? requestStartTime; - // Generation input = actual request payload - const generationInput = truncateForLangfuse(session.request.message); - - // Generation output = actual response body + // Generation input/output = raw payload, no truncation + const generationInput = actualRequestBody; const generationOutput = ctx.responseText - ? truncateForLangfuse(tryParseJsonSafe(ctx.responseText)) + ? tryParseJsonSafe(ctx.responseText) : isStreaming ? { streaming: true, sseEventCount: ctx.sseEventCount } : { statusCode }; diff --git a/tests/unit/langfuse/langfuse-trace.test.ts b/tests/unit/langfuse/langfuse-trace.test.ts index 8c4704a8e..c1760bb7d 100644 --- a/tests/unit/langfuse/langfuse-trace.test.ts +++ b/tests/unit/langfuse/langfuse-trace.test.ts @@ -233,7 +233,7 @@ describe("traceProxyRequest", () => { expect(llmCall[1].output).toEqual(responseBody); }); - test("should redact sensitive headers", async () => { + test("should pass raw headers without redaction", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); await traceProxyRequest({ @@ -248,9 +248,9 @@ describe("traceProxyRequest", () => { (c: unknown[]) => c[0] === "llm-call" ); const metadata = llmCall[1].metadata; - expect(metadata.requestHeaders["x-api-key"]).toBe("[REDACTED]"); + expect(metadata.requestHeaders["x-api-key"]).toBe("test-mock-key-not-real"); expect(metadata.requestHeaders["content-type"]).toBe("application/json"); - expect(metadata.responseHeaders["x-api-key"]).toBe("[REDACTED]"); + expect(metadata.responseHeaders["x-api-key"]).toBe("secret-mock"); }); test("should include provider name and model in tags", async () => { @@ -467,10 +467,10 @@ describe("traceProxyRequest", () => { ); }); - test("should truncate large input/output for Langfuse", async () => { + test("should pass large input/output without truncation", async () => { const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request"); - // Generate a large response text (> default 100KB limit) + // Generate a large response text const largeContent = "x".repeat(200_000); await traceProxyRequest({ @@ -486,9 +486,9 @@ describe("traceProxyRequest", () => { (c: unknown[]) => c[0] === "llm-call" ); const output = llmCall[1].output as string; - // Non-JSON text should be truncated - expect(output.length).toBeLessThan(200_000); - expect(output).toContain("...[truncated]"); + // Should be the full content, no truncation + expect(output).toBe(largeContent); + expect(output).not.toContain("...[truncated]"); }); test("should show streaming output with sseEventCount when no responseText", async () => { From db043d8da587475eb518361485b760bf36e3559f Mon Sep 17 00:00:00 2001 From: ding113 Date: Sun, 15 Feb 2026 22:46:18 +0800 Subject: [PATCH 10/10] docs(langfuse): clarify header security model in headersToRecord session.headers = client -> CCH (user's own key, safe to log). Upstream provider API key (outboundKey) is injected by ProxyForwarder into a separate Headers object, never present in traced headers. --- src/lib/langfuse/trace-proxy-request.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/lib/langfuse/trace-proxy-request.ts b/src/lib/langfuse/trace-proxy-request.ts index 3ccd1e748..cc940b394 100644 --- a/src/lib/langfuse/trace-proxy-request.ts +++ b/src/lib/langfuse/trace-proxy-request.ts @@ -24,6 +24,17 @@ function getStatusCategory(statusCode: number): string { return `${Math.floor(statusCode / 100)}xx`; } +/** + * Convert Headers to a plain record. + * + * Security note: session.headers are the CLIENT's original request headers + * (user -> CCH), which may include the user's own CCH auth key. These are + * safe to log -- the user already knows their own credentials. + * + * The upstream PROVIDER API key (outboundKey) is injected by ProxyForwarder + * into a separate Headers object and is NEVER present in session.headers or + * ctx.responseHeaders, so no redaction is needed here. + */ function headersToRecord(headers: Headers): Record { const result: Record = {}; headers.forEach((value, key) => {