fix: fix request records stuck in "requesting" causing crashes (#854) #858

Open
tesgth032 wants to merge 45 commits into ding113:dev from tesgth032:fix/issue-854-message-request-stuck
Conversation

@tesgth032 tesgth032 commented Mar 2, 2026

Update (2026-03-03)

  • proxy-status: on snapshot refresh failure, fall back to the stale cache and extend its TTL, so DB jitter during dashboard polling cannot turn into a 500 storm
  • orphan sweeper: also covers records where status_code is already persisted but duration_ms is still null; never overwrites an existing status_code/error_message; conservatively fills duration_ms with ttfb_ms (or 0)
  • sealedCount: tolerates the different shapes db.execute can return (rowCount/rows, etc.)
  • write buffer: throttle-warns and skips when costUsd falls outside [0, 1_000_000)
  • new unit tests: proxy-status stale-cache degradation + orphan sealer rowCount/SQL coverage + costUsd range
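The stale-cache fallback in the first bullet can be sketched as pure logic. All names below (getSnapshot, CacheEntry, the TTL values) are illustrative assumptions, not the PR's actual proxy-status API:

```typescript
// Hypothetical sketch of the stale-cache fallback described above.
// Names and TTL values are illustrative, not the PR's actual code.
type Snapshot = { activeRequests: number };

interface CacheEntry {
  value: Snapshot;
  expiresAt: number; // epoch ms
}

const FRESH_TTL_MS = 5_000;
const STALE_EXTENSION_MS = 30_000; // extended TTL used after a failed refresh

function tryRefresh(refresh: () => Snapshot | null): Snapshot | null {
  try {
    return refresh();
  } catch {
    return null; // treat a throwing refresh like a failed one
  }
}

function getSnapshot(
  cache: { entry?: CacheEntry },
  refresh: () => Snapshot | null, // returns null / throws to simulate DB failure
  now: number
): Snapshot | null {
  const { entry } = cache;
  if (entry && now < entry.expiresAt) return entry.value; // still fresh

  const fresh = tryRefresh(refresh);
  if (fresh) {
    cache.entry = { value: fresh, expiresAt: now + FRESH_TTL_MS };
    return fresh;
  }
  // Refresh failed: fall back to the stale entry and extend its TTL, so
  // dashboard polling does not turn DB jitter into a 500 storm.
  if (entry) {
    cache.entry = { value: entry.value, expiresAt: now + STALE_EXTENSION_MS };
    return entry.value;
  }
  return null; // no cache at all: caller must surface the error
}
```

The key design choice is that a failed refresh never clears the cache; it only pushes the expiry further out, so repeated poll failures degrade to stale data instead of 500s.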

Background

Issue #854: message_request entries in the Dashboard logs stay in the "requesting" state indefinitely, with the token/cost/performance fields empty or defaulted, eventually crashing and restarting the process.

Root-cause analysis (possibly compounding)

  1. The default MESSAGE_REQUEST_WRITE_MODE=async first writes terminal-state information into an in-memory queue, then batch-persists it. When the DB write hits a data/constraint error or a transient failure, the old implementation could stall the queue or lose tail updates, leaving duration_ms/status_code empty indefinitely.
  2. When the process is terminated non-gracefully (OOM/SIGKILL), tail updates still in the in-memory queue never reach the DB, producing "orphaned requests".
  3. In abnormal conditions, proxy-status tracking could accumulate active-request details without bound, bloating response bodies and risking memory pressure, which further invites OOM and amplifies (2).

Fixes

  • Harden the async batched writes for message_request:
    • explicit type casts for numeric/jsonb fields, plus patch validation/sanitization
    • on data-class DB errors, automatically degrade to per-item writes (safe/minimal patch) instead of stalling the whole queue
    • add a maxPending cap; on overflow, drop non-terminal patches first so duration/status survive
  • On process exit (SIGTERM/SIGINT), best-effort stop/flush of the write queue to reduce the chance of tail loss
  • Add a periodic sweeper that batch-seals "orphaned requests" still lacking a terminal state past a staleness threshold, preventing unbounded "requesting" accumulation
  • Rework proxy-status to aggregate from the database and cap the active-request details, so abnormal conditions cannot blow up memory or the response body
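The overflow policy in the write-buffer bullet (drop non-terminal patches first so duration/status survive) reduces to a small eviction rule. The Patch shape below is a simplified stand-in for the PR's MessageRequestUpdatePatch; isTerminalPatch mirrors the later review fix that treats either statusCode or durationMs as terminal:

```typescript
// Simplified stand-in for the PR's patch type; only the fields relevant to
// terminality are modeled here.
type Patch = { durationMs?: number; statusCode?: number; model?: string };

// A patch is "terminal" when it carries end-of-request facts we must not lose.
function isTerminalPatch(patch: Patch): boolean {
  return patch.durationMs !== undefined || patch.statusCode !== undefined;
}

// On overflow, pick a non-terminal patch to evict if any exists; otherwise
// fall back to the oldest entry (a Map preserves insertion order).
function pickEvictionCandidate(pending: Map<number, Patch>): number | undefined {
  let oldest: number | undefined;
  for (const [id, patch] of pending) {
    if (oldest === undefined) oldest = id;
    if (!isTerminalPatch(patch)) return id; // prefer dropping non-terminal
  }
  return oldest;
}
```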

Hardening in response to review

  • Added a partial-index migration for proxy-status's lastRequest LATERAL query (covering completed requests only), so orphan accumulation cannot force deep backscans
  • Added a partial index idx_message_request_active_created_at_id (WHERE duration_ms IS NULL) for activeRequests/the orphan sweeper, avoiding scan degradation
  • sealOrphanedMessageRequests's threshold is now based on created_at, so non-terminal updates to updated_at cannot postpone sealing
  • updateMessageRequestDetails returns immediately on rejected_invalid, avoiding duplicate sanitize passes and duplicate warnings
  • token-related fields are now sanitized with JS safe-integer checks (matching bigint), so silent truncation cannot skew statistics
  • the batched UPDATE's CASE branches gained ::int/::bigint/::boolean and NULL::jsonb casts, reducing the risk of type-inference DB errors
  • flushBatchPerItem records isTerminal when dropping a patch and raises the log level for terminal drops, making "terminal-state loss" easy to monitor
  • extracted env snapshot/restore helpers into tests/helpers/env.ts to avoid test duplication and drift
  • when the write buffer is stopped with items still pending, an error is logged to aid troubleshooting
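The safe-integer and costUsd-range sanitizers described in the bullets above might look roughly like this. These helpers are illustrative; only the bigint-matching safe-int check and the [0, 1_000_000) bound come from the PR text:

```typescript
// Illustrative sanitize helpers matching the hardening described above.
// The field semantics and the [0, 1_000_000) cost bound come from the PR text;
// everything else is an assumption.

// Token counts map to bigint columns; reject anything that is not a
// non-negative JS safe integer instead of silently truncating.
function sanitizeTokenCount(value: unknown): number | undefined {
  if (typeof value !== "number" || !Number.isSafeInteger(value) || value < 0) {
    return undefined; // drop the field rather than persist a wrong value
  }
  return value;
}

// costUsd outside [0, 1_000_000) is skipped (the PR also throttle-warns).
function sanitizeCostUsd(value: unknown): number | undefined {
  if (typeof value !== "number" || !Number.isFinite(value)) return undefined;
  if (value < 0 || value >= 1_000_000) return undefined;
  return value;
}
```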

Testing

  • bun run build
  • bun run lint
  • bun run lint:fix
  • bun run typecheck
  • bun run test

Closes #854

- message_request async batched writes: added validation and explicit type casts, degrade on data-class errors, plus a queue-cap safeguard
- best-effort flush of the write queue on process exit; start a periodic sweeper to seal historical orphaned records
- proxy-status now aggregates from the database and caps the number of active requests returned, so abnormal accumulation cannot exhaust memory
- added unit tests covering the write buffer and orphan sealing
coderabbitai bot commented Mar 2, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds a background sweeper for orphaned message_request records; hardens the message write buffer with patch validation, an overflow policy, and tiered fallback writes; adds batch sealing of stale requests to the message repository; adds ordering and a cap to the active-request query; adds/updates the related unit and integration tests and a CI lint install step.

Changes

Cohort / File(s) Summary
Orphaned-request sweeper
src/instrumentation.ts
Adds __CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__ and __CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_INTERVAL_ID__; implements startOrphanedMessageRequestSweeper() (dynamically imports the message repository, calls sealOrphanedMessageRequests every 60s, runs immediately on start, concurrency-guarded); wires the sweeper into the app startup/shutdown paths with logging and error protection.
Message write buffer: validation, overflow, and degraded writes
src/repository/message-write-buffer.ts
Adds the exported type MessageRequestUpdateEnqueueResult so enqueueMessageRequestUpdate returns a status; numerous patch/field sanitize utilities; rejects invalid patches with rejected_invalid; the overflow policy drops non-terminal patches first, supporting dropped_overflow; on batch-write failure, retries per item with tiered degradation (full→safe→minimal), requeueing or dropping; hardened JSON stringify guards plus retry/logging.
Message repository: sealing orphaned requests and write fallback
src/repository/message.ts
Adds and exports sealOrphanedMessageRequests(options?), which batch-updates message_request rows that are stale and still in progress (duration_ms, status_code, error_message) and returns sealedCount; writes prefer the queue, with a synchronous DB write as the fallback path.
Active-request query guard
src/lib/proxy-status-tracker.ts
Adds ORDER BY created_at DESC and a local limit = 1000 to loadActiveRequests, logging a warning when the cap is hit to prevent unbounded results.
Tests and CI changes
tests/unit/repository/message-orphaned-requests.test.ts, tests/unit/repository/message-write-buffer.test.ts, tests/integration/usage-ledger.test.ts, .github/workflows/*
Adds unit tests for sealing orphaned requests that assert the generated SQL/parameters; extends the write-buffer tests (numeric-error degradation, invalid-patch returns, etc.); adjusts one integration-test signature; adds an "Ensure Biome CLI" install step to two workflows.
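The sweeper's concurrency protection summarized in the src/instrumentation.ts row (immediate first run, one sweep in flight at a time, errors contained) can be sketched as follows. createSweeper and its shape are assumptions; seal stands in for sealOrphanedMessageRequests, and only the 60s cadence comes from the summary:

```typescript
// Sketch of an interval-driven sweeper with an in-flight guard.
// `seal` stands in for sealOrphanedMessageRequests.
function createSweeper(seal: () => Promise<number>) {
  let inFlight = false;

  // Returns the sealed count, or null when a sweep is already running or fails.
  async function runOnce(_trigger: string): Promise<number | null> {
    if (inFlight) return null; // concurrency guard: skip overlapping runs
    inFlight = true;
    try {
      return await seal();
    } catch {
      return null; // errors must not kill the interval loop
    } finally {
      inFlight = false;
    }
  }

  function start(intervalMs = 60_000): ReturnType<typeof setInterval> {
    void runOnce("startup"); // first sweep immediately, in the background
    return setInterval(() => void runOnce("interval"), intervalMs);
  }

  return { runOnce, start };
}
```

Running the first sweep with `void` rather than `await` keeps a slow DB from delaying application startup, which is also what one of the review comments below asks for.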

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 20.69% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Linked Issues check ✅ Passed The code changes fully address the core requirements of Issue #854: hardened async writes, graceful overflow handling, an orphaned-request sweeper, and a capped active-request query, preventing stuck requests and process crashes.
Out of Scope Changes check ✅ Passed All changes fall within the scope of the Issue #854 fix; the Biome CLI workflow step and the integration-test timeout adjustment are necessary supporting changes, so nothing is out of scope.
Description check ✅ Passed The pull request description covers the background, root-cause analysis, fix, and test steps in detail and is highly relevant to the code changes.
Title check ✅ Passed The PR title accurately summarizes the main change, fixing the crash caused by request records stuck in the "requesting" state; it maps directly to Issue #854 and is concise.


@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request fixes record stalls and system crashes when handling message requests, caused by flaws in the async write mechanism, non-graceful process termination, and unbounded accumulation in active-request tracking. By introducing a more robust write strategy, a graceful shutdown flow, periodic cleanup of "orphaned requests", and improved status tracking, it significantly improves stability and data consistency and prevents resource exhaustion from abnormal data.

Highlights

  • Fix stuck records and crashes: resolves #854, where Dashboard request records stayed in the "requesting" state with all fields empty, eventually crashing and restarting the process.
  • Harden async batched writes: strengthens the message_request async batch write path with explicit type casts for numeric/jsonb fields, patch validation and sanitization, and automatic degradation to per-item writes on data-class DB errors, so the queue cannot stall.
  • Introduce an "orphaned request" sweeper: a new scheduled task batch-seals "orphaned requests" left without a terminal state (e.g., after non-graceful termination), preventing unbounded accumulation.
  • Improve graceful shutdown: on SIGTERM/SIGINT, the process makes a best effort to stop and flush the write queue, reducing the chance of losing tail updates.
  • Improve proxy-status tracking: active-request tracking now aggregates from the database and caps the details returned, avoiding response-body bloat and memory risk in abnormal conditions.
Changelog
  • src/instrumentation.ts
    • Added state management and start/stop logic for the orphaned-message-request sweeper.
  • src/lib/proxy-status-tracker.ts
    • Capped the active-request query results and added ordering.
  • src/repository/message-write-buffer.ts
    • Introduced data sanitization, error handling, and degraded-write mechanisms to harden async batched writes.
  • src/repository/message.ts
    • Updated the message-request update functions to use the new async write path and added orphaned-request cleanup.
  • tests/unit/repository/message-orphaned-requests.test.ts
    • Added unit tests for the orphaned-message-request cleanup.
  • tests/unit/repository/message-write-buffer.test.ts
    • Added test cases verifying the degraded-write behavior on data-class DB errors.

@github-actions github-actions bot added bug Something isn't working area:core size/L Large PR (< 1000 lines) labels Mar 2, 2026
@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This PR introduces a very solid solution for requests stuck in the "requesting" state. The changes are comprehensive:

  1. A new "orphaned-request sweeper" that cleans up stale, stuck requests.
  2. Defensive limits in the proxy-status tracker to prevent memory issues.
  3. Substantial improvements to the async write buffer, including data sanitization and a sophisticated multi-tier DB write fallback.

The implementation quality is high and directly addresses the identified root causes. I have one suggestion to improve the maintainability of the new error-handling logic in the write buffer. Overall, excellent work on strengthening system stability.

Comment on lines +570 to +642
try {
await tryExecute(item.patch);
} catch (singleError) {
if (!isDataRelatedDbError(singleError)) {
// Connection/transient problem: requeue the current and remaining items for the next flush
this.requeueBatchForRetry(batch.slice(index));
logger.error(
"[MessageRequestWriteBuffer] Per-item flush hit transient error, will retry",
{
error:
singleError instanceof Error ? singleError.message : String(singleError),
errorCode: getErrorCode(singleError),
pending: this.pending.size,
}
);
shouldRetryLater = true;
break;
}

const safePatch = getSafePatch(item.patch);
try {
await tryExecute(safePatch);
} catch (safeError) {
if (!isDataRelatedDbError(safeError)) {
this.requeueBatchForRetry(batch.slice(index));
logger.error(
"[MessageRequestWriteBuffer] Per-item safe flush hit transient error, will retry",
{
error: safeError instanceof Error ? safeError.message : String(safeError),
errorCode: getErrorCode(safeError),
pending: this.pending.size,
}
);
shouldRetryLater = true;
break;
}

const minimalPatch = getMinimalPatch(item.patch);
try {
await tryExecute(minimalPatch);
} catch (minimalError) {
if (!isDataRelatedDbError(minimalError)) {
this.requeueBatchForRetry(batch.slice(index));
logger.error(
"[MessageRequestWriteBuffer] Per-item minimal flush hit transient error, will retry",
{
error:
minimalError instanceof Error
? minimalError.message
: String(minimalError),
errorCode: getErrorCode(minimalError),
pending: this.pending.size,
}
);
shouldRetryLater = true;
break;
}

// Persistent data error: drop this update to avoid stalling the whole queue (the sweeper will seal it later)
logger.error(
"[MessageRequestWriteBuffer] Dropping invalid update to unblock queue",
{
requestId: item.id,
error:
minimalError instanceof Error
? minimalError.message
: String(minimalError),
errorCode: getErrorCode(minimalError),
}
);
}
}
}

medium

The fallback logic for handling data-related DB errors is very robust, which is great. However, the nested try-catch implementation creates a lot of code duplication, especially in the transient-error handling, making the code harder to read and maintain.

Consider refactoring this section to loop over the patch-generation strategies (full, safe, minimal). That flattens the structure and removes the duplicated requeue-and-log code for transient errors.

Example of the loop-based approach:

const patchStrategies = [
  { name: 'full', patch: item.patch },
  { name: 'safe', patch: getSafePatch(item.patch) },
  { name: 'minimal', patch: getMinimalPatch(item.patch) },
];

let updateSucceeded = false;
for (const { name, patch } of patchStrategies) {
  if (Object.keys(patch).length === 0) continue;

  try {
    await tryExecute(patch);
    updateSucceeded = true;
    break; // success
  } catch (error) {
    if (!isDataRelatedDbError(error)) {
      // Handle the transient error once and break out of both loops
      this.requeueBatchForRetry(batch.slice(index));
      // ... log the transient error
      shouldRetryLater = true;
      break;
    }
    // Log the data error and continue with the next strategy
  }
}

if (shouldRetryLater) {
  break; // break out of the outer per-item loop
}

if (!updateSucceeded) {
  // log that the update was dropped
}

This approach centralizes the error handling and makes the fallback flow much clearer.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
src/repository/message-write-buffer.ts (1)

408-416: Make enqueueMessageRequestUpdate's return value strictly match whether the current patch was actually retained.

Lines 702-713 currently return true unconditionally once the buffer is obtained, but at lines 409-412 (patch empty after sanitize) or in the queue-overflow drop path, the current patch may not actually be retained. Consider changing enqueue to return whether the current patch was accepted, and pass that through to the caller.

Suggested patch:
-class MessageRequestWriteBuffer {
+class MessageRequestWriteBuffer {
@@
-  enqueue(id: number, patch: MessageRequestUpdatePatch): void {
+  enqueue(id: number, patch: MessageRequestUpdatePatch): boolean {
     const sanitized = sanitizePatch(patch);
     if (Object.keys(sanitized).length === 0) {
-      return;
+      return false;
     }

     const existing = this.pending.get(id) ?? {};
     this.pending.set(id, { ...existing, ...sanitized });
+    let accepted = true;
@@
       if (droppedId !== undefined) {
         this.pending.delete(droppedId);
+        if (droppedId === id) {
+          accepted = false;
+        }
         logger.warn("[MessageRequestWriteBuffer] Pending queue overflow, dropping update", {
@@
       this.flushAgainAfterCurrent = true;
-      return;
+      return accepted;
     }
@@
     if (this.pending.size >= this.config.batchSize) {
       void this.flush();
     }
+    return accepted;
   }
 }
@@
 export function enqueueMessageRequestUpdate(id: number, patch: MessageRequestUpdatePatch): boolean {
@@
   if (!buffer) {
     return false;
   }
-  buffer.enqueue(id, patch);
-  return true;
+  return buffer.enqueue(id, patch);
 }

Also applies to: 702-713

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/repository/message-write-buffer.ts` around lines 408 - 416, The enqueue
method currently drops empty or overflowed patches without signalling that to
callers; update enqueue(id: number, patch: MessageRequestUpdatePatch) to return
a boolean indicating whether the patch was accepted (true when sanitized
non-empty and stored into this.pending, false when sanitizePatch(...) yields
empty or the buffer/overflow logic discards it), and then update
enqueueMessageRequestUpdate to use that boolean instead of always returning true
so its return value reflects "patch actually retained"; reference
functions/variables: enqueue, enqueueMessageRequestUpdate, sanitizePatch, and
this.pending to locate and wire the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/repository/message.ts`:
- Around line 234-257: The UPDATE can overwrite rows that another concurrent
worker already finished because the WHERE only checks id IN (SELECT id FROM
candidates); update the UPDATE's WHERE to re-check the terminal conditions by
adding "duration_ms IS NULL" (and also "deleted_at IS NULL" to match the CTE) so
only still-unfinished, non-deleted rows from the message_request table are
updated; locate the SQL built in the query template (the CTE named candidates
and the UPDATE against message_request) and add these predicates to the WHERE
clause to provide the concurrency protection.

---

Nitpick comments:
In `@src/repository/message-write-buffer.ts`:
- Around line 408-416: The enqueue method currently drops empty or overflowed
patches without signalling that to callers; update enqueue(id: number, patch:
MessageRequestUpdatePatch) to return a boolean indicating whether the patch was
accepted (true when sanitized non-empty and stored into this.pending, false when
sanitizePatch(...) yields empty or the buffer/overflow logic discards it), and
then update enqueueMessageRequestUpdate to use that boolean instead of always
returning true so its return value reflects "patch actually retained"; reference
functions/variables: enqueue, enqueueMessageRequestUpdate, sanitizePatch, and
this.pending to locate and wire the change.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

📥 Commits

Reviewing files that changed from the base of the PR and between 2769a75 and 1f60d23.

📒 Files selected for processing (6)
  • src/instrumentation.ts
  • src/lib/proxy-status-tracker.ts
  • src/repository/message-write-buffer.ts
  • src/repository/message.ts
  • tests/unit/repository/message-orphaned-requests.test.ts
  • tests/unit/repository/message-write-buffer.test.ts


@github-actions github-actions bot left a comment


Code Review Summary

This PR addresses issue #854 by fixing message_request records getting stuck in "requesting" state through robust error handling with graceful degradation, input sanitization, and a sweeper mechanism for orphaned records.

PR Size: L

  • Lines changed: 718 (686 additions, 32 deletions)
  • Files changed: 6

Issues Found

Category Critical High Medium Low
Logic/Bugs 0 0 0 0
Security 0 0 0 0
Error Handling 0 0 0 0
Types 0 0 0 0
Comments/Docs 0 0 0 0
Tests 0 0 1 0
Simplification 0 0 0 0

Medium Priority Issues (Should Consider)

  1. [src/lib/proxy-status-tracker.ts:169] [STANDARD-VIOLATION] Missing logging when query limit is hit.
    • The codebase has an established pattern in src/lib/availability/availability-service.ts:170-177 where query limits are logged when hit.
    • Without logging, operators cannot detect when the 1000-item limit is truncating results, which could mask underlying issues.
    • Suggested fix: Add logging after the query:
      if (rows.length === 1000) {
        logger.warn("[ProxyStatusTracker] Active requests query hit limit, results may be incomplete", {
          limit: 1000,
        });
      }

Review Coverage

  • Logic and correctness - Clean
  • Security (OWASP Top 10) - Clean
  • Error handling - Well implemented with graceful degradation
  • Type safety - Clean
  • Documentation accuracy - Comments accurately describe behavior
  • Test coverage - Adequate (core fallback logic tested)
  • Code clarity - Good

Automated review by Claude AI


[MEDIUM] [STANDARD-VIOLATION] Missing logging when query limit is hit

The codebase has an established pattern in src/lib/availability/availability-service.ts:170-177 where query limits are logged when hit. Without logging, operators cannot detect when the 1000-item limit is truncating results.

Suggested fix:

if (rows.length === 1000) {
  logger.warn("[ProxyStatusTracker] Active requests query hit limit, results may be incomplete", {
    limit: 1000,
  });
}


@greptile-apps greptile-apps bot left a comment


10 files reviewed, 26 comments


}
};

await runOnce("startup");


Initial sweep on startup could delay application launch if many orphaned records exist or DB is slow (max 1000 records). Consider not awaiting this call or using void runOnce("startup") to run in background.

Path: src/instrumentation.ts, line 174

instrumentationState.__CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_INTERVAL_ID__ = undefined;
instrumentationState.__CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__ = false;
}
} catch (error) {


Always exiting with code 0 could mask cleanup errors. Consider tracking cleanup status and exiting with non-zero code if critical cleanup failed.

Path: src/instrumentation.ts, line 320

- enqueueMessageRequestUpdate's return value now strictly reflects whether the patch was accepted, so an empty-after-sanitize patch is no longer misread as enqueued and silently lost
- sealOrphanedMessageRequests's UPDATE gains a concurrency-guard condition to avoid overwriting already-completed records
- proxy-status logs a warning when the active-request limit is hit, so operators can spot truncation
- orphan sweeper startup now runs in the background with an inFlight reentrancy guard
- added unit tests covering the enqueue return-value semantics

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/repository/message-write-buffer.ts`:
- Around line 408-412: The enqueue method currently returns boolean that
overloads multiple meanings (sanitized-empty, buffer-unavailable,
overflow/drop), causing callers in src/repository/message.ts (calls at lines
~78, ~103, ~142) to incorrectly fall back to db.update for invalid patches;
change enqueue (and the related overflow handling at the other block around
lines ~708-718) to return a discriminated result type (e.g., enum or object with
kind: 'Ok' | 'RejectedInvalid' | 'BufferUnavailable' | 'DroppedOverflow')
instead of boolean, keep sanitizePatch usage the same but return
'RejectedInvalid' when sanitized is empty, return 'BufferUnavailable' for infra
issues, 'DroppedOverflow' for overflow; then update the callers in message.ts to
only trigger direct db.update fallback when the result is 'BufferUnavailable'
(or 'Ok' to proceed), and treat 'RejectedInvalid'/'DroppedOverflow' as discarded
(log if needed) rather than falling back to direct DB writes.
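A discriminated result of the kind this comment asks for, together with the caller policy it proposes (synchronous fallback only on buffer unavailability), might be sketched as follows. The union members come from the comment; shouldFallBackToSyncWrite is a hypothetical helper:

```typescript
// Sketch of the discriminated enqueue result described in the comment above.
type MessageRequestUpdateEnqueueResult =
  | "enqueued"
  | "rejected_invalid"
  | "buffer_unavailable"
  | "dropped_overflow";

// Caller-side policy proposed by the comment: only fall back to a direct DB
// write when the buffer itself is unavailable; invalid/overflow results are
// treated as final (logged, not retried synchronously).
function shouldFallBackToSyncWrite(result: MessageRequestUpdateEnqueueResult): boolean {
  return result === "buffer_unavailable";
}
```

Note that later review comments in this thread argue for also falling back on dropped_overflow; the sketch above encodes only the policy this particular comment proposes.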


📥 Commits

Reviewing files that changed from the base of the PR and between 1f60d23 and eb69162.

📒 Files selected for processing (5)
  • src/instrumentation.ts
  • src/lib/proxy-status-tracker.ts
  • src/repository/message-write-buffer.ts
  • src/repository/message.ts
  • tests/unit/repository/message-write-buffer.test.ts

- enqueueMessageRequestUpdate now returns a discriminated result (enqueued/rejected_invalid/buffer_unavailable/dropped_overflow)
- message.ts only falls back to synchronous writes on buffer_unavailable, so invalid/overflow cases no longer trigger db.update
- updated unit tests to cover the rejected_invalid branch

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
src/repository/message-write-buffer.ts (1)

414-418: Consider adding a low-frequency log on rejected_invalid to improve observability.

Invalid patches are currently rejected silently; consider adding a (sampled) debug/warn log here that records the id and a summary of the filtered fields, to help trace upstream data-quality issues.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/repository/message-write-buffer.ts` around lines 414 - 418, In enqueue
(MessageWriteBuffer.enqueue) add a low-frequency/log-sampled debug or warn log
right before returning "rejected_invalid" when sanitized (result of
sanitizePatch) is empty: log the id and a brief summary of which fields were
filtered/removed (e.g., keys from the original patch vs sanitized) so upstream
data issues can be traced; use existing logger and a sampling mechanism or rate
limiter to avoid high-cardinality/noise.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/repository/message-write-buffer.ts`:
- Around line 258-260: the droppable-patch check currently looks only at durationMs, so a terminal patch containing only statusCode can be wrongly evicted on overflow; change the eviction branches to reuse isTerminalPatch(patch) (i.e., replace the durationMs === undefined / durationMs !== undefined checks with the corresponding !isTerminalPatch(patch) / isTerminalPatch(patch) branches) so that no terminal patch containing statusCode or durationMs is treated as droppable; apply the same replacement to the second similar site flagged by the finding to keep them consistent.

In `@src/repository/message.ts`:
- Around line 227-229: In sealOrphanedMessageRequests, prevent NaN/float
propagation by validating and integer-normalizing inputs before using Math.max:
ensure options?.staleAfterMs and env.FETCH_BODY_TIMEOUT are Number.isFinite and
convert to integer milliseconds (e.g., Math.floor or parseInt) with fallbacks to
the safe default (60000), then compute staleAfterMs = Math.max(60000,
validatedStaleAfterMs + validatedFetchBodyTimeoutIfUsed) and limit = Math.max(1,
Math.floor(validated options?.limit ?? 1000)); finally build threshold = new
Date(Date.now() - staleAfterMs) so threshold is always a valid Date and LIMIT is
a positive integer. Reference: sealOrphanedMessageRequests, variables
staleAfterMs, limit, threshold, options, env.FETCH_BODY_TIMEOUT.

---

Nitpick comments:
In `@src/repository/message-write-buffer.ts`:
- Around line 414-418: In enqueue (MessageWriteBuffer.enqueue) add a
low-frequency/log-sampled debug or warn log right before returning
"rejected_invalid" when sanitized (result of sanitizePatch) is empty: log the id
and a brief summary of which fields were filtered/removed (e.g., keys from the
original patch vs sanitized) so upstream data issues can be traced; use existing
logger and a sampling mechanism or rate limiter to avoid high-cardinality/noise.
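The finite/int normalization requested in the sealOrphanedMessageRequests comment above can be factored into a small pure helper. normalizeSealOptions and toFiniteInt are hypothetical names; the 60000 and 1000 defaults come from the comment:

```typescript
// Hypothetical normalization helper for sealOrphanedMessageRequests options,
// mirroring the review comment: clamp inputs to finite integers with safe
// defaults so NaN/float values cannot reach the SQL threshold or LIMIT.
const DEFAULT_STALE_AFTER_MS = 60_000;
const DEFAULT_LIMIT = 1_000;

function toFiniteInt(value: unknown, fallback: number): number {
  return typeof value === "number" && Number.isFinite(value) ? Math.floor(value) : fallback;
}

function normalizeSealOptions(options?: { staleAfterMs?: number; limit?: number }) {
  const staleAfterMs = Math.max(
    DEFAULT_STALE_AFTER_MS,
    toFiniteInt(options?.staleAfterMs, DEFAULT_STALE_AFTER_MS)
  );
  const limit = Math.max(1, toFiniteInt(options?.limit, DEFAULT_LIMIT));
  return {
    staleAfterMs,
    limit, // always a positive integer, safe for LIMIT
    threshold: new Date(Date.now() - staleAfterMs), // always a valid Date
  };
}
```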


📥 Commits

Reviewing files that changed from the base of the PR and between eb69162 and b8be549.

📒 Files selected for processing (3)
  • src/repository/message-write-buffer.ts
  • src/repository/message.ts
  • tests/unit/repository/message-write-buffer.test.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/unit/repository/message-write-buffer.test.ts

Comment on lines 78 to 79
if (enqueueMessageRequestUpdate(id, { durationMs }) !== "buffer_unavailable") {
return;

This check loses updates when queue overflows. "dropped_overflow" returns early without sync fallback, causing data loss. Should be:

Suggested change
- if (enqueueMessageRequestUpdate(id, { durationMs }) !== "buffer_unavailable") {
-   return;
- }
+ if (enqueueMessageRequestUpdate(id, { durationMs }) === "enqueued") {
+   return;
+ }

This ensures only successfully enqueued updates skip sync write. dropped_overflow and buffer_unavailable will fall through to sync write, preventing data loss. Same issue at lines 103 and 142.

Path: src/repository/message.ts, lines 78-79

- overflow eviction now uses isTerminalPatch, so terminal patches containing only statusCode are no longer evicted by mistake
- rejected_invalid now emits a throttled warning log to help trace upstream data issues
- sealOrphanedMessageRequests normalizes staleAfterMs/limit/FETCH_BODY_TIMEOUT to finite integers, preventing NaN propagation
- adjusted unit tests to cover non-terminal eviction
Comment on lines 78 to 80
if (enqueueMessageRequestUpdate(id, { durationMs }) !== "buffer_unavailable") {
return;
}

Check loses updates when enqueueMessageRequestUpdate returns "dropped_overflow" or "rejected_invalid". Should only skip sync write when successfully enqueued.

Suggested change
- if (enqueueMessageRequestUpdate(id, { durationMs }) !== "buffer_unavailable") {
-   return;
- }
+ if (enqueueMessageRequestUpdate(id, { durationMs }) === "enqueued") {
+   return;
+ }

This ensures sync fallback happens for "dropped_overflow", "rejected_invalid", and "buffer_unavailable", preventing data loss.

Path: src/repository/message.ts, lines 78-80

Comment on lines 103 to 105
if (enqueueMessageRequestUpdate(id, { costUsd: formattedCost }) !== "buffer_unavailable") {
return;
}

Same enqueue result check issue - should be === "enqueued" not !== "buffer_unavailable"

Suggested change
- if (enqueueMessageRequestUpdate(id, { costUsd: formattedCost }) !== "buffer_unavailable") {
-   return;
- }
+ if (enqueueMessageRequestUpdate(id, { costUsd: formattedCost }) === "enqueued") {
+   return;
+ }
Path: src/repository/message.ts, lines 103-105

Comment on lines 142 to 144
if (enqueueMessageRequestUpdate(id, details) !== "buffer_unavailable") {
return;
}

Same enqueue result check issue - should be === "enqueued" not !== "buffer_unavailable"

Suggested change
- if (enqueueMessageRequestUpdate(id, details) !== "buffer_unavailable") {
-   return;
- }
+ if (enqueueMessageRequestUpdate(id, details) === "enqueued") {
+   return;
+ }
Path: src/repository/message.ts, lines 142-144

}

// Important: registering SIGTERM/SIGINT handlers overrides the default exit behavior.
// Without an explicit exit, the process keeps running in a "half-closed" state (e.g., with the async write queue already stopped), leaving subsequent logging broken indefinitely.

Exiting with code 0 even if cleanup failed could hide issues in monitoring/orchestration systems

Path: src/instrumentation.ts, line 334


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/repository/message.ts`:
- Around line 14-18: Replace the relative imports from "./message-write-buffer"
in src/repository/message.ts with the project path alias import using "@/..." so
the imported symbols (MessageRequestUpdatePatch, enqueueMessageRequestUpdate,
sanitizeMessageRequestUpdatePatch) are unchanged but imported from
"@/repository/message-write-buffer"; update both the type-only import for
MessageRequestUpdatePatch and the runtime import for enqueueMessageRequestUpdate
and sanitizeMessageRequestUpdatePatch to use the alias form.
- Around line 202-204: Replace the hardcoded user-facing string in
ORPHANED_ERROR_MESSAGE with a stable error code constant and ensure the code
populates the error_message/error_code field with that code (e.g., rename
ORPHANED_ERROR_MESSAGE → ORPHANED_ERROR_CODE or change its value to a
non-display token), update usages in the message repository where
ORPHANED_ERROR_MESSAGE and ORPHANED_STATUS_CODE are set so they write the code,
and add a note in the UI/display layer to map that code to localized text via
i18n (supporting zh-CN, zh-TW, en, ja, ru); keep the constant name
ORPHANED_STATUS_CODE as-is and only change the error-message constant semantics
to be a machine-readable code.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

📥 Commits

Reviewing files that changed from the base of the PR and between 99d95a1 and f8a9bf5.

📒 Files selected for processing (3)
  • src/repository/message-write-buffer.ts
  • src/repository/message.ts
  • tests/unit/repository/message-write-buffer.test.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/unit/repository/message-write-buffer.test.ts

updatedAt: new Date(),
// Non-terminal patches can simply be dropped on overflow; writing them synchronously would amplify DB writes right at the load peak.
// Terminal patches (those carrying statusCode) should fall back to a synchronous write so requests do not stay stuck in "requesting".
if (enqueueResult === "dropped_overflow" && details.statusCode === undefined) {

Terminal patch check only looks at statusCode, but isTerminalPatch defines terminal as having either durationMs OR statusCode. If this function could receive patches with durationMs but no statusCode (via the MessageRequestUpdatePatch type that enqueueMessageRequestUpdate accepts), those would be dropped on overflow despite being terminal.

However, the function signature for details excludes durationMs, so this is type-safe. Consider adding a comment clarifying why only statusCode is checked here, or using isTerminalPatch helper for consistency.

Path: src/repository/message.ts, line 161

  if (!this.stopping) {
-   this.ensureFlushTimer();
+   // Flush terminal patches as soon as possible, shrinking the window where duration/status stay empty
+   this.ensureFlushTimer(isTerminalPatch(sanitized) ? 0 : undefined);

Terminal patches trigger immediate flush with delayMs: 0, which is good for reducing the "stuck in requesting" window. But if many terminal patches arrive in quick succession, this could cause excessive flush operations. Consider batching terminal patches with a very short delay (e.g., 10-50ms) instead of 0 to allow micro-batching while still being fast.

Path: src/repository/message-write-buffer.ts, line 485

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
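The micro-batching idea can be sketched as a scheduler that keeps only the earliest pending deadline; everything below is a sketch, not the buffer's actual implementation, and the 25 ms delay is an assumed value in the suggested 10–50 ms range:

```typescript
// Sketch: coalesce many flush requests into one timer that fires at the earliest
// requested deadline. Terminal patches ask for a short delay (micro-batch) instead
// of 0, so a burst of terminal patches still shares a single flush.
class FlushScheduler {
  private timer: ReturnType<typeof setTimeout> | undefined;
  private deadline = Infinity;

  constructor(
    private readonly flush: () => void,
    private readonly defaultDelayMs = 1000,
  ) {}

  schedule(delayMs = this.defaultDelayMs): void {
    const due = Date.now() + delayMs;
    if (due >= this.deadline) return; // an earlier flush is already scheduled
    if (this.timer) clearTimeout(this.timer);
    this.deadline = due;
    this.timer = setTimeout(() => {
      this.timer = undefined;
      this.deadline = Infinity;
      this.flush();
    }, delayMs);
  }
}

// Terminal patch:     scheduler.schedule(25);  // fast, but bursts are batched
// Non-terminal patch: scheduler.schedule();    // default interval
```

A burst of `schedule(25)` calls within the same window collapses into one `flush()`, which is the trade-off the comment describes.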

Comment on lines 442 to 473

  if (this.pending.size > this.config.maxPending) {
-   // Prefer dropping non-terminal updates (entries without durationMs) to keep request-completion info
+   // Prefer dropping non-terminal updates (entries without durationMs/statusCode) to keep request-completion info
    let droppedId: number | undefined;
    let droppedPatch: MessageRequestUpdatePatch | undefined;

    for (const [candidateId, candidatePatch] of this.pending) {
-     if (candidatePatch.durationMs === undefined) {
+     if (!isTerminalPatch(candidatePatch)) {
        droppedId = candidateId;
        droppedPatch = candidatePatch;
        break;
      }
    }

+   // When everything pending is terminal, do not evict an existing terminal patch at random
+   // (other requests would permanently lose their completion info). Drop the *current* patch
+   // instead, and let the caller use the return value to decide whether to fall back to a sync write.
    if (droppedId === undefined) {
-     const first = this.pending.entries().next().value as
-       | [number, MessageRequestUpdatePatch]
-       | undefined;
-     if (first) {
-       droppedId = first[0];
-       droppedPatch = first[1];
-     }
+     droppedId = id;
+     droppedPatch = this.pending.get(id);
    }

    if (droppedId !== undefined) {
      this.pending.delete(droppedId);
      if (droppedId === id) {
        result = "dropped_overflow";
      }
      logger.warn("[MessageRequestWriteBuffer] Pending queue overflow, dropping update", {
        maxPending: this.config.maxPending,
        droppedId,
        droppedHasDurationMs: droppedPatch?.durationMs !== undefined,
        droppedIsTerminal: droppedPatch ? isTerminalPatch(droppedPatch) : undefined,
        currentPending: this.pending.size,
      });
    }

Queue overflow logic iterates through all pending items to find a non-terminal patch to drop. With maxPending of 1000+, this O(n) scan could add latency on the hot path during overflow scenarios. Consider maintaining separate queues for terminal vs non-terminal patches, or tracking non-terminal patch IDs in a separate Set for O(1) lookup.

Path: src/repository/message-write-buffer.ts, lines 442–473
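The suggested O(1) eviction could take roughly this shape; the sketch below assumes patches keyed by request id and mirrors the buffer's `isTerminalPatch` semantics, but the names and return values are illustrative, not the project's API:

```typescript
// Sketch: track non-terminal entries in a side Set so overflow eviction is O(1)
// instead of scanning the whole pending Map on the hot path.
interface Patch { durationMs?: number; statusCode?: number; }

const isTerminal = (p: Patch) => p.durationMs !== undefined || p.statusCode !== undefined;

class PendingBuffer {
  private pending = new Map<number, Patch>();
  private nonTerminalIds = new Set<number>();

  constructor(private readonly maxPending: number) {}

  enqueue(id: number, patch: Patch): "enqueued" | "dropped_overflow" {
    this.pending.set(id, patch);
    if (isTerminal(patch)) this.nonTerminalIds.delete(id);
    else this.nonTerminalIds.add(id);

    if (this.pending.size <= this.maxPending) return "enqueued";

    // O(1): grab any non-terminal id; if none exist, drop the current patch instead
    // so existing terminal patches are never evicted at random.
    const victim = this.nonTerminalIds.values().next().value ?? id;
    this.pending.delete(victim);
    this.nonTerminalIds.delete(victim);
    return victim === id ? "dropped_overflow" : "enqueued";
  }

  size(): number { return this.pending.size; }
}
```

The Set must be kept in sync on every insert, merge, and delete, which is the maintenance cost the comment alludes to.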

}
};

void runOnce("startup");

Startup sweep uses void runOnce("startup") (fire-and-forget), so it won't block application launch. But if the sweep encounters errors or takes very long, there's no visibility or timeout. Consider adding a timeout wrapper or logging the completion time to detect issues.

Path: src/instrumentation.ts, line 181
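A timeout-plus-logging wrapper for the fire-and-forget sweep could look like this; the 30 s budget and the wrapper name are assumptions, and `runOnce` is the function referenced above, not redefined here:

```typescript
// Sketch: run a fire-and-forget task with a duration log and a soft timeout,
// so a hung startup sweep becomes visible instead of silently pending forever.
async function runWithVisibility(
  name: string,
  task: () => Promise<void>,
  timeoutMs = 30_000,
): Promise<"ok" | "error" | "timeout"> {
  const started = Date.now();
  const timeout = new Promise<"timeout">((resolve) =>
    setTimeout(() => resolve("timeout"), timeoutMs),
  );
  try {
    const outcome = await Promise.race([task().then(() => "ok" as const), timeout]);
    console.info(`[sweeper] ${name} finished: ${outcome} in ${Date.now() - started}ms`);
    return outcome;
  } catch (error) {
    console.warn(`[sweeper] ${name} failed`, error);
    return "error";
  }
}

// Usage (hypothetical): void runWithVisibility("startup", () => runOnce("startup"));
```

Note the timeout here is soft: the underlying task keeps running, but the log makes the stall observable.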

@tesgth032
Contributor Author

@coderabbitai review

@coderabbitai

coderabbitai bot commented Mar 2, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Comment on lines +120 to +125

+ const enqueueResult = enqueueMessageRequestUpdate(id, { costUsd: formattedCost });
+ if (enqueueResult === "enqueued" || enqueueResult === "rejected_invalid") {
+   return;
+ }

- await db
-   .update(messageRequest)
-   .set({
-     costUsd: formattedCost,
-     updatedAt: new Date(),
-   })
-   .where(eq(messageRequest.id, id));
+ await writeMessageRequestUpdateToDb(id, { costUsd: formattedCost });

costUsd is non-terminal but writes to DB when dropped_overflow. This is inconsistent with updateMessageRequestDetails (line 161) which skips non-terminal writes on overflow to avoid amplifying DB load during peaks.

Suggested change

Before:
  const enqueueResult = enqueueMessageRequestUpdate(id, { costUsd: formattedCost });
  if (enqueueResult === "enqueued" || enqueueResult === "rejected_invalid") {
    return;
  }
  await writeMessageRequestUpdateToDb(id, { costUsd: formattedCost });

After:
  const enqueueResult = enqueueMessageRequestUpdate(id, { costUsd: formattedCost });
  if (enqueueResult === "enqueued" || enqueueResult === "rejected_invalid") {
    return;
  }
  // Skip the synchronous write for non-terminal fields on queue overflow,
  // to avoid amplifying DB writes back onto the load peak
  if (enqueueResult === "dropped_overflow") {
    return;
  }
  await writeMessageRequestUpdateToDb(id, { costUsd: formattedCost });
Path: src/repository/message.ts, lines 120–125

Comment on lines +94 to +99
if (truncated < min) {
return min;
}
if (truncated > max) {
return max;
}

Silently clamping out-of-range values could hide data integrity bugs. Consider logging when clamping occurs, especially for critical fields like token counts or costs.

Path: src/repository/message-write-buffer.ts, lines 94–99

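The logging-on-clamp suggestion could be sketched as below; the per-field throttling window and the helper name are assumptions, not the repository's actual sanitizer:

```typescript
// Sketch: clamp to [min, max] but warn (throttled per field) when a value is
// actually clamped, so out-of-range inputs surface in logs instead of vanishing.
const lastWarnAt = new Map<string, number>();
const WARN_INTERVAL_MS = 60_000; // at most one warning per field per minute (assumed)

function clampWithWarn(value: number, field: string, min: number, max: number): number {
  const clamped = Math.min(max, Math.max(min, value));
  if (clamped !== value) {
    const now = Date.now();
    if ((lastWarnAt.get(field) ?? 0) + WARN_INTERVAL_MS <= now) {
      lastWarnAt.set(field, now);
      console.warn(`[sanitize] clamped ${field}: ${value} -> ${clamped}`);
    }
  }
  return clamped;
}
```

Throttling matters here because clamping can fire on every patch during an incident, and an unthrottled warn would flood the logs.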


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/repository/message.ts`:
- Around line 120-125: The function updateMessageRequestCost currently falls
back to a synchronous DB write when enqueueMessageRequestUpdate returns
"dropped_overflow", causing spikes in DB load; change the condition that
short-circuits before writeMessageRequestUpdateToDb to also return when
enqueueResult === "dropped_overflow" (i.e., treat overflow as a dropped
non-terminal update like in updateMessageRequestDetails) so that
writeMessageRequestUpdateToDb is only called when the enqueue result indicates a
non-dropped outcome.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

📥 Commits

Reviewing files that changed from the base of the PR and between f8a9bf5 and db55c41.

📒 Files selected for processing (1)
  • src/repository/message.ts

@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (3)

src/lib/proxy-status-tracker.ts, line 176
Missing providers.deletedAt soft-delete filter

The original loadActiveRequests WHERE clause included isNull(providers.deletedAt) to exclude requests belonging to soft-deleted providers. This condition was dropped in this PR.

Because the table is joined with innerJoin(providers, ...), rows from soft-deleted providers are still present in the result set — the inner join only filters out unmatched rows, not deleted ones. Without the guard, active requests from deleted providers will now appear in the proxy status dashboard.

      .where(and(isNull(messageRequest.deletedAt), isNull(messageRequest.durationMs), isNull(providers.deletedAt)))

src/lib/proxy-status-tracker.ts, line 218
Missing providers.deleted_at filter in LATERAL subquery

The original loadLastRequests query explicitly joined providers with AND p.deleted_at IS NULL to filter out soft-deleted providers. The new LATERAL-based subquery omits this condition:

JOIN providers p ON mr.provider_id = p.id   -- missing: AND p.deleted_at IS NULL

As a result, the "last request" for a user can be populated with data from a soft-deleted provider (e.g., its name would still appear in the status response). The fix is to add the soft-delete guard:

JOIN providers p ON mr.provider_id = p.id AND p.deleted_at IS NULL

src/repository/message-write-buffer.ts, line 905
Silent item skip when buildBatchUpdateSql returns null for "full" strategy

When buildBatchUpdateSql([{id, patch}]) returns null for the "full" strategy, the loop breaks immediately without attempting the "safe" or "minimal" strategies. null here means the patch produced no SQL columns — which in practice should not happen for a patch that already passed the non-empty sanitization check in enqueue. But if it somehow does happen, the break means the item is silently discarded without ever attempting the minimal fallback (durationMs + statusCode + errorMessage).

Consider logging a warning on this path and using continue instead of break so the remaining strategies are still attempted:

if (!singleQuery) {
  // No SQL columns produced (all fields filtered out by buildBatchUpdateSql); skip this strategy.
  continue;
}

@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (3)

src/repository/message.ts, line 138
Redundant pre-sanitization before enqueueMessageRequestUpdate

sanitizeMessageRequestUpdatePatch({ costUsd: formattedCost }) is called here to validate and early-return, but enqueueMessageRequestUpdate already calls sanitizePatch internally, and writeMessageRequestUpdateToDb (the sync fallback) also calls sanitizeMessageRequestUpdatePatch. The pre-check is thus duplicating logic that is already applied downstream.

The net effect is harmless today because sanitizeCostUsdString is deterministic and idempotent. However, this creates two separate validation gates that must be kept in sync manually — if the sanitization logic for costUsd ever changes, a developer must update both the pre-check and sanitizePatch. Consider removing the early pre-check and relying solely on the existing sanitization in enqueueMessageRequestUpdate / writeMessageRequestUpdateToDb, optionally adding a log there for the rejected_invalid case to preserve observability.

// Before (current):
const sanitizedCost = sanitizeMessageRequestUpdatePatch({ costUsd: formattedCost }).costUsd;
if (!sanitizedCost) {
  logger.warn("[MessageRepository] costUsd rejected by sanitize, dropping cost update", { ... });
  return;
}
const enqueueResult = enqueueMessageRequestUpdate(id, { costUsd: sanitizedCost });
// Simpler alternative:
const enqueueResult = enqueueMessageRequestUpdate(id, { costUsd: formattedCost });
// rejected_invalid already logged/handled inside enqueueMessageRequestUpdate

tests/unit/repository/message-orphaned-requests.test.ts, line 1728
toSqlText helper duplicated between test files

This helper is identical to the one in tests/unit/repository/message-write-buffer.test.ts (modulo the new casing field that was also added there). Duplicating it means both copies must be kept in sync if the Drizzle query-building API changes.

Consider extracting it into a shared test utility, e.g. tests/helpers/drizzle.ts:

// tests/helpers/drizzle.ts
import { CasingCache } from "drizzle-orm/casing";

export function toSqlText(query: { toQuery: (config: any) => { sql: string; params: unknown[] } }) {
  return query.toQuery({
    casing: new CasingCache(),
    escapeName: (name: string) => `"${name}"`,
    escapeParam: (index: number) => `$${index}`,
    escapeString: (value: string) => `'${value}'`,
    paramStartIndex: { value: 1 },
  });
}

This also avoids the config: any typing issue surfacing in two places.



src/lib/proxy-status-tracker.ts, line 376
loadLastRequests scans entire users table — potential performance regression

The rewritten query starts from FROM users u JOIN LATERAL (...) last ON true WHERE u.deleted_at IS NULL. PostgreSQL must iterate every non-deleted user in the outer loop before deciding which ones have matching message_request rows. For a deployment with many users but relatively few requesters, this is O(num_users) index scans — a regression versus the previous approach that started from and filtered the message_request table directly.

The old DISTINCT ON (mr.user_id) query admittedly performed a full sort on message_request, which is expensive for large tables. The LATERAL approach trades that cost for a per-user lookup that scales with the number of users rather than the number of requests. For deployments where user count is low this is fine, but for deployments with many users and sparse requests it could be considerably worse.

Consider filtering the outer users scan to only those who have ever made a request — e.g. by anchoring from message_request with a subquery, or by checking EXISTS on message_request. Please add a comment documenting the assumed scale at which this query is expected to stay performant.

@tesgth032
Contributor Author

Converged and tightened things up based on the Greptile/CodeRabbit AI reviews:

  • message-write-buffer: the per-item flush no longer aborts when buildBatchUpdateSql returns null; it keeps trying the safe/minimal strategies. Drop logs now include types + a sample to make abnormal patches easier to pin down.
  • message repository: costUsd updates no longer run multiple rounds of pre-validation; they rely on the unified enqueue/sync sanitize. An explicit warn is emitted when the sanitized patch comes back empty, so updates are never dropped silently.
  • tests: extracted a shared Drizzle toSqlText helper to cut duplication.

Verified locally: bun run build / bun run lint / bun run typecheck / bun run test all pass; GitHub Actions is fully green (Greptile Review still pending until it refreshes its summary).

@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (3)

src/instrumentation.ts, line 149
TOCTOU: started-flag set after async import, allows double-initialization

__CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__ is set only at the end of the try block, well after the await import("@/repository/message") suspension point. If two calls to startOrphanedMessageRequestSweeper() are in-flight at the same time (e.g. during Next.js hot-reload or a racing re-registration), both pass the initial guard, both await the dynamic import, both set up a setInterval, and the second call silently overwrites __CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_INTERVAL_ID__. The first interval is then leaked — it can never be stopped by the cleanup handler — and both sweepers run in parallel indefinitely.

The fix is the same pattern used in other timers in the codebase: mark the state as "started" before any await, so subsequent concurrent calls exit immediately.

async function startOrphanedMessageRequestSweeper(): Promise<void> {
  if (instrumentationState.__CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__) {
    return;
  }
  // Guard against concurrent calls racing through the await below
  instrumentationState.__CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__ = true;

  try {
    const { sealOrphanedMessageRequests } = await import("@/repository/message");
    // ... rest of setup ...
    // No second assignment of __CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__ needed
  } catch (error) {
    // Reset flag on failure so a future call can retry
    instrumentationState.__CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__ = false;
    logger.warn(...);
  }
}

src/repository/message-write-buffer.ts, line 603
statusCode lower-bound of 0 admits an invalid sentinel value

HTTP status codes start at 100; 0 is not a valid status code and would appear in the Dashboard as-is, potentially causing confusion. Any caller that accidentally passes statusCode: 0 (e.g. a default-initialised number field, a falsy short-circuit) will silently be accepted and written to the database.

Consider raising the minimum to 100 to reject the common zero-value bug:

  const statusCode = sanitizeInt32(patch.statusCode, { field: "statusCode", min: 100, max: 999 });

src/repository/message-write-buffer.ts, line 1037
requeueBatchForRetry may leave nonTerminalIds out of sync after a requeue-then-trim cycle

requeueBatchForRetry calls trimPendingToMaxPending({ allowDropTerminal: false }) after reinserting the batch. When trimPendingToMaxPending iterates nonTerminalIds and encounters an id whose patch is now undefined in pending (stale entry), it calls this.nonTerminalIds.delete(candidateId) — but only during the search loop. If the while-loop body reaches the terminal-drop branch (allowDropTerminal: falsebreak) before cleaning all stale ids, those ghost ids remain in nonTerminalIds.

On the next regular enqueue, the overflow scan in trimPendingToMaxPending will iterate these stale entries, deleting them one-by-one only as they are encountered. In the worst case (many stale entries, small queue) this introduces unnecessary O(n) scans every enqueue until all ghosts are purged.

A defensive safeguard would be to also clean up stale nonTerminalIds entries at the point they are removed from pending:

this.pending.delete(droppedId);
this.nonTerminalIds.delete(droppedId); // already present ✓

This is already done, but consider adding a periodic or post-requeue nonTerminalIds reconciliation pass to guarantee correctness regardless of trim path taken.
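The reconciliation pass can be as small as re-deriving the set from `pending`; the sketch below uses standalone data structures rather than the buffer class, and the return value as a drift metric is an added assumption:

```typescript
// Sketch: rebuild nonTerminalIds from pending so ghost ids cannot accumulate.
// Cheap enough to run after every requeue/trim cycle.
interface Patch { durationMs?: number; statusCode?: number; }
const isTerminal = (p: Patch) => p.durationMs !== undefined || p.statusCode !== undefined;

function reconcileNonTerminalIds(
  pending: Map<number, Patch>,
  nonTerminalIds: Set<number>,
): number {
  let removed = 0;
  for (const id of nonTerminalIds) {
    const patch = pending.get(id);
    if (patch === undefined || isTerminal(patch)) {
      nonTerminalIds.delete(id); // ghost (not in pending) or stale (now terminal)
      removed += 1;
    }
  }
  return removed; // useful as a metric: any value > 0 means drift happened
}
```

Deleting from a Set while iterating it is safe in JavaScript, so the pass needs no intermediate copy.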

@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (4)

src/lib/proxy-status-tracker.ts, line 233
LATERAL JOIN index gap — worst-case slow during orphan accumulation

The LATERAL subquery filters on mr.duration_ms IS NOT NULL (line 228) but none of the existing message_request indexes carry that predicate. The closest index is idx_message_request_user_query on (user_id, created_at) WHERE deleted_at IS NULL. PostgreSQL will therefore use that index and do an index scan in reverse created_at order, checking each row's heap tuple for duration_ms IS NOT NULL.

In the normal case (most requests are completed) this terminates immediately. But during an orphan accumulation episode — exactly the scenario this PR targets — a user's most recent rows all have duration_ms IS NULL, so PostgreSQL must scan backwards through every orphaned row before finding a completed one (or reaching the end). This makes the proxy-status endpoint slowest precisely when the system is already under stress.

A partial index would make each per-user lookup O(1):

CREATE INDEX idx_message_request_user_created_at_completed
  ON message_request (user_id, created_at DESC, id DESC)
  WHERE deleted_at IS NULL AND duration_ms IS NOT NULL;

The comment at line 200–201 already acknowledges scaling limits for this query; adding this index resolves both the note and the edge-case regression.


src/repository/message.ts, line 219
rejected_invalid falls through to a redundant DB write attempt

When enqueueMessageRequestUpdate returns "rejected_invalid" it means the entire patch collapsed to empty after sanitization. The function currently falls through to await writeMessageRequestUpdateToDb(id, details), which calls sanitizeMessageRequestUpdatePatch(details) a second time, gets the same empty result, and emits a second "[MessageRepository] Message request patch rejected" warning — duplicating the log already emitted inside enqueue().

No incorrect write reaches the DB (the helper bails on an empty sanitized patch), but the duplicate log can be confusing during post-incident analysis.

Consider adding an early return for rejected_invalid to match the explicit handling already present in updateMessageRequestCost:

if (enqueueResult.kind === "rejected_invalid") {
  // patch was entirely invalid; buffer already logged the detail
  return;
}

src/repository/message.ts, line 291
updated_at-based threshold allows non-terminal writes to delay orphan detection

updated_at < threshold is updated by every non-terminal write (updateMessageRequestCost, token-count updates, etc.). If those writes fire shortly before a process crash, the orphan won't be detectable until <last non-terminal update time> + staleAfterMs, which can extend the "stuck in requesting" window by another 11 minutes (default config) on top of the already-generous timeout.

For example:

  • Request starts at T=0
  • updateMessageRequestCost fires at T=9 min → updated_at = T+9 min
  • Process crashes at T=9.1 min
  • Orphan visible to sweeper at T+9 min + 11 min = T+20 min

Using created_at instead of updated_at would make the threshold purely a function of request age and eliminate this compounding. The downside is that legitimately slow requests (longer than staleAfterMs) would be sealed prematurely — but since staleAfterMs is already FETCH_BODY_TIMEOUT + 60 s, any request running longer than that would be a timeout candidate anyway.

This is a design trade-off worth documenting explicitly, or optionally clamping the delay by also checking created_at:

AND (updated_at < ${threshold} OR created_at < ${createdAtThreshold})

src/repository/message-write-buffer.ts, line 1094
stop() flushes twice but ignores errors from either attempt

stop() calls flush() twice (line 1089, then conditionally line 1092) to drain the tail, but neither call's error is awaited with a check; any exception from flush() would be silently swallowed here (they are caught inside flush() via flushInFlight.finally). The _bufferState is set to "stopped" regardless of whether those flushes succeeded, which means callers (e.g. the SIGTERM handler) have no way to know whether terminal patches were persisted.

This is an existing limitation acknowledged by the comment "尽力 flush" ("best-effort flush"), but it would benefit from a return value or event indicating the flush outcome, especially on the shutdown path, where orchestration systems rely on clean exit signals.
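An outcome-reporting stop could be sketched as follows; the result shape, the two-attempt loop, and the function names are hypothetical, standing in for the buffer's actual `stop()`/`flush()`:

```typescript
// Sketch: report whether the final flushes drained the queue, so the shutdown
// path can choose an exit code instead of assuming success.
interface StopResult { flushed: boolean; remaining: number; errors: unknown[]; }

async function stopWithOutcome(
  flush: () => Promise<void>,
  pendingSize: () => number,
): Promise<StopResult> {
  const errors: unknown[] = [];
  // Mirror the existing double-flush, but collect errors instead of swallowing them.
  for (let attempt = 0; attempt < 2 && pendingSize() > 0; attempt++) {
    try {
      await flush();
    } catch (error) {
      errors.push(error);
    }
  }
  const remaining = pendingSize();
  return { flushed: remaining === 0 && errors.length === 0, remaining, errors };
}
```

A SIGTERM handler can then exit non-zero when `flushed` is false, making lost terminal patches visible to the orchestrator.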

- Add a partial index + migration for the proxy-status lastRequest LATERAL query
- Switch the orphan-sealing threshold to created_at so updated_at drift no longer delays sealing
- Return early on rejected_invalid to avoid duplicate sanitize/warnings
- Log an error when pending items remain at stop time, to aid troubleshooting
@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (3)

src/lib/proxy-status-tracker.ts, line 233
Consider LEFT JOIN LATERAL for semantic clarity

JOIN LATERAL ... ON true is an INNER JOIN — users with no completed requests are excluded from the result set entirely. This is handled correctly upstream because getAllUsersStatus iterates dbUsers independently (users not in lastMap correctly get lastRequest: null), but an inner join here is slightly surprising for readers of this query in isolation.

LEFT JOIN LATERAL ... ON true would make the intent explicit: "get the most recent completed request for each user, or NULL if none exists", which aligns better with the comment on line 119–120.

      LEFT JOIN LATERAL (



tests/unit/repository/message-orphaned-requests.test.ts, line 25
Duplicated test utility functions

snapshotEnv and restoreEnv (lines 7–24) are identical to the same helpers in tests/unit/repository/message-write-buffer.test.ts. Since tests/helpers/drizzle.ts was introduced in this PR as a shared test helper, these utilities should live there too to avoid drift between the two copies.

// tests/helpers/env.ts (suggested location)
export function snapshotEnv(keys: string[]): Record<string, string | undefined> { ... }
export function restoreEnv(snapshot: Record<string, string | undefined>): void { ... }

src/repository/message-write-buffer.ts, line 168
Silent data loss when all fallback strategies fail for a terminal patch

In flushBatchPerItem, when the fullsafeminimal strategy chain is exhausted, the item is silently dropped with an error log. For non-terminal patches this is acceptable, but for a terminal patch (one with durationMs/statusCode), dropping it permanently leaves that message_request record stuck in "requesting".

The minimal strategy already strips everything except durationMs, statusCode, and errorMessage, so in practice it should never fail with a data error after sanitizePatch has already validated those fields. However, there is no assertion or counter tracking how often this terminal-patch-drop code path is hit.

Consider adding a metric or a more prominent log entry when a terminal patch is dropped through this path, to make silent data loss visible in monitoring:

if (lastFailure) {
  const isTerminal = item.patch.durationMs !== undefined || item.patch.statusCode !== undefined;
  logger[isTerminal ? "error" : "warn"](
    "[MessageRequestWriteBuffer] Dropping invalid update to unblock queue",
    { ..., isTerminal },
  );
}

- Extract tests/helpers/env.ts to avoid duplicated env snapshot helpers
- Add isTerminal to the drop-invalid-update log and raise the severity for terminal patches
- Add a comment noting that loadLastRequests only returns users with at least one completed request
@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (5)

src/repository/message-write-buffer.ts, line 883
getMinimalPatch can return an empty object for non-terminal patches

If a patch contains only fields stripped by both getSafePatch and getMinimalPatch (e.g., only costUsd with no durationMs, statusCode, or errorMessage), getMinimalPatch returns {}. In flushBatchPerItem, buildBatchUpdateSql([{ id, patch: {} }]) returns null, which sets lastFailure with "No SQL columns produced" for all 3 strategies. The item is then silently dropped (at warn level since isTerminalPatch is false).

This means a non-terminal patch consisting only of costUsd (or other "stripped" fields) that fails the "full" and "safe" strategies with data errors will be completely lost — no sync fallback, no retry. While this is intentional for non-terminal data during overflow, it could silently discard partially accumulated cost/token data that was building up across multiple updates.

Consider logging the original patch fields that were irretrievably dropped so operators can assess the scope of data loss.


src/repository/message.ts, line 227
sanitizeMessageRequestUpdatePatch called twice for overflow-recovered patches

When enqueueResult.kind === "dropped_overflow", enqueueResult.patch is already the merged, sanitized patch from inside the buffer. Passing it through writeMessageRequestUpdateToDb causes a redundant second call to sanitizeMessageRequestUpdatePatch. While idempotent and harmless, this redundancy makes the data flow less obvious and adds minor overhead on the hot overflow path.

Consider a dedicated internal function or a flag to skip re-sanitization when the patch is already known to be clean:

// e.g., skip re-sanitize for already-clean overflow patches
await writeMessageRequestUpdateToDb(id, patch, { skipSanitize: true });

src/repository/message.ts, line 284
sealOrphanedMessageRequests SQL: EXCLUDE_WARMUP_CONDITION column reference requires exact table name

The raw SQL template interpolates ${EXCLUDE_WARMUP_CONDITION}, a Drizzle column reference that expands to "message_request"."blocked_by". The inline comment "不要给 message_request 起别名" ("do not alias the message_request table") is essential but only visible to developers reading this function.

If a future refactor aliases the message_request table (e.g., FROM message_request mr), this condition would silently produce invalid SQL (referencing a non-existent "message_request" table in the scope), causing a runtime error rather than a compile-time failure.

Consider extracting this constraint inline rather than relying on the Drizzle column reference:

AND (blocked_by IS NULL OR blocked_by <> 'warmup')

Or add a more prominent @throws comment/assertion explaining the naming dependency.


src/repository/message-write-buffer.ts, line 593
sanitizeCostUsdString: scientific notation inputs could bypass precision checks

The function validates cost values via NUMERIC_LIKE_RE and Number.isFinite(parsed), with a final range check parsed >= 1_000_000. However, inputs like "9.999999999999999e5" (≈ 999999.9999999999) parse as 999999.9999999999 in JS but may produce different precision in PostgreSQL's numeric(21, 15) representation due to floating-point representation.

Separately, the regex accepts inputs like "1e-999" (which parses to 0 in JS) and "1e-20". These edge cases are harmless in themselves, but values smaller than the smallest representable numeric(21,15) will round to 0 in the database. This is probably acceptable, but worth documenting.

Finally, the String(value) conversion for numeric inputs at line ~601 can produce scientific notation such as "1.23e-10". Such values pass validation and are forwarded as-is to PostgreSQL, which parses them correctly, so this is only worth noting, not fixing.
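The parsing behaviour described above can be verified with plain Node semantics (no project code involved):

```typescript
// Demonstration of JS number parsing/formatting edge cases discussed above.
const nearCeiling = Number("9.999999999999999e5"); // ≈ 999999.9999999999, still below 1_000_000
const underflow = Number("1e-999"); // smaller than the tiniest double → 0
const tiny = Number("1e-20"); // finite and positive, but below numeric(21,15) resolution

// String(number) itself emits scientific notation for small magnitudes,
// which PostgreSQL's numeric input parser accepts.
const asString = String(1.23e-10); // scientific notation, not a plain decimal
```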


src/lib/proxy-status-tracker.ts, line 200
loadLastRequests LATERAL join scales linearly with total user count

The new query scans FROM users u (all non-deleted users) and runs one LATERAL index scan per user to find their last completed request. For installations with thousands of users (especially multi-tenant deployments), the per-user LATERAL scan has O(N_users) query complexity. Even with the new partial index idx_message_request_user_created_at_id_completed, the planner must loop over every user record.

If many users have zero completed requests (e.g., new users, warmup-only traffic), the LATERAL subqueries still execute and produce empty results. The JOIN LATERAL ... ON true acts as an inner join so these rows are filtered, but the scans still happen.

The comment already acknowledges scaling at 1e4+ users. Consider at minimum adding an index on users(deleted_at, id) (if not already present) to make the outer scan faster, or consider scoping this query to only "recently active" users to bound the cost in busy installations.

- sealOrphanedMessageRequests: inline the blocked_by condition in raw SQL so the column reference no longer depends on the table name
- writeMessageRequestUpdateToDb: add a comment explaining why the second sanitize pass is kept as a defensive measure
@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (2)

src/repository/message-write-buffer.ts, line 350
Token count fields silently capped at Int32 despite bigint schema

sanitizeInt32(..., { max: INT32_MAX }) clamps inputTokens, outputTokens, cacheCreationInputTokens, cacheReadInputTokens, cacheCreation5mInputTokens, and cacheCreation1hInputTokens to a maximum of 2,147,483,647. However, the Drizzle schema declares all six of these columns as bigint (see schema.ts lines 459–465), which can hold values up to ~9.2×10¹⁸.

A large multimodal request today can easily exceed INT32_MAX tokens (e.g. a 2M-token context window × multiple messages). Any such value will be silently clamped, causing stored token counts (and therefore cost calculations) to be permanently incorrect without any indication other than the throttled warning log.

Consider using a dedicated sanitizeBigInt helper that accepts values up to Number.MAX_SAFE_INTEGER (or 9007199254740991) — the largest integer that JavaScript can represent exactly — and clamping there rather than at INT32_MAX:

const BIGINT_JS_MAX = Number.MAX_SAFE_INTEGER; // 2^53 - 1

function sanitizeBigInt(
  value: unknown,
  options?: { min?: number; max?: number; field?: string }
): number | undefined {
  const numeric = toFiniteNumber(value);
  if (numeric === null) return undefined;
  const truncated = Math.trunc(numeric);
  const min = options?.min ?? 0;
  const max = options?.max ?? BIGINT_JS_MAX;
  // Same clamp + throttled-warn logic as sanitizeInt32 (logging elided here):
  return Math.min(max, Math.max(min, truncated));
}

Apply this to inputTokens, outputTokens, cacheCreationInputTokens, cacheReadInputTokens, cacheCreation5mInputTokens, and cacheCreation1hInputTokens.


src/lib/proxy-status-tracker.ts, line 236
LATERAL join performs one index lookup per non-deleted user with no upper bound

The query iterates over every row in users WHERE deleted_at IS NULL and fires a LATERAL subquery for each. Even with the new idx_message_request_user_created_at_id_completed partial index (which makes each LATERAL cheap), the overall query cost is still O(n_users). In a deployment with, say, 2,000 non-deleted users — the majority of whom haven't used the system in months — this will still execute 2,000 index lookups every time the status page polls.

The comment acknowledges the 10,000+ threshold, but even a few thousand users could produce noticeable latency.

Consider narrowing the outer users scan to only users who have had at least one completed request in a recent window (e.g., the last 7 days), or adding a LIMIT to the outer query with pagination. Something like:

FROM users u
WHERE u.deleted_at IS NULL
  AND EXISTS (
    SELECT 1 FROM message_request mr
    WHERE mr.user_id = u.id
      AND mr.deleted_at IS NULL
      AND mr.duration_ms IS NOT NULL
      AND mr.created_at > NOW() - INTERVAL '7 days'
  )

This reduces the LATERAL invocations to only recently-active users, which is almost certainly what the Dashboard cares about anyway.

- Add sanitizeSafeInt and raise the ceiling for token fields to Number.MAX_SAFE_INTEGER
- Add unit tests: values above INT32_MAX must not be truncated; values beyond the safe-int range must be clamped
@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (4)

src/repository/message.ts, line 297
No index supports the orphan sweeper's full-table scan

The sealOrphanedMessageRequests CTE executes:

SELECT id FROM message_request
WHERE deleted_at IS NULL
  AND duration_ms IS NULL
  AND created_at < $threshold
  AND (blocked_by IS NULL OR blocked_by <> 'warmup')
ORDER BY created_at ASC
LIMIT $limit

The PR adds idx_message_request_user_created_at_id_completed (for duration_ms IS NOT NULL), but there's no partial index covering the orphaned case (duration_ms IS NULL). Existing indexes like idx_message_request_user_query are per-user (user_id, created_at), so they can't efficiently satisfy a table-wide scan ordered only by created_at.

As orphaned records accumulate — precisely the scenario this PR is guarding against — each sweeper invocation (every 60 s) will fan out to an increasingly expensive sequential scan on the message_request table. This could counteract the OOM-prevention goal.

Consider adding a partial index:

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_message_request_orphaned_created_at
  ON message_request (created_at ASC)
  WHERE deleted_at IS NULL
    AND duration_ms IS NULL
    AND (blocked_by IS NULL OR blocked_by <> 'warmup');

src/lib/proxy-status-tracker.ts, line 196
loadActiveRequests query has no partial index on duration_ms IS NULL

The query scans the entire message_request table for rows where durationMs IS NULL:

.where(and(isNull(messageRequest.deletedAt), isNull(messageRequest.durationMs)))
.orderBy(desc(messageRequest.createdAt))
.limit(limit)

The PR adds a LIMIT 1000 cap to prevent response bloat (good), but without a partial index on (created_at DESC) WHERE deleted_at IS NULL AND duration_ms IS NULL, PostgreSQL must evaluate the full deleted_at IS NULL index and then filter, sort, and cut. The new idx_message_request_user_created_at_id_completed index only covers duration_ms IS NOT NULL, so it's unused here.

In a system that accumulates orphaned records — the exact failure mode this PR addresses — this query degrades as orphan count grows. Since getAllUsersStatus calls both loadActiveRequests and loadLastRequests in parallel, its latency becomes bounded by whichever takes longer.

Consider adding an index that mirrors the new sweeper index idea:

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_message_request_active
  ON message_request (created_at DESC)
  WHERE deleted_at IS NULL
    AND duration_ms IS NULL
    AND (blocked_by IS NULL OR blocked_by <> 'warmup');

src/repository/message-write-buffer.ts, line 629
NULL CASE branch for jsonb columns omits explicit ::jsonb cast

When providerChain or specialSettings is null, the generated SQL is:

CASE id WHEN 42 THEN NULL ELSE "provider_chain" END

PostgreSQL's type resolution for CASE expressions uses the union of all branch types. The ELSE "provider_chain" (type jsonb) and THEN NULL (type unknown) combination is resolved to jsonb, so the query works in practice. However, in edge cases where all values for a column in a batch are null (no non-null branches), the type of the CASE expression degrades to unknown and can fail with an ambiguous type error.

Since the PR already applies explicit ::jsonb for non-null JSON values, applying it to NULL as well would be consistent and defensive:

          cases.push(sql`WHEN ${update.id} THEN NULL::jsonb`);

src/repository/message-write-buffer.ts, line 653
No explicit type casts for integer/bigint columns in CASE expressions

The PR's root-cause analysis notes that implicit type inference caused data/constraint DB errors on numeric and jsonb columns, and the fix includes explicit ::numeric / ::jsonb casts. However, the CASE branches for integer columns (status_code, duration_ms, provider_id, ttfb_ms) and bigint columns (input_tokens, output_tokens, and cache token counts) pass raw JavaScript numbers without explicit casts:

cases.push(sql`WHEN ${update.id} THEN ${value}`);

PostgreSQL usually infers the correct type from the ELSE column branch, but if an unexpected data type slips past sanitizePatch (e.g., a bigint JS primitive vs a number), this can produce a 22P02 / 42804 error. The existing explicit-cast precedent (::numeric, ::jsonb) suggests these should also be cast:

  • Integer columns: ::int for status_code, duration_ms, ttfb_ms, provider_id
  • Bigint columns: ::bigint for input_tokens, output_tokens, cache_*_input_tokens
  • Boolean columns: ::boolean for context_1m_applied, swap_cache_ttl_applied

This would make the SQL consistent with the stated fix strategy and prevent the same data error recurrence on non-numeric columns.
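A plain-string sketch of the cast rule proposed above; the real code builds these fragments with Drizzle's sql tag, so the names and shape here are illustrative only:

```typescript
// Column→cast mapping mirroring the review's list; the column sets are
// assumptions based on the schema names mentioned in this thread.
const INT_COLUMNS = new Set(["status_code", "duration_ms", "ttfb_ms", "provider_id"]);
const BIGINT_COLUMNS = new Set([
  "input_tokens",
  "output_tokens",
  "cache_creation_input_tokens",
  "cache_read_input_tokens",
]);
const BOOL_COLUMNS = new Set(["context_1m_applied", "swap_cache_ttl_applied"]);

function castFor(column: string): string {
  if (INT_COLUMNS.has(column)) return "::int";
  if (BIGINT_COLUMNS.has(column)) return "::bigint";
  if (BOOL_COLUMNS.has(column)) return "::boolean";
  return ""; // e.g. text columns need no explicit cast
}

// Render one CASE branch with the explicit cast appended to the placeholder.
function caseBranch(column: string, id: number, placeholder: string): string {
  return `WHEN ${id} THEN ${placeholder}${castFor(column)}`;
}
```

With this rule, every CASE branch carries the same type annotation as its ELSE column, so PostgreSQL never has to infer a branch type from an `unknown` literal.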

- Add idx_message_request_active_created_at_id (duration_ms IS NULL) to support proxy-status and the orphan sweeper
- buildBatchUpdateSql: NULL::jsonb plus explicit int/bigint/boolean casts to reduce implicit-inference risk
- Generate the Drizzle migration and amend it to use IF NOT EXISTS (idempotent)
@greptile-apps
Copy link

greptile-apps bot commented Mar 3, 2026

Additional Comments (4)

src/lib/proxy-status-tracker.ts, line 183
Missing warmup filter in active requests query

loadActiveRequests does not exclude warmup requests (blocked_by = 'warmup'), but both loadLastRequests (line 230) and sealOrphanedMessageRequests (line 270 and 295) explicitly filter them out. Warmup requests that never set duration_ms can permanently accumulate in the active list, inflating activeCount and cluttering the dashboard with internal probes — exactly the "无限累积活跃请求" (unbounded accumulation of active requests) problem this PR is meant to prevent.

    const rows = await db
      .select({
        requestId: messageRequest.id,
        userId: messageRequest.userId,
        keyString: messageRequest.key,
        keyName: keys.name,
        providerId: messageRequest.providerId,
        providerName: providers.name,
        model: messageRequest.model,
        createdAt: messageRequest.createdAt,
      })
      .from(messageRequest)
      .leftJoin(
        providers,
        and(eq(messageRequest.providerId, providers.id), isNull(providers.deletedAt))
      )
      .leftJoin(keys, and(eq(keys.key, messageRequest.key), isNull(keys.deletedAt)))
      .where(
        and(
          isNull(messageRequest.deletedAt),
          isNull(messageRequest.durationMs),
          sql`(${messageRequest.blockedBy} IS NULL OR ${messageRequest.blockedBy} <> 'warmup')`
        )
      )
      // Defensive: if durationMs stays NULL indefinitely, "active" requests accumulate
      // without bound and blow up both the query and the response body, so the returned
      // detail list is capped (monitoring does not need an unbounded list).
      .orderBy(desc(messageRequest.createdAt))
      .limit(limit);

src/repository/message-write-buffer.ts, line 1044
Silent "drop" when all strategies produce no SQL columns

When the full and safe strategies fail with data errors, getMinimalPatch may also return {} (if the patch carries only non-terminal fields like inputTokens/outputTokens that are excluded by the minimal strategy). In that case buildBatchUpdateSql returns null, lastFailure is set to a synthetic "No SQL columns produced" error, and the item falls through to the drop log — but no actual SQL failure occurred; the data was simply not terminal enough to survive the fallback chain.

The drop log at line 1064 ("Dropping invalid update to unblock queue") is then misleading: it looks like an error when the real situation is "non-terminal fields couldn't be written because no fallback covers them." Consider distinguishing this case to aid monitoring and diagnosis.


src/repository/message.ts, line 288
COALESCE(duration_ms, …) is redundant given the WHERE guard

The WHERE … AND duration_ms IS NULL re-check on the UPDATE target (line 293) guarantees that the rows being updated always have duration_ms IS NULL at execution time. COALESCE(duration_ms, computed_value) therefore always reduces to computed_value — the COALESCE wrapper provides no additional safety and may obscure the intent for future readers.

      duration_ms = LEAST(
        2147483647,
        GREATEST(0, (EXTRACT(EPOCH FROM (NOW() - created_at)) * 1000))
      )::int,

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


src/repository/message-write-buffer.ts, line 800
enqueue can trigger both a manual flush() and a timer flush for the same batch

When pending.size >= batchSize (line 797), void this.flush() is called. But ensureFlushTimer was already called at line 793: with TERMINAL_FLUSH_DELAY_MS = 10 ms for terminal patches, or config.flushIntervalMs for non-terminal ones. When pending.size >= batchSize AND the patch is terminal, both the direct void this.flush() call AND a 10 ms timer fire. The timer's flush() will be a no-op (it sees flushInFlight and sets flushAgainAfterCurrent), so the behaviour is correct — but the timer is spuriously rescheduled and adds 10 ms of latency for batches that could have been flushed immediately.

Consider short-circuiting the timer when pending.size >= batchSize:

// Flush as soon as the batch threshold is reached, to shorten the window in which durationMs stays NULL
if (this.pending.size >= this.config.batchSize) {
  this.clearFlushTimer(); // cancel the just-scheduled timer
  void this.flush();
}

@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (5)

drizzle/0078_huge_weapon_omega.sql, line 1
Blocking index creation will lock message_request table during migration

Both 0078 and 0079 use CREATE INDEX IF NOT EXISTS without CONCURRENTLY. In PostgreSQL, a regular CREATE INDEX acquires a ShareLock on the table, which blocks all INSERT, UPDATE, and DELETE operations for the duration of the build. On a production table with millions of message_request rows, this can take tens of seconds to minutes and will cause write timeouts for live request recording.

The same issue affects drizzle/0079_special_zarda.sql.

To avoid locking, use CREATE INDEX CONCURRENTLY:

CREATE INDEX CONCURRENTLY IF NOT EXISTS "idx_message_request_user_created_at_id_completed"
  ON "message_request" USING btree ("user_id","created_at" DESC NULLS LAST,"id" DESC NULLS LAST)
  WHERE "message_request"."deleted_at" IS NULL
    AND "message_request"."duration_ms" IS NOT NULL
    AND ("message_request"."blocked_by" IS NULL OR "message_request"."blocked_by" <> 'warmup');

Note that CREATE INDEX CONCURRENTLY cannot run inside a transaction block, so the migration file must not be wrapped in a transaction (Drizzle supports this via a DO $$ workaround or a dedicated non-transactional migration).


src/lib/proxy-status-tracker.ts, line 230
loadLastRequests LATERAL query scans ALL non-deleted users unconditionally

The query starts from FROM users u WHERE u.deleted_at IS NULL with no pagination or limit. For each user it does one LATERAL index scan (fast individually), but the total cost scales linearly with the number of active users.

Every call to getAllUsersStatus() (which is presumably polled from the dashboard) will execute this full cross-user sweep. For a deployment with thousands of users (even most of whom are inactive), this results in:

  • A full sequential scan or index scan of the users table
  • N LATERAL sub-queries, one per user

The comment in the code acknowledges this (若未来用户规模显著增大(例如 1e4+),建议为该接口增加分页 — "if the user base grows significantly, e.g. 1e4+ users, this endpoint should gain pagination"), but there's no near-term mitigation. Consider adding a short TTL cache (e.g., 5-10 seconds) on the result of getAllUsersStatus() to avoid hitting the DB on every dashboard poll, which would significantly reduce the load without visible UX impact.
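The TTL-cache idea can be sketched as a small wrapper; the wrapped function, TTL value, and all names here are illustrative assumptions, not the tracker's actual API:

```typescript
// Memoise an async producer for ttlMs, deduplicating concurrent refreshes.
// Note the cache is written inside .then(), i.e. before .finally() clears the
// in-flight promise, so no caller can observe "no cache, no in-flight".
function withTtlCache<T>(fn: () => Promise<T>, ttlMs: number): () => Promise<T> {
  let cached: { value: T; expiresAt: number } | null = null;
  let inFlight: Promise<T> | null = null;
  return async () => {
    if (cached && Date.now() < cached.expiresAt) return cached.value;
    if (inFlight) return inFlight; // concurrent pollers share one query
    inFlight = fn()
      .then((value) => {
        cached = { value, expiresAt: Date.now() + ttlMs };
        return value;
      })
      .finally(() => {
        inFlight = null;
      });
    return inFlight;
  };
}

// Hypothetical usage: const getStatus = withTtlCache(loadStatusFromDb, 5_000);
```

A 5-second TTL bounds the LATERAL sweep to at most one execution per window regardless of how many dashboard tabs poll.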


src/repository/message.ts, line 295
sealOrphanedMessageRequests may seal records that already have a valid status_code with an inflated duration_ms

The WHERE clause selects any record where duration_ms IS NULL, regardless of whether status_code is already set. This can happen legitimately when:

  1. updateMessageRequestDetails succeeded and wrote status_code to the DB (via async batch)
  2. The duration_ms batch write was lost (process crash before flush)

For those records, after sealing:

  • status_code is preserved correctly via COALESCE
  • duration_ms is set to LEAST(2147483647, GREATEST(0, EXTRACT(EPOCH FROM (NOW() - created_at)) * 1000))::int

This computes the time from request creation to when the sweeper runs — potentially many minutes after the actual request completed. A record with status_code = 200 (fast success) could end up with duration_ms of, say, 300,000 ms (5 minutes), significantly inflating P99/average duration statistics in dashboards.

Consider adding a heuristic guard — for example, if status_code IS NOT NULL and ttfb_ms IS NOT NULL, use ttfb_ms as a reasonable upper bound for duration, or leave duration_ms at a sentinel value (e.g., -1 or NULL with a separate is_orphaned boolean flag) to exclude these records from performance statistics.


src/repository/message-write-buffer.ts, line 810
nonTerminalIds can become stale after requeueBatchForRetry

In flush(), IDs are removed from nonTerminalIds before the db.execute call:

for (const item of batch) {
  this.nonTerminalIds.delete(item.id);  // removed here
}
// ...
} catch (error) {
  this.requeueBatchForRetry(batch);  // IDs re-added to pending, but not to nonTerminalIds
}

If requeueBatchForRetry puts non-terminal items back into pending without re-adding their IDs to nonTerminalIds, those items will no longer be eligible for preferential dropping during overflow. They will be treated as "terminal-protected" even though they lack durationMs/statusCode.

This is a conservative fallback (protecting data), but it means the overflow logic may preserve stale non-terminal records and instead drop genuinely terminal ones. If pruneNonTerminalIds() is not called after requeue, consider adding a call to re-sync the set from the current pending map state inside requeueBatchForRetry.
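The suggested re-sync can be sketched as follows; the field names (pending, nonTerminalIds, isTerminalPatch) follow this review's description and are assumptions about the buffer's internals:

```typescript
// Minimal stand-in for a buffered update; the real item type is richer.
interface Item {
  id: number;
  patch: { durationMs?: number; statusCode?: number };
}

function isTerminalPatch(patch: Item["patch"]): boolean {
  return patch.durationMs !== undefined || patch.statusCode !== undefined;
}

// Rebuild nonTerminalIds from the current pending map so that requeued
// non-terminal items stay eligible for preferential dropping on overflow.
function resyncNonTerminalIds(
  pending: Map<number, Item>,
  nonTerminalIds: Set<number>,
): void {
  nonTerminalIds.clear();
  for (const [id, item] of pending) {
    if (!isTerminalPatch(item.patch)) nonTerminalIds.add(id);
  }
}
```

Calling this at the end of requeueBatchForRetry would keep the overflow bookkeeping consistent with pending at all times.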


tests/unit/repository/message-orphaned-requests.test.ts, line 69
Test assertion for created_at < count is too lenient

expect((built.sql.match(/created_at </g) ?? []).length).toBeGreaterThanOrEqual(2);

This asserts at least 2 occurrences of created_at <, but the design intent is exactly 2 (one in the CTE candidates selection, one in the UPDATE WHERE clause to prevent TOCTOU races). Using toBe(2) instead of toBeGreaterThanOrEqual(2) would more precisely guard against accidental removal of either safety check:

    expect((built.sql.match(/created_at </g) ?? []).length).toBe(2);

@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (4)

src/repository/message.ts, line 279
Orphan sweeper query won't use the new partial index

The sealOrphanedMessageRequests CTE filters on status_code IS NULL, but the new partial index idx_message_request_active_created_at_id (added in 0079_special_zarda.sql) has predicate WHERE duration_ms IS NULL. PostgreSQL cannot use a partial index whose predicate doesn't appear in the query's WHERE clause, so the sweeper will fall back to a sequential scan — exactly the performance regression it was meant to prevent as orphans accumulate.

The fix is to add AND duration_ms IS NULL alongside status_code IS NULL in both the CTE and the final UPDATE guard. This also makes the semantics tighter (a true orphan has both fields NULL):

WITH candidates AS (
  SELECT id
  FROM message_request
  WHERE deleted_at IS NULL
    AND duration_ms IS NULL   -- allows idx_message_request_active_created_at_id to be used
    AND status_code IS NULL
    AND created_at < ${threshold}
    AND (blocked_by IS NULL OR blocked_by <> 'warmup')
  ORDER BY created_at ASC
  LIMIT ${limit}
)
UPDATE message_request
SET ...
WHERE id IN (SELECT id FROM candidates)
  AND deleted_at IS NULL
  AND duration_ms IS NULL
  AND status_code IS NULL
  ...

COALESCE(duration_ms, ...) in the SET clause already handles the case where duration_ms was set but status_code wasn't, so no data would be lost by making the filter stricter.


src/lib/proxy-status-tracker.ts, line 143
Cache can be bypassed in a narrow concurrent window

statusSnapshotInFlight.finally() clears this.statusSnapshotInFlight = null before the outer await line resumes to set this.statusSnapshotCache = snapshot. Between these two points any concurrent coroutine will see:

  • statusSnapshotCache → stale/null
  • statusSnapshotInFlight → null

…and will create a new in-flight query, defeating the deduplication.

Move the cache write into the inner async function (before finally fires) to close the window:

this.statusSnapshotInFlight = (async () => {
  const [dbUsers, activeRequestRows, lastRequestRows] = await Promise.all([...]);
  const snapshot: StatusSnapshot = {
    expiresAt: Date.now() + PROXY_STATUS_SNAPSHOT_TTL_MS,
    dbUsers: dbUsers as unknown as DbUserRow[],
    activeRequestRows,
    lastRequestRows,
  };
  this.statusSnapshotCache = snapshot; // set before finally clears inflight
  return snapshot;
})().finally(() => {
  this.statusSnapshotInFlight = null;
});

return await this.statusSnapshotInFlight;

src/repository/message-write-buffer.ts, line 1080
Misleading "Dropping update" log when build throws for one strategy but others produce no SQL

When a subset of strategies throw from buildBatchUpdateSql while the rest produce null (no writable columns), lastFailure is set from the throwing strategy but noSqlColumnsStrategies is also partially populated. The !lastFailure guard makes the "no writable columns" debug path unreachable even though the net outcome is the same — no write was attempted. The if (lastFailure) block then fires at logger.error/logger.warn level and claims the update is being "dropped to unblock queue," which is misleading when the real cause is simply no writable columns.

Consider also checking whether all strategies that didn't throw produced no SQL (i.e., the failure was in a strategy that was never even attempted on the DB):

const allPatchesHadNoColumns =
  noSqlColumnsStrategies.length + buildFailureStrategies.length === patchStrategies.length &&
  noSqlColumnsStrategies.length > 0;

if (!lastFailure || allPatchesHadNoColumns) {
  logger.debug("[MessageRequestWriteBuffer] Skipping update with no writable columns", ...);
  continue;
}

(This requires tracking buildFailureStrategies similarly to noSqlColumnsStrategies.)


src/repository/message.ts, line 308
Array.from(result) is redundant

db.execute(query) already returns a QueryResult whose iteration produces the rows directly. Array.from(result) materialises a new array but sealed.length would equal result.rowCount (or result.rows.length), so this works — but for clarity and to avoid the extra allocation, consider using result.rowCount ?? result.rows?.length ?? 0 or simply spreading into an array only if you need the elements elsewhere.

const result = await db.execute(query);
return { sealedCount: result.rowCount ?? 0 };


@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (4)

src/repository/message-write-buffer.ts, line 804
Queue can temporarily reach 2× maxPending during sustained transient DB failures

In requeueBatchForRetry, when called with allowDropTerminal: false and the pending set is entirely terminal patches, trimPendingToMaxPending breaks without dropping anything. If this coincides with new patches being enqueued during the failed flush (which go directly into pending), the queue can reach up to batchSize + new_enqueues beyond maxPending.

Concrete scenario:

  1. maxPending = 1000, 1000 terminal patches taken out as a batch → pending.size = 0
  2. 1000 new terminal patches arrive during flush → pending.size = 1000
  3. Batch is requeued (DB transient failure) → pending.size = 2000
  4. trimPendingToMaxPending(allowDropTerminal: false) can't trim → warning logged but 2× overflow persists

The comment acknowledges this ("允许暂时超过 maxPending") but under a sustained DB outage combined with high traffic, the unbounded growth could contribute to the same OOM that this PR is trying to prevent. Consider enforcing a hard cap in the requeue path — for example, logging at error level and dropping the oldest terminal patches (with explicit warning that duration/status data is being lost) when size exceeds 2 × maxPending.
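A sketch of the proposed hard cap in the requeue path; `enforceHardCap` and its parameters are illustrative names, not the buffer's real API:

```typescript
// Once pending exceeds 2 × maxPending, evict the oldest entries even if they
// are terminal, reporting each drop so the loss is visible at error level.
function enforceHardCap<K, V>(
  pending: Map<K, V>,
  maxPending: number,
  onDrop: (key: K) => void,
): void {
  const hardCap = maxPending * 2;
  while (pending.size > hardCap) {
    // Map iteration order is insertion order, so this evicts the oldest entry.
    const oldest = pending.keys().next().value as K;
    pending.delete(oldest);
    onDrop(oldest);
  }
}
```

The onDrop callback is where the explicit "duration/status data is being lost" error log would go; keeping it a callback leaves the throttling policy to the caller.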


src/lib/proxy-status-tracker.ts, line 388
Records with status_code IS NOT NULL AND duration_ms IS NULL become invisible

The new loadActiveRequests now requires both duration_ms IS NULL AND status_code IS NULL (previously only duration_ms IS NULL was checked). This means requests that have statusCode written (e.g., via a partial batch flush of updateMessageRequestDetails) but are still missing durationMs will no longer appear as "active".

At the same time, sealOrphanedMessageRequests also requires both to be NULL — so these records are not sealed either. They enter a permanent limbo state: invisible to the active-request view, uncounted as completed, and silently skipped by the sweeper.

While the most common call-site writes statusCode and durationMs together, there are code paths where updateMessageRequestDetails could write statusCode alone. Under async batch flushing, partial-state DB rows are possible. These records would accumulate without any detection or resolution.

A simple mitigation is to widen the sweeper's condition to also seal records matching duration_ms IS NULL AND status_code IS NOT NULL AND created_at < threshold. Alternatively, add these to the sweeper's CTE without changing loadActiveRequests.


src/repository/message.ts, line 769
Redundant staleAfterMs floor silently ignores caller intent for testing

The Math.max(60_000, staleAfterMsCandidate) floor means callers cannot set a threshold shorter than 60 seconds, even in non-production contexts. The test works around this by relying on the clamp behavior (passing staleAfterMs: 10 and asserting the 60-second clamped result), but this makes the function harder to use in integration tests that need deterministic shorter thresholds.

Consider making the floor configurable or bypassed in test environments:

// or accept an options.unsafe_override_min_stale_ms for test purposes
const staleAfterMs = options?._unsafe_override_min ?
  staleAfterMsCandidate :
  Math.max(60_000, staleAfterMsCandidate);

Alternatively, document the 60-second minimum explicitly in the function's JSDoc so callers understand the constraint.


src/instrumentation.ts, line 78
Timed-out sealPromise continues running after inFlight = false, enabling concurrent sweeper runs

When the sweeper times out, inFlight is reset to false via the finally block, allowing the next scheduled interval to launch a new runOnce("scheduled"). However, the original sealPromise is still executing in the background (only its rejection is silenced). If the DB is consistently slow (e.g., 25–30 seconds per sweep), every interval eventually spawns a concurrent sweep once the timeout fires.

Multiple concurrent calls to sealOrphanedMessageRequests are safe from a data-integrity perspective (the double-check conditions in the CTE prevent double-sealing), but they add unnecessary concurrent DB load — exactly the condition that may have caused the original slowdown.

Consider tracking whether a timeout-orphaned promise is still running and skipping the next interval if so:

const PENDING = Symbol("pending"); // hoisted: an inline Symbol("pending") would mint a fresh symbol per call and never compare equal
let orphanedPromise: Promise<unknown> | null = null;
if (result.timedOut) {
  orphanedPromise = sealPromise.catch(() => {});
  // ...
}
// At the start of runOnce:
if (orphanedPromise) {
  const settled = await Promise.race([orphanedPromise, Promise.resolve(PENDING)]);
  if (settled === PENDING) return; // still running
  orphanedPromise = null;
}

@greptile-apps

greptile-apps bot commented Mar 3, 2026

Additional Comments (4)

src/lib/proxy-status-tracker.ts, line 140
No stale-cache fallback on DB error

getStatusSnapshot wraps all three DB queries (users, loadActiveRequests, loadLastRequests) in a single in-flight promise. If any of them throws, the rejection propagates to all concurrent callers of getAllUsersStatus, and statusSnapshotCache is left at the previous (already expired) value.

This means a single DB hiccup will cause every dashboard polling request to receive a 500 error simultaneously, rather than the previous (slightly stale) data.

Consider serving the stale cache as a degraded fallback instead of propagating the error:

this.statusSnapshotInFlight = (async () => {
  try {
    const [dbUsers, activeRequestRows, lastRequestRows] = await Promise.all([...]);
    const snapshot: StatusSnapshot = { ... };
    this.statusSnapshotCache = snapshot;
    return snapshot;
  } catch (error) {
    logger.error("[ProxyStatusTracker] Failed to refresh status snapshot", { error: ... });
    // Serve stale data if available; only throw when there is nothing to serve
    if (this.statusSnapshotCache) {
      return this.statusSnapshotCache;
    }
    throw error;
  }
})().finally(() => {
  this.statusSnapshotInFlight = null;
});

src/repository/message.ts, line 295
Partial-terminal records not swept by orphan sweeper

The sealOrphanedMessageRequests CTE and UPDATE both filter on duration_ms IS NULL AND status_code IS NULL. A record where only one of these two fields was successfully written to the DB (e.g., status_code written but duration_ms still NULL after a partial async flush) will never be swept.

These partially-terminal records won't appear as "active" in loadActiveRequests (which also requires both to be NULL), but they will appear in usage statistics and billing queries with a set status_code but a NULL duration_ms. Any downstream aggregation that assumes duration_ms is always non-null when status_code is set could silently produce incorrect statistics.

Consider adding a separate maintenance step or widening the sweeper condition to also handle records where duration_ms IS NULL AND status_code IS NOT NULL AND created_at < threshold:

-- Seal records where response arrived but duration wasn't persisted
UPDATE message_request
SET duration_ms = LEAST(2147483647, GREATEST(0, (EXTRACT(EPOCH FROM (NOW() - created_at)) * 1000)::int)),
    updated_at  = NOW()
WHERE deleted_at IS NULL
  AND duration_ms IS NULL
  AND status_code IS NOT NULL
  AND created_at < ${threshold}
  AND (blocked_by IS NULL OR blocked_by <> 'warmup')

src/repository/message-write-buffer.ts, line 300
costUsd upper-bound rejection not logged at call site

sanitizeCostUsdString silently returns undefined when parsed >= 1_000_000 or parsed < 0. The caller sanitizePatch uses if (costUsd !== undefined) { sanitized.costUsd = costUsd; } — there is no warning when a valid-looking non-negative cost is dropped because it exceeds the schema ceiling.

A value like costUsd: 999999.9999999999 (just below the limit) is accepted, but 1000000 is silently dropped without any log entry. Given the hard-coded ceiling is tied to the numeric(21,15) schema column definition, it is easy for a future schema change to silently begin discarding data.

Consider emitting a throttled warning here similar to the existing clamp-log patterns:

if (parsed < 0 || parsed >= 1_000_000) {
  // throttled warn – same pattern as sanitizeInt32 / sanitizeSafeInt clamp warnings
  logger.warn("[MessageRequestWriteBuffer] costUsd out of accepted range, skipping", {
    value: trimmed.slice(0, 32),
    min: 0,
    max: 1_000_000,
  });
  return undefined;
}
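A self-contained version of the sanitizer with the suggested range check might look as follows. This is a sketch: the function body is reconstructed from the comment above, and the throttling machinery is reduced to a plain `console.warn` for brevity:

```typescript
// Sketch of the suggested range check. The [0, 1_000_000) bound mirrors the
// numeric(21,15) column: 21 - 15 = 6 integer digits, so values >= 10^6 overflow.
function sanitizeCostUsdString(value: unknown): string | undefined {
  if (typeof value !== "string" && typeof value !== "number") return undefined;
  const trimmed = String(value).trim();
  const parsed = Number(trimmed);
  if (!Number.isFinite(parsed)) return undefined;
  if (parsed < 0 || parsed >= 1_000_000) {
    // In the real buffer this warning would be throttled, like the
    // existing sanitizeInt32 / sanitizeSafeInt clamp warnings.
    console.warn("[MessageRequestWriteBuffer] costUsd out of accepted range, skipping", {
      value: trimmed.slice(0, 32),
      min: 0,
      max: 1_000_000,
    });
    return undefined;
  }
  return trimmed;
}
```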

src/repository/message.ts, line 302
db.execute result counting may be unreliable outside postgres.js

const sealedCount = Array.isArray(result) ? result.length : Array.from(result).length;

For postgres.js (likely the production driver) db.execute returns a RowList that extends Array, so Array.isArray is true and .length is correct.

However, if the driver is ever switched to node-postgres (pg) or @vercel/postgres, db.execute returns a QueryResult<T> object ({ rows: T[], rowCount: number, ... }). Array.isArray is then false, and Array.from(result) iterates the object's own enumerable keys (not the rows), producing the wrong count — likely 0 or a small number equal to the number of result object properties.

The RETURNING id clause already provides all the information needed. Consider accessing rows explicitly to be driver-agnostic:

const rows = Array.isArray(result) ? result : ((result as any).rows ?? Array.from(result));
const sealedCount = rows.length;
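Extracting the rows through a typed helper makes the intent explicit. A sketch, where `QueryResultLike` is an assumed shape covering the postgres.js array case and the node-postgres `{ rows, rowCount }` case described above:

```typescript
// Sketch: normalize a query result to its rows regardless of driver shape.
// postgres.js returns an Array subclass; node-postgres returns { rows, rowCount }.
type QueryResultLike<T> = T[] | { rows: T[]; rowCount?: number | null };

function extractRows<T>(result: QueryResultLike<T>): T[] {
  if (Array.isArray(result)) return result;
  return result.rows ?? [];
}
```

`extractRows(result).length` then yields the sealed count for either driver, with no reliance on iterating the result object's enumerable keys.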

- ProxyStatusTracker: return the stale cache and extend its TTL when a refresh fails, avoiding error storms during dashboard polling

- sealOrphanedMessageRequests: also cover records where status_code is written but duration_ms is missing, and handle rowCount-style results

- message-write-buffer: throttled warning when costUsd is out of range

- Added unit test coverage

greptile-apps bot commented Mar 3, 2026

Additional Comments (4)

src/repository/message.ts, line 282
duration_ms incorrectly set to 0 when status_code is known but ttfb_ms is null

When a request has status_code IS NOT NULL (response header arrived) but duration_ms IS NULL and ttfb_ms IS NULL (both were dropped from the async queue), the ELSE branch sets duration_ms = COALESCE(ttfb_ms, 0) = 0. This misrepresents the request as an "instant" 0 ms response in dashboards and statistics, which is worse than showing the elapsed wall-clock estimate.

A safer fallback uses NOW() - created_at in both branches (already used when status_code IS NULL):

duration_ms = CASE
  WHEN status_code IS NULL THEN (
    LEAST(
      2147483647,
      GREATEST(0, (EXTRACT(EPOCH FROM (NOW() - created_at)) * 1000))
    )::int
  )
  ELSE (
    LEAST(
      2147483647,
      GREATEST(0, COALESCE(ttfb_ms, (EXTRACT(EPOCH FROM (NOW() - created_at)) * 1000)))
    )::int
  )
END,

This uses ttfb_ms as the preferred estimate when status_code is set (TTFB ≤ real duration, so it's a lower-bound), but falls back to wall-clock elapsed time instead of 0 when ttfb_ms is also missing.
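The fallback chain in the SQL above reduces to a small pure function. This TypeScript equivalent is illustrative only (the name `estimateDurationMs` is not from the PR); it applies the same int4 clamp and the same branch on `status_code`:

```typescript
// Sketch: the same estimate the SQL produces — prefer ttfb_ms when the
// response header arrived, else fall back to wall-clock elapsed time.
const INT4_MAX = 2_147_483_647;

function estimateDurationMs(
  statusCode: number | null,
  ttfbMs: number | null,
  createdAt: Date,
  now: Date = new Date(),
): number {
  const elapsedMs = now.getTime() - createdAt.getTime();
  // status_code set: TTFB is a lower-bound estimate; otherwise use elapsed time.
  const raw = statusCode !== null ? (ttfbMs ?? elapsedMs) : elapsedMs;
  return Math.min(INT4_MAX, Math.max(0, Math.trunc(raw)));
}
```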


src/lib/proxy-status-tracker.ts, line 155
Stale cache extended by fixed TTL on every DB failure — no backoff

When getStatusSnapshot fails to refresh from the DB, the stale cache's expiresAt is extended by exactly PROXY_STATUS_SNAPSHOT_TTL_MS (5 s). This means every 5 seconds during a sustained DB outage the code attempts another refresh, logs a warning, and re-extends. There is no exponential back-off or jitter, so high-frequency dashboard polling during a DB failure keeps hammering the pool with doomed queries at exactly 5-second intervals.

Consider a simple doubling strategy, for example:

// Track consecutive failures to implement exponential backoff
private statusSnapshotRefreshFailures = 0;

// In the catch block:
const backoffMs = Math.min(
  PROXY_STATUS_SNAPSHOT_TTL_MS * Math.pow(2, this.statusSnapshotRefreshFailures),
  60_000
);
this.statusSnapshotRefreshFailures++;
this.statusSnapshotCache = {
  ...this.statusSnapshotCache,
  expiresAt: Date.now() + backoffMs,
};

// On successful refresh, reset:
this.statusSnapshotRefreshFailures = 0;

This bounds the retry rate without completely blocking status updates when the DB recovers.
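The doubling strategy reduces to a pure function that is easy to unit-test. A sketch — the constants match the comment above, and `snapshotBackoffMs` is an illustrative name, not the PR's API:

```typescript
// Sketch: capped exponential backoff for snapshot refresh retries.
const PROXY_STATUS_SNAPSHOT_TTL_MS = 5_000; // base TTL from the comment above
const MAX_BACKOFF_MS = 60_000;

function snapshotBackoffMs(consecutiveFailures: number): number {
  // 5s, 10s, 20s, 40s, then capped at 60s.
  return Math.min(
    PROXY_STATUS_SNAPSHOT_TTL_MS * Math.pow(2, consecutiveFailures),
    MAX_BACKOFF_MS,
  );
}
```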


src/repository/message-write-buffer.ts, line 615
No type cast for cacheTtlApplied = null in CASE expression — potential type-inference ambiguity

When cacheTtlApplied is null, the generated CASE branch is:

WHEN <id> THEN $n   -- $n = NULL, no cast

If every row in the batch has cacheTtlApplied = null (all $n are NULL), PostgreSQL cannot infer the type of the CASE expression from the branches alone. In most cases the column context in SET cache_ttl_applied = CASE … END is enough for PostgreSQL to resolve the type, but the explicit ::text cast used for other known-string fields would be safer and more consistent:

          cases.push(sql`WHEN ${update.id} THEN NULL::text`);

src/repository/message-write-buffer.ts, line 862
trimPendingToMaxPending may drop currentId's terminal patch even when non-terminal others exist

The function runs two loops over nonTerminalIds to find a candidate to drop before falling back to currentId. The first loop skips currentId, so if IDs [1, 2, 3, currentId] are all non-terminal it correctly picks 1 — that path is fine.

The issue is the second loop: it re-scans nonTerminalIds while allowing currentId, but by the time it runs, the first loop has already exhausted every option other than currentId. The only candidate the second loop can ever find is currentId itself. Dropping currentId when it is the sole remaining non-terminal item is intentional for enqueue overflow (the caller handles the returned dropped_overflow result), but the full re-scan is redundant: a direct check of currentId is equivalent, reduces confusion, and eliminates an O(n) pass:

// After first loop fails to find a non-currentId non-terminal:
if (droppedId === undefined) {
  // currentId is the only remaining non-terminal option
  const candidatePatch = currentId !== undefined ? this.pending.get(currentId) : undefined;
  if (currentId !== undefined && candidatePatch && !isTerminalPatch(candidatePatch)) {
    droppedId = currentId;
    droppedPatch = candidatePatch;
  }
}

@tesgth032 tesgth032 changed the title fix: 修复请求记录卡在【请求中】导致崩溃 (#854) [未完成] fix: 修复请求记录卡在【请求中】导致崩溃 (#854) Mar 3, 2026

Labels

area:core · bug (Something isn't working) · size/L (Large PR, < 1000 lines)

Projects

Status: Backlog
