Conversation
- message_request async batched writes: add validation and explicit type conversion, degrade the write on data-class errors, and protect the queue with an upper bound
- Flush the write queue on a best-effort basis at process exit; start a periodic sweeper that seals historical orphaned records
- proxy-status now aggregates from the database and caps the number of active requests returned, so abnormal accumulation can no longer exhaust memory
- Add unit tests covering the write buffer and orphan sealing
Note: Reviews paused
It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings. Use the following commands to manage reviews:
📝 Walkthrough
Adds a background sweeper for orphaned message_request records; hardens the message write buffer with patch validation, an overflow policy, and tiered fallback writes; adds bulk sealing of stale requests to the message repository; adds ordering and an upper bound to the active-request query; adds/updates the related unit and integration tests and a CI lint install step.

Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses records getting stuck and system crashes when handling message requests, caused by flaws in the asynchronous write mechanism, abnormal process termination, and unbounded accumulation of active-request tracking. By introducing a more robust data-write strategy, a graceful shutdown flow, periodic cleanup of orphaned requests, and optimized status tracking, it significantly improves system stability and data consistency and prevents resource exhaustion caused by abnormal data.

Highlights
Changelog
Activity
try {
  await tryExecute(item.patch);
} catch (singleError) {
  if (!isDataRelatedDbError(singleError)) {
    // Connection/transient problem: requeue the current and remaining items for the next flush
    this.requeueBatchForRetry(batch.slice(index));
    logger.error(
      "[MessageRequestWriteBuffer] Per-item flush hit transient error, will retry",
      {
        error:
          singleError instanceof Error ? singleError.message : String(singleError),
        errorCode: getErrorCode(singleError),
        pending: this.pending.size,
      }
    );
    shouldRetryLater = true;
    break;
  }

  const safePatch = getSafePatch(item.patch);
  try {
    await tryExecute(safePatch);
  } catch (safeError) {
    if (!isDataRelatedDbError(safeError)) {
      this.requeueBatchForRetry(batch.slice(index));
      logger.error(
        "[MessageRequestWriteBuffer] Per-item safe flush hit transient error, will retry",
        {
          error: safeError instanceof Error ? safeError.message : String(safeError),
          errorCode: getErrorCode(safeError),
          pending: this.pending.size,
        }
      );
      shouldRetryLater = true;
      break;
    }

    const minimalPatch = getMinimalPatch(item.patch);
    try {
      await tryExecute(minimalPatch);
    } catch (minimalError) {
      if (!isDataRelatedDbError(minimalError)) {
        this.requeueBatchForRetry(batch.slice(index));
        logger.error(
          "[MessageRequestWriteBuffer] Per-item minimal flush hit transient error, will retry",
          {
            error:
              minimalError instanceof Error
                ? minimalError.message
                : String(minimalError),
            errorCode: getErrorCode(minimalError),
            pending: this.pending.size,
          }
        );
        shouldRetryLater = true;
        break;
      }

      // Persistent data error: drop this update so it doesn't stall the whole queue
      // (the sweeper seals the record later as a backstop)
      logger.error(
        "[MessageRequestWriteBuffer] Dropping invalid update to unblock queue",
        {
          requestId: item.id,
          error:
            minimalError instanceof Error
              ? minimalError.message
              : String(minimalError),
          errorCode: getErrorCode(minimalError),
        }
      );
    }
  }
}
The fallback logic for handling data-related database errors is very robust, which is great. However, the nested try-catch implementation creates a lot of code duplication, especially for transient-error handling, which makes the code harder to read and maintain.
Consider refactoring this section to loop over a list of patch-generation strategies (e.g. full, safe, minimal). That flattens the structure and removes the duplicated requeue-and-log code for transient errors.
Example of a loop-based approach:
const patchStrategies = [
  { name: 'full', patch: item.patch },
  { name: 'safe', patch: getSafePatch(item.patch) },
  { name: 'minimal', patch: getMinimalPatch(item.patch) },
];
let updateSucceeded = false;
for (const { name, patch } of patchStrategies) {
  if (Object.keys(patch).length === 0) continue;
  try {
    await tryExecute(patch);
    updateSucceeded = true;
    break; // success
  } catch (error) {
    if (!isDataRelatedDbError(error)) {
      // handle the transient error once and break out of both loops
      this.requeueBatchForRetry(batch.slice(index));
      // ... log the transient error
      shouldRetryLater = true;
      break;
    }
    // log the data error and continue with the next strategy
  }
}
if (shouldRetryLater) {
  break; // break out of the main per-item loop
}
if (!updateSucceeded) {
  // log that the update was dropped
}

This approach centralizes the error handling and makes the fallback flow much clearer.
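The loop-based refactor the reviewer suggests can be sketched as a self-contained function. `getSafePatch`, `getMinimalPatch`, and `isDataRelatedDbError` below are placeholder stand-ins for the project's real helpers, so this shows the shape of the control flow rather than the actual implementation:

```typescript
type Patch = Record<string, unknown>;

// Placeholder helpers: the real ones would strip risky/optional fields.
const getSafePatch = (p: Patch): Patch => p;
const getMinimalPatch = (p: Patch): Patch => p;
// Placeholder classifier: treats errors mentioning "data" as data-related.
const isDataRelatedDbError = (e: unknown): boolean =>
  e instanceof Error && e.message.includes("data");

async function applyWithFallback(
  patch: Patch,
  tryExecute: (p: Patch) => Promise<void>
): Promise<"applied" | "dropped" | "retry_later"> {
  const strategies = [
    { name: "full", patch },
    { name: "safe", patch: getSafePatch(patch) },
    { name: "minimal", patch: getMinimalPatch(patch) },
  ];
  for (const { patch: candidate } of strategies) {
    if (Object.keys(candidate).length === 0) continue; // nothing to write
    try {
      await tryExecute(candidate);
      return "applied"; // success: stop trying weaker strategies
    } catch (error) {
      if (!isDataRelatedDbError(error)) {
        // transient/connection problem: the caller should requeue and retry later
        return "retry_later";
      }
      // data-related error: fall through to the next, weaker strategy
    }
  }
  // every strategy failed with data errors: drop the update to unblock the queue
  return "dropped";
}
```

The three outcomes map onto the original code's branches: `retry_later` corresponds to requeue-and-break, `dropped` to the final "drop invalid update" log.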
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/repository/message-write-buffer.ts (1)
408-416: Consider making the return value of enqueueMessageRequestUpdate strictly match whether the current patch was actually retained. Lines 702 to 713 currently return true unconditionally once a buffer is obtained, but at lines 409 to 412 (patch empty after sanitize) or when the queue overflows and drops the entry, the current patch may not actually be retained. Consider changing enqueue to return whether the current patch was accepted and passing that through to the caller.

Suggested patch:

 class MessageRequestWriteBuffer {
 @@
-  enqueue(id: number, patch: MessageRequestUpdatePatch): void {
+  enqueue(id: number, patch: MessageRequestUpdatePatch): boolean {
     const sanitized = sanitizePatch(patch);
     if (Object.keys(sanitized).length === 0) {
-      return;
+      return false;
     }
     const existing = this.pending.get(id) ?? {};
     this.pending.set(id, { ...existing, ...sanitized });
+    let accepted = true;
 @@
     if (droppedId !== undefined) {
       this.pending.delete(droppedId);
+      if (droppedId === id) {
+        accepted = false;
+      }
       logger.warn("[MessageRequestWriteBuffer] Pending queue overflow, dropping update", {
 @@
       this.flushAgainAfterCurrent = true;
-      return;
+      return accepted;
     }
 @@
     if (this.pending.size >= this.config.batchSize) {
       void this.flush();
     }
+    return accepted;
   }
 }
 @@
 export function enqueueMessageRequestUpdate(id: number, patch: MessageRequestUpdatePatch): boolean {
 @@
   if (!buffer) {
     return false;
   }
-  buffer.enqueue(id, patch);
-  return true;
+  return buffer.enqueue(id, patch);
 }

Also applies to: 702-713
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/repository/message-write-buffer.ts` around lines 408 - 416, The enqueue method currently drops empty or overflowed patches without signalling that to callers; update enqueue(id: number, patch: MessageRequestUpdatePatch) to return a boolean indicating whether the patch was accepted (true when sanitized non-empty and stored into this.pending, false when sanitizePatch(...) yields empty or the buffer/overflow logic discards it), and then update enqueueMessageRequestUpdate to use that boolean instead of always returning true so its return value reflects "patch actually retained"; reference functions/variables: enqueue, enqueueMessageRequestUpdate, sanitizePatch, and this.pending to locate and wire the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/repository/message.ts`:
- Around line 234-257: The UPDATE can overwrite rows that another concurrent
worker already finished because the WHERE only checks id IN (SELECT id FROM
candidates); update the UPDATE's WHERE to re-check the terminal conditions by
adding "duration_ms IS NULL" (and also "deleted_at IS NULL" to match the CTE) so
only still-unfinished, non-deleted rows from the message_request table are
updated; locate the SQL built in the query template (the CTE named candidates
and the UPDATE against message_request) and add these predicates to the WHERE
clause to provide the concurrency protection.
---
Nitpick comments:
In `@src/repository/message-write-buffer.ts`:
- Around line 408-416: The enqueue method currently drops empty or overflowed
patches without signalling that to callers; update enqueue(id: number, patch:
MessageRequestUpdatePatch) to return a boolean indicating whether the patch was
accepted (true when sanitized non-empty and stored into this.pending, false when
sanitizePatch(...) yields empty or the buffer/overflow logic discards it), and
then update enqueueMessageRequestUpdate to use that boolean instead of always
returning true so its return value reflects "patch actually retained"; reference
functions/variables: enqueue, enqueueMessageRequestUpdate, sanitizePatch, and
this.pending to locate and wire the change.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
📒 Files selected for processing (6)
- src/instrumentation.ts
- src/lib/proxy-status-tracker.ts
- src/repository/message-write-buffer.ts
- src/repository/message.ts
- tests/unit/repository/message-orphaned-requests.test.ts
- tests/unit/repository/message-write-buffer.test.ts
Code Review Summary
This PR addresses issue #854 by fixing message_request records getting stuck in "requesting" state through robust error handling with graceful degradation, input sanitization, and a sweeper mechanism for orphaned records.
PR Size: L
- Lines changed: 718 (686 additions, 32 deletions)
- Files changed: 6
Issues Found
| Category | Critical | High | Medium | Low |
|---|---|---|---|---|
| Logic/Bugs | 0 | 0 | 0 | 0 |
| Security | 0 | 0 | 0 | 0 |
| Error Handling | 0 | 0 | 0 | 0 |
| Types | 0 | 0 | 0 | 0 |
| Comments/Docs | 0 | 0 | 0 | 0 |
| Tests | 0 | 0 | 1 | 0 |
| Simplification | 0 | 0 | 0 | 0 |
Medium Priority Issues (Should Consider)
- src/lib/proxy-status-tracker.ts:169 [STANDARD-VIOLATION] Missing logging when the query limit is hit.
  - The codebase has an established pattern in src/lib/availability/availability-service.ts:170-177 where query limits are logged when hit.
  - Without logging, operators cannot detect when the 1000-item limit is truncating results, which could mask underlying issues.
  - Suggested fix: add logging after the query:

    if (rows.length === 1000) {
      logger.warn("[ProxyStatusTracker] Active requests query hit limit, results may be incomplete", {
        limit: 1000,
      });
    }
Review Coverage
- Logic and correctness - Clean
- Security (OWASP Top 10) - Clean
- Error handling - Well implemented with graceful degradation
- Type safety - Clean
- Documentation accuracy - Comments accurately describe behavior
- Test coverage - Adequate (core fallback logic tested)
- Code clarity - Good
Automated review by Claude AI
      isNull(providers.deletedAt)
    )
  );
)
[MEDIUM] [STANDARD-VIOLATION] Missing logging when query limit is hit
The codebase has an established pattern in src/lib/availability/availability-service.ts:170-177 where query limits are logged when hit. Without logging, operators cannot detect when the 1000-item limit is truncating results.
Suggested fix:
if (rows.length === 1000) {
logger.warn("[ProxyStatusTracker] Active requests query hit limit, results may be incomplete", {
limit: 1000,
});
}
src/instrumentation.ts (outdated)
  }
};

await runOnce("startup");
Initial sweep on startup could delay application launch if many orphaned records exist or DB is slow (max 1000 records). Consider not awaiting this call or using void runOnce("startup") to run in background.
instrumentationState.__CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_INTERVAL_ID__ = undefined;
instrumentationState.__CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__ = false;
  }
} catch (error) {
Always exiting with code 0 could mask cleanup errors. Consider tracking cleanup status and exiting with non-zero code if critical cleanup failed.
- enqueueMessageRequestUpdate's return value now strictly reflects whether the patch was accepted, so a patch that is empty after sanitize can no longer be misreported and silently lost
- The sealOrphanedMessageRequests UPDATE gains concurrency-guard conditions to avoid overwriting records that already completed
- proxy-status logs a warning when the active-request limit is hit, so operators can spot truncation
- The orphan sweeper now starts in the background and uses an inFlight guard against re-entry
- Added unit tests covering enqueue return-value semantics
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/repository/message-write-buffer.ts`:
- Around line 408-412: The enqueue method currently returns boolean that
overloads multiple meanings (sanitized-empty, buffer-unavailable,
overflow/drop), causing callers in src/repository/message.ts (calls at lines
~78, ~103, ~142) to incorrectly fall back to db.update for invalid patches;
change enqueue (and the related overflow handling at the other block around
lines ~708-718) to return a discriminated result type (e.g., enum or object with
kind: 'Ok' | 'RejectedInvalid' | 'BufferUnavailable' | 'DroppedOverflow')
instead of boolean, keep sanitizePatch usage the same but return
'RejectedInvalid' when sanitized is empty, return 'BufferUnavailable' for infra
issues, 'DroppedOverflow' for overflow; then update the callers in message.ts to
only trigger direct db.update fallback when the result is 'BufferUnavailable'
(or 'Ok' to proceed), and treat 'RejectedInvalid'/'DroppedOverflow' as discarded
(log if needed) rather than falling back to direct DB writes.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
📒 Files selected for processing (5)
- src/instrumentation.ts
- src/lib/proxy-status-tracker.ts
- src/repository/message-write-buffer.ts
- src/repository/message.ts
- tests/unit/repository/message-write-buffer.test.ts
- enqueueMessageRequestUpdate now returns a discriminated result (enqueued/rejected_invalid/buffer_unavailable/dropped_overflow)
- message.ts falls back to the synchronous write only on buffer_unavailable, so invalid/overflow cases no longer trigger db.update by mistake
- Updated unit tests to cover the rejected_invalid branch
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/repository/message-write-buffer.ts (1)
414-418: Consider adding a low-frequency log for rejected_invalid to improve observability during troubleshooting. Invalid patches currently return silently; consider adding a (possibly sampled) debug/warn here that records the id and a summary of the filtered fields, to help trace upstream data-quality issues.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/repository/message-write-buffer.ts` around lines 414 - 418, In enqueue (MessageWriteBuffer.enqueue) add a low-frequency/log-sampled debug or warn log right before returning "rejected_invalid" when sanitized (result of sanitizePatch) is empty: log the id and a brief summary of which fields were filtered/removed (e.g., keys from the original patch vs sanitized) so upstream data issues can be traced; use existing logger and a sampling mechanism or rate limiter to avoid high-cardinality/noise.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/repository/message-write-buffer.ts`:
- Around line 258-260: The current check for droppable patches only looks at durationMs, so a terminal patch containing only statusCode can be wrongly evicted on overflow. Change the eviction branch to reuse isTerminalPatch(patch) (i.e. replace the durationMs === undefined / durationMs !== undefined checks with the corresponding !isTerminalPatch(patch) / isTerminalPatch(patch) branches) so that any terminal patch containing statusCode or durationMs is never treated as droppable; apply the same replacement to the second, similar occurrence the finding points at in this file to keep the logic consistent.
In `@src/repository/message.ts`:
- Around line 227-229: In sealOrphanedMessageRequests, prevent NaN/float
propagation by validating and integer-normalizing inputs before using Math.max:
ensure options?.staleAfterMs and env.FETCH_BODY_TIMEOUT are Number.isFinite and
convert to integer milliseconds (e.g., Math.floor or parseInt) with fallbacks to
the safe default (60000), then compute staleAfterMs = Math.max(60000,
validatedStaleAfterMs + validatedFetchBodyTimeoutIfUsed) and limit = Math.max(1,
Math.floor(validated options?.limit ?? 1000)); finally build threshold = new
Date(Date.now() - staleAfterMs) so threshold is always a valid Date and LIMIT is
a positive integer. Reference: sealOrphanedMessageRequests, variables
staleAfterMs, limit, threshold, options, env.FETCH_BODY_TIMEOUT.
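The normalization this prompt asks for can be sketched as below. `DEFAULT_STALE_AFTER_MS` and the option names are assumptions taken from the prompt text, not the repository's actual signatures:

```typescript
const DEFAULT_STALE_AFTER_MS = 60_000;

// Coerce any input to a finite integer, falling back when it is NaN/Infinity/non-numeric.
function toFiniteInt(value: unknown, fallback: number): number {
  const n = typeof value === "number" ? value : Number(value);
  return Number.isFinite(n) ? Math.floor(n) : fallback;
}

function normalizeSweepOptions(
  options?: { staleAfterMs?: number; limit?: number },
  fetchBodyTimeout?: number
): { staleAfterMs: number; limit: number; threshold: Date } {
  const stale = toFiniteInt(options?.staleAfterMs, DEFAULT_STALE_AFTER_MS);
  const timeout = toFiniteInt(fetchBodyTimeout, 0);
  // never go below the safe default, so the threshold stays a valid Date
  const staleAfterMs = Math.max(DEFAULT_STALE_AFTER_MS, stale + timeout);
  // LIMIT must be a positive integer
  const limit = Math.max(1, toFiniteInt(options?.limit, 1000));
  const threshold = new Date(Date.now() - staleAfterMs);
  return { staleAfterMs, limit, threshold };
}
```

With this in place, a NaN `staleAfterMs` or an Infinity `FETCH_BODY_TIMEOUT` can no longer produce an Invalid Date threshold or a fractional LIMIT in the generated SQL.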
---
Nitpick comments:
In `@src/repository/message-write-buffer.ts`:
- Around line 414-418: In enqueue (MessageWriteBuffer.enqueue) add a
low-frequency/log-sampled debug or warn log right before returning
"rejected_invalid" when sanitized (result of sanitizePatch) is empty: log the id
and a brief summary of which fields were filtered/removed (e.g., keys from the
original patch vs sanitized) so upstream data issues can be traced; use existing
logger and a sampling mechanism or rate limiter to avoid high-cardinality/noise.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
📒 Files selected for processing (3)
- src/repository/message-write-buffer.ts
- src/repository/message.ts
- tests/unit/repository/message-write-buffer.test.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/unit/repository/message-write-buffer.test.ts
src/repository/message.ts (outdated)
if (enqueueMessageRequestUpdate(id, { durationMs }) !== "buffer_unavailable") {
  return;
This check loses updates when the queue overflows: "dropped_overflow" returns early without a sync fallback, causing data loss. Should be:

if (enqueueMessageRequestUpdate(id, { durationMs }) === "enqueued") {
  return;
}

This ensures only successfully enqueued updates skip the sync write; dropped_overflow and buffer_unavailable fall through to the sync write, preventing data loss. Same issue at lines 103 and 142.
- Overflow eviction now uses isTerminalPatch, so terminal patches containing only statusCode are no longer mistakenly evicted
- rejected_invalid now emits a throttled warning log to help trace upstream data issues
- sealOrphanedMessageRequests normalizes staleAfterMs/limit/FETCH_BODY_TIMEOUT to finite integers to avoid NaN propagation
- Adjusted unit tests to cover non-terminal eviction
src/repository/message.ts (outdated)
if (enqueueMessageRequestUpdate(id, { durationMs }) !== "buffer_unavailable") {
  return;
}
This check loses updates when enqueueMessageRequestUpdate returns "dropped_overflow" or "rejected_invalid". It should only skip the sync write when the patch was successfully enqueued:

if (enqueueMessageRequestUpdate(id, { durationMs }) === "enqueued") {
  return;
}

This ensures the sync fallback happens for "dropped_overflow", "rejected_invalid", and "buffer_unavailable", preventing data loss.
src/repository/message.ts (outdated)
if (enqueueMessageRequestUpdate(id, { costUsd: formattedCost }) !== "buffer_unavailable") {
  return;
}
Same enqueue result check issue: should be === "enqueued", not !== "buffer_unavailable":

if (enqueueMessageRequestUpdate(id, { costUsd: formattedCost }) === "enqueued") {
  return;
}
src/repository/message.ts (outdated)
if (enqueueMessageRequestUpdate(id, details) !== "buffer_unavailable") {
  return;
}
Same enqueue result check issue: should be === "enqueued", not !== "buffer_unavailable":

if (enqueueMessageRequestUpdate(id, details) === "enqueued") {
  return;
}
}
|
|
||
| // 重要:注册 SIGTERM/SIGINT handler 会覆盖默认退出行为。 | ||
| // 若不显式退出,进程会在“半关闭”状态继续运行(例如异步写入队列已停止),导致后续日志长期异常。 |
Exiting with code 0 even if cleanup failed could hide issues in monitoring/orchestration systems
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/repository/message.ts`:
- Around line 14-18: Replace the relative imports from "./message-write-buffer"
in src/repository/message.ts with the project path alias import using "@/..." so
the imported symbols (MessageRequestUpdatePatch, enqueueMessageRequestUpdate,
sanitizeMessageRequestUpdatePatch) are unchanged but imported from
"@/repository/message-write-buffer"; update both the type-only import for
MessageRequestUpdatePatch and the runtime import for enqueueMessageRequestUpdate
and sanitizeMessageRequestUpdatePatch to use the alias form.
- Around line 202-204: Replace the hardcoded user-facing string in
ORPHANED_ERROR_MESSAGE with a stable error code constant and ensure the code
populates the error_message/error_code field with that code (e.g., rename
ORPHANED_ERROR_MESSAGE → ORPHANED_ERROR_CODE or change its value to a
non-display token), update usages in the message repository where
ORPHANED_ERROR_MESSAGE and ORPHANED_STATUS_CODE are set so they write the code,
and add a note in the UI/display layer to map that code to localized text via
i18n (supporting zh-CN, zh-TW, en, ja, ru); keep the constant name
ORPHANED_STATUS_CODE as-is and only change the error-message constant semantics
to be a machine-readable code.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
📒 Files selected for processing (3)
- src/repository/message-write-buffer.ts
- src/repository/message.ts
- tests/unit/repository/message-write-buffer.test.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/unit/repository/message-write-buffer.test.ts
src/repository/message.ts (outdated)
updatedAt: new Date(),
// Non-terminal patches can simply be dropped on overflow, avoiding amplified DB writes at peak load.
// Terminal patches (those containing statusCode) should prefer the sync write so requests don't stay stuck in "requesting".
if (enqueueResult === "dropped_overflow" && details.statusCode === undefined) {
Terminal patch check only looks at statusCode, but isTerminalPatch defines terminal as having either durationMs OR statusCode. If this function could receive patches with durationMs but no statusCode (via the MessageRequestUpdatePatch type that enqueueMessageRequestUpdate accepts), those would be dropped on overflow despite being terminal.
However, the function signature for details excludes durationMs, so this is type-safe. Consider adding a comment clarifying why only statusCode is checked here, or using isTerminalPatch helper for consistency.
if (!this.stopping) {
-  this.ensureFlushTimer();
+  // Flush terminal patches to the DB as soon as possible to shrink the window
+  // where duration/status remain empty
+  this.ensureFlushTimer(isTerminalPatch(sanitized) ? 0 : undefined);
Terminal patches trigger immediate flush with delayMs: 0, which is good for reducing the "stuck in requesting" window. But if many terminal patches arrive in quick succession, this could cause excessive flush operations. Consider batching terminal patches with a very short delay (e.g., 10-50ms) instead of 0 to allow micro-batching while still being fast.
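The micro-batching idea amounts to coalescing all schedule calls behind one short timer. The 10 ms default below and the simplified timer plumbing are assumptions; the real buffer's `ensureFlushTimer` is more involved:

```typescript
class MicroBatchFlusher {
  private timer: ReturnType<typeof setTimeout> | undefined;
  flushes = 0;

  constructor(private delayMs = 10) {}

  schedule(): void {
    if (this.timer !== undefined) return; // already scheduled: coalesce this call
    this.timer = setTimeout(() => {
      this.timer = undefined;
      this.flushes += 1; // stand-in for the real flush()
    }, this.delayMs);
  }
}
```

A burst of terminal patches arriving within the delay window then triggers one flush instead of one per patch, while the worst-case extra latency stays bounded by `delayMs`.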
 if (this.pending.size > this.config.maxPending) {
-  // Prefer dropping non-terminal updates (entries without durationMs) to preserve completion info
+  // Prefer dropping non-terminal updates (entries without durationMs/statusCode) to preserve completion info
   let droppedId: number | undefined;
   let droppedPatch: MessageRequestUpdatePatch | undefined;

   for (const [candidateId, candidatePatch] of this.pending) {
-    if (candidatePatch.durationMs === undefined) {
+    if (!isTerminalPatch(candidatePatch)) {
       droppedId = candidateId;
       droppedPatch = candidatePatch;
       break;
     }
   }

+  // When every pending patch is terminal, don't evict an existing terminal patch at random
+  // (other requests would permanently lose their completion info). Drop the *current* patch
+  // instead, and let the caller use the return value to decide whether to fall back to a sync write.
   if (droppedId === undefined) {
-    const first = this.pending.entries().next().value as
-      | [number, MessageRequestUpdatePatch]
-      | undefined;
-    if (first) {
-      droppedId = first[0];
-      droppedPatch = first[1];
-    }
+    droppedId = id;
+    droppedPatch = this.pending.get(id);
   }

   if (droppedId !== undefined) {
     this.pending.delete(droppedId);
+    if (droppedId === id) {
+      result = "dropped_overflow";
+    }
     logger.warn("[MessageRequestWriteBuffer] Pending queue overflow, dropping update", {
       maxPending: this.config.maxPending,
       droppedId,
-      droppedHasDurationMs: droppedPatch?.durationMs !== undefined,
+      droppedIsTerminal: droppedPatch ? isTerminalPatch(droppedPatch) : undefined,
       currentPending: this.pending.size,
     });
   }
Queue overflow logic iterates through all pending items to find a non-terminal patch to drop. With maxPending of 1000+, this O(n) scan could add latency on the hot path during overflow scenarios. Consider maintaining separate queues for terminal vs non-terminal patches, or tracking non-terminal patch IDs in a separate Set for O(1) lookup.
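The O(1) suggestion might look like the sketch below: a `Set` of non-terminal IDs maintained alongside the pending map. `isTerminalPatch` here mirrors the definition referenced in the review (terminal means `durationMs` or `statusCode` is set), but the class itself is illustrative, not the project's code:

```typescript
type Patch = { durationMs?: number; statusCode?: number; [k: string]: unknown };

const isTerminalPatch = (p: Patch): boolean =>
  p.durationMs !== undefined || p.statusCode !== undefined;

class PendingIndex {
  private pending = new Map<number, Patch>();
  private nonTerminal = new Set<number>();

  set(id: number, patch: Patch): void {
    this.pending.set(id, patch);
    // keep the side index consistent: a patch can become terminal after a merge
    if (isTerminalPatch(patch)) this.nonTerminal.delete(id);
    else this.nonTerminal.add(id);
  }

  /** O(1): evict one non-terminal entry if any exists, else report none. */
  evictNonTerminal(): number | undefined {
    const candidate = this.nonTerminal.values().next().value as number | undefined;
    if (candidate === undefined) return undefined;
    this.nonTerminal.delete(candidate);
    this.pending.delete(candidate);
    return candidate;
  }

  get size(): number {
    return this.pending.size;
  }
}
```

The trade-off is a second data structure that must be updated on every `set` and delete; whether that is worth it depends on how often overflow actually occurs at the configured `maxPending`.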
}
};

void runOnce("startup");
Startup sweep uses void runOnce("startup") (fire-and-forget), so it won't block application launch. But if the sweep encounters errors or takes very long, there's no visibility or timeout. Consider adding a timeout wrapper or logging the completion time to detect issues.
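A timeout-plus-timing wrapper for the fire-and-forget startup sweep could be sketched as below. The label and budget are placeholders, and note a limitation of this simple version: a sweep that legitimately resolves to `undefined` is indistinguishable from a timeout:

```typescript
async function withTimeout<T>(
  work: Promise<T>,
  ms: number,
  label: string
): Promise<T | undefined> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<undefined>((resolve) => {
    timer = setTimeout(() => resolve(undefined), ms);
  });
  const started = Date.now();
  try {
    const result = await Promise.race([work, timeout]);
    if (result === undefined) {
      // budget exceeded: the sweep keeps running, but we gain visibility
      console.warn(`${label} exceeded ${ms}ms budget; continuing in background`);
    } else {
      console.info(`${label} completed in ${Date.now() - started}ms`);
    }
    return result as T | undefined;
  } finally {
    if (timer !== undefined) clearTimeout(timer);
  }
}
```

Usage would be something like `void withTimeout(runOnce("startup"), 30_000, "orphan sweep")`, keeping the fire-and-forget shape while logging completion time or a budget overrun.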
@coderabbitai review
✅ Actions performed: Review triggered.
   const enqueueResult = enqueueMessageRequestUpdate(id, { costUsd: formattedCost });
   if (enqueueResult === "enqueued" || enqueueResult === "rejected_invalid") {
     return;
   }

-  await db
-    .update(messageRequest)
-    .set({
-      costUsd: formattedCost,
-      updatedAt: new Date(),
-    })
-    .where(eq(messageRequest.id, id));
+  await writeMessageRequestUpdateToDb(id, { costUsd: formattedCost });
There was a problem hiding this comment.
`costUsd` is non-terminal but still falls back to a synchronous DB write when the enqueue result is `dropped_overflow`. This is inconsistent with `updateMessageRequestDetails` (line 161), which skips non-terminal writes on overflow to avoid amplifying DB load during peaks.
Suggested fix:

```ts
const enqueueResult = enqueueMessageRequestUpdate(id, { costUsd: formattedCost });
if (enqueueResult === "enqueued" || enqueueResult === "rejected_invalid") {
  return;
}
// Non-terminal fields skip the synchronous write on queue overflow,
// to avoid amplifying DB writes during load peaks.
if (enqueueResult === "dropped_overflow") {
  return;
}
await writeMessageRequestUpdateToDb(id, { costUsd: formattedCost });
```

(Path: `src/repository/message.ts`, lines 120-125)
```ts
if (truncated < min) {
  return min;
}
if (truncated > max) {
  return max;
}
```
Silently clamping out-of-range values could hide data integrity bugs. Consider logging when clamping occurs, especially for critical fields like token counts or costs.
(Path: `src/repository/message-write-buffer.ts`, lines 94-99)
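A logged clamp could look like the following sketch. The throttle window and field names are illustrative assumptions, not the project's actual `sanitizeInt32`:

```typescript
const INT32_MAX = 2_147_483_647;
let lastClampWarnAt = 0;

// Clamp into [min, max], but emit a (throttled) warning whenever a value
// is actually out of range, so data-integrity bugs stay visible.
function clampWithWarn(value: number, field: string, min = 0, max = INT32_MAX): number {
  const truncated = Math.trunc(value);
  if (truncated >= min && truncated <= max) return truncated;
  const clamped = truncated < min ? min : max;
  // Throttle: at most one warning per 10 s so a burst of bad values
  // cannot flood the logs.
  const now = Date.now();
  if (now - lastClampWarnAt > 10_000) {
    lastClampWarnAt = now;
    console.warn(`[sanitize] ${field} out of range, clamped`, { value: truncated, min, max, clamped });
  }
  return clamped;
}
```

In-range values pass through unchanged; only genuine clamps pay the logging cost.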
Actionable comments posted: 1
Additional Comments (3)
The original query joins `providers` without a soft-delete guard:

```sql
JOIN providers p ON mr.provider_id = p.id -- missing: AND p.deleted_at IS NULL
```

As a result, the "last request" for a user can be populated with data from a soft-deleted provider (e.g., its name would still appear in the status response). The fix is to add the soft-delete guard:

```sql
JOIN providers p ON mr.provider_id = p.id AND p.deleted_at IS NULL
```
When `buildBatchUpdateSql` filters out every field, no SQL is produced and the strategy is skipped silently. Consider logging a warning on this path:

```ts
if (!singleQuery) {
  // No SQL columns produced (all fields filtered out by buildBatchUpdateSql); skip this strategy.
  continue;
}
```
Additional Comments (3)
The net effect is harmless today because the patch is sanitized again inside `enqueueMessageRequestUpdate`, but the duplicated pre-sanitize adds noise:

```ts
// Before (current):
const sanitizedCost = sanitizeMessageRequestUpdatePatch({ costUsd: formattedCost }).costUsd;
if (!sanitizedCost) {
  logger.warn("[MessageRepository] costUsd rejected by sanitize, dropping cost update", { /* ... */ });
  return;
}
const enqueueResult = enqueueMessageRequestUpdate(id, { costUsd: sanitizedCost });
```

```ts
// Simpler alternative:
const enqueueResult = enqueueMessageRequestUpdate(id, { costUsd: formattedCost });
// rejected_invalid already logged/handled inside enqueueMessageRequestUpdate
```
This helper is duplicated across test files. Consider extracting it into a shared test utility, e.g.:

```ts
// tests/helpers/drizzle.ts
import { CasingCache } from "drizzle-orm/casing";

export function toSqlText(query: { toQuery: (config: any) => { sql: string; params: unknown[] } }) {
  return query.toQuery({
    casing: new CasingCache(),
    escapeName: (name: string) => `"${name}"`,
    escapeParam: (index: number) => `$${index}`,
    escapeString: (value: string) => `'${value}'`,
    paramStartIndex: { value: 1 },
  });
}
```

This also avoids drift between the two copies.
The rewritten query starts from the `users` table rather than from `message_request`, so it now visits every user. Consider filtering the outer scan (e.g., restricting it to recently active users).
Converged and tightened things up based on the Greptile/CodeRabbit AI reviews.

Ran locally: `bun run build` / `bun run lint` / `bun run typecheck` / `bun run test` all pass; GitHub Actions is fully green (Greptile Review still pending, waiting for it to refresh its summary).
Additional Comments (3)
The fix is the same pattern used by other timers in the codebase: mark the state as "started" before any `await`, and reset it on failure:

```ts
async function startOrphanedMessageRequestSweeper(): Promise<void> {
  if (instrumentationState.__CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__) {
    return;
  }
  // Guard against concurrent calls racing through the await below
  instrumentationState.__CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__ = true;
  try {
    const { sealOrphanedMessageRequests } = await import("@/repository/message");
    // ... rest of setup ...
    // No second assignment of __CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__ needed
  } catch (error) {
    // Reset flag on failure so a future call can retry
    instrumentationState.__CCH_ORPHANED_MESSAGE_REQUEST_SWEEPER_STARTED__ = false;
    logger.warn(/* ... */);
  }
}
```
HTTP status codes start at 100, while the sanitizer currently accepts values from 0 upward. Consider raising the minimum for `statusCode` to 100.
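With a 100 floor, a `statusCode` sanitizer might look like this sketch. The 100-599 range and the reject-instead-of-clamp choice are assumptions; the project may deliberately allow non-standard codes:

```typescript
// HTTP status codes occupy 100-599; anything outside is rejected rather than
// clamped, since storing "100" for a bogus 0 would be misleading.
function sanitizeStatusCode(value: unknown): number | undefined {
  const numeric = typeof value === "number" ? value : Number(value);
  if (!Number.isFinite(numeric)) return undefined;
  const truncated = Math.trunc(numeric);
  if (truncated < 100 || truncated > 599) return undefined;
  return truncated;
}
```

Rejecting (returning `undefined`) keeps the column `NULL`, so the orphan sweeper can still seal the record later instead of persisting a fabricated status.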
On the next regular flush, stale IDs would otherwise linger. The drop path already cleans up both structures:

```ts
this.pending.delete(droppedId);
this.nonTerminalIds.delete(droppedId); // already present ✓
```

This is already done, but consider also adding a periodic or post-requeue reconciliation of `nonTerminalIds` against `pending`.
Additional Comments (4)
The LATERAL subquery filters on completed requests (`duration_ms IS NOT NULL`). In the normal case (most requests are completed) this terminates immediately. But during an orphan accumulation episode — exactly the scenario this PR targets — a user's most recent rows all have `duration_ms IS NULL`, and the scan must walk past them. A partial index would make each per-user lookup O(1):

```sql
CREATE INDEX idx_message_request_user_created_at_completed
ON message_request (user_id, created_at DESC, id DESC)
WHERE deleted_at IS NULL AND duration_ms IS NOT NULL;
```

The comment at line 200-201 already acknowledges scaling limits for this query; adding this index resolves both the note and the edge-case regression.
When the enqueue result is `rejected_invalid`, the caller still falls through and re-sanitizes. No incorrect write reaches the DB (the helper bails on an empty sanitized patch), but the duplicate log can be confusing during post-incident analysis. Consider adding an early return:

```ts
if (enqueueResult.kind === "rejected_invalid") {
  // patch was entirely invalid; buffer already logged the detail
  return;
}
```
Using `updated_at` as the staleness threshold means non-terminal updates keep pushing the seal time out. This is a design trade-off worth documenting explicitly, or optionally clamping the delay by also checking:

```sql
AND (updated_at < ${threshold} OR created_at < ${createdAtThreshold})
```
This is an existing limitation acknowledged by the "尽力 flush" (best-effort flush) comment, but it would benefit from a return value or event indicating the flush outcome, especially on the shutdown path, where orchestration systems rely on clean exit signals.
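A flush that reports its outcome could be sketched like this — the names, batch size, and deadline handling are hypothetical, not the buffer's real API:

```typescript
type FlushOutcome = { written: number; remaining: number; errors: number };

// Best-effort shutdown flush that tells the caller what actually happened,
// so the shutdown path can log or set an exit code when updates were left behind.
async function flushOnShutdown(
  pending: Map<string, object>,
  writeBatch: (batch: Array<[string, object]>) => Promise<void>,
  deadlineMs: number,
): Promise<FlushOutcome> {
  const startedAt = Date.now();
  let written = 0;
  let errors = 0;
  while (pending.size > 0 && Date.now() - startedAt < deadlineMs) {
    const batch = [...pending.entries()].slice(0, 50);
    try {
      await writeBatch(batch);
      for (const [id] of batch) pending.delete(id);
      written += batch.length;
    } catch {
      errors += 1;
      break; // DB unavailable during shutdown; stop retrying
    }
  }
  return { written, remaining: pending.size, errors };
}
```

The caller can then emit one summary log line — `written`, `remaining`, `errors` — which is exactly what a supervisor needs to distinguish a clean exit from a lossy one.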
- Added a partial index (plus migration) for the proxy-status lastRequest LATERAL query
- Orphan-sealing threshold now uses `created_at`, avoiding delays from `updated_at` drift
- `rejected_invalid` now returns immediately, avoiding duplicate sanitize/warnings
- Log at error level when pending items remain during stop, to ease troubleshooting
Additional Comments (3)
```ts
// tests/helpers/env.ts (suggested location)
export function snapshotEnv(keys: string[]): Record<string, string | undefined> { /* ... */ }
export function restoreEnv(snapshot: Record<string, string | undefined>): void { /* ... */ }
```
The minimal strategy already strips everything except the terminal fields. Consider adding a metric or a more prominent log entry when a terminal patch is dropped through this path, to make silent data loss visible in monitoring:

```ts
if (lastFailure) {
  const isTerminal = item.patch.durationMs !== undefined || item.patch.statusCode !== undefined;
  logger[isTerminal ? "error" : "warn"](
    "[MessageRequestWriteBuffer] Dropping invalid update to unblock queue",
    { /* ... */ isTerminal },
  );
}
```
- Extracted `tests/helpers/env.ts` to avoid duplicated env snapshots
- The drop-invalid-update log now records `isTerminal` and escalates severity for terminal patches
- Added a comment noting that `loadLastRequests` only returns users that have completed requests
Additional Comments (5)
If a patch contains only fields stripped by both the safe and the minimal strategy, it vanishes without a trace — a non-terminal patch consisting only of such fields is silently discarded. Consider logging the original patch fields that were irretrievably dropped so operators can assess the scope of data loss.
When a patch falls back from the buffer to `writeMessageRequestUpdateToDb`, it is sanitized a second time. Consider a dedicated internal function or a flag to skip re-sanitization when the patch is already known to be clean:

```ts
// e.g., skip re-sanitize for already-clean overflow patches
await writeMessageRequestUpdateToDb(id, patch, { skipSanitize: true });
```
The raw SQL template interpolates a Drizzle column reference, which renders as a table-qualified name. If a future refactor aliases the table, the generated SQL would break. Consider writing the constraint inline rather than relying on the Drizzle column reference:

```sql
AND (blocked_by IS NULL OR blocked_by <> 'warmup')
```
The function validates cost values via a format regex only. More critically, the regex accepts inputs that are numerically far outside any plausible cost range; the real concern is that such values pass through without a range check.
The new query scans every non-deleted user. If many users have zero completed requests (e.g., new users, warmup-only traffic), the LATERAL subqueries still execute and produce empty results. The comment already acknowledges the scaling limit.
- `sealOrphanedMessageRequests` now uses raw SQL with the `blocked_by` condition inlined, so the query no longer depends on table-qualified column references
- `writeMessageRequestUpdateToDb` gained a comment explaining that the second sanitize pass is defensive
Additional Comments (2)
A large multimodal request today can easily exceed INT32_MAX tokens (e.g. a 2M-token context window × multiple messages). Any such value will be silently clamped, causing stored token counts (and therefore cost calculations) to be permanently incorrect without any indication other than the throttled warning log. Consider using a dedicated sanitizer with a safe-integer ceiling:

```ts
const BIGINT_JS_MAX = Number.MAX_SAFE_INTEGER; // 2^53 - 1

function sanitizeBigInt(
  value: unknown,
  options?: { min?: number; max?: number; field?: string }
): number | undefined {
  const numeric = toFiniteNumber(value);
  if (numeric === null) return undefined;
  const truncated = Math.trunc(numeric);
  const min = options?.min ?? 0;
  const max = options?.max ?? BIGINT_JS_MAX;
  // ... same clamp + log logic as sanitizeInt32
}
```

Apply this to the token-count fields.
The query iterates over every row in `users`. The comment acknowledges the 10,000+ threshold, but even a few thousand users could produce noticeable latency. Consider narrowing the outer scan:

```sql
FROM users u
WHERE u.deleted_at IS NULL
  AND EXISTS (
    SELECT 1 FROM message_request mr
    WHERE mr.user_id = u.id
      AND mr.deleted_at IS NULL
      AND mr.duration_ms IS NOT NULL
      AND mr.created_at > NOW() - INTERVAL '7 days'
  )
```

This reduces the LATERAL invocations to only recently-active users, which is almost certainly what the Dashboard cares about anyway.
- Added `sanitizeSafeInt`; caps for token-related fields raised to `Number.MAX_SAFE_INTEGER`
- Added unit tests: values above INT32_MAX must not be truncated; values beyond the safe-integer range must be clamped
Additional Comments (4)
The sweeper's candidate selection looks like:

```sql
SELECT id FROM message_request
WHERE deleted_at IS NULL
  AND duration_ms IS NULL
  AND created_at < $threshold
  AND (blocked_by IS NULL OR blocked_by <> 'warmup')
ORDER BY created_at ASC
LIMIT $limit
```

As orphaned records accumulate — precisely the scenario this PR is guarding against — each sweeper invocation (every 60 s) will fan out to an increasingly expensive sequential scan on the `message_request` table. Consider adding a partial index:

```sql
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_message_request_orphaned_created_at
ON message_request (created_at ASC)
WHERE deleted_at IS NULL
  AND duration_ms IS NULL
  AND (blocked_by IS NULL OR blocked_by <> 'warmup');
```
The query scans the entire table for incomplete requests:

```ts
.where(and(isNull(messageRequest.deletedAt), isNull(messageRequest.durationMs)))
.orderBy(desc(messageRequest.createdAt))
.limit(limit)
```

In a system that accumulates orphaned records — the exact failure mode this PR addresses — this query degrades as the orphan count grows. Consider adding an index that mirrors the new sweeper index idea:

```sql
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_message_request_active
ON message_request (created_at DESC)
WHERE deleted_at IS NULL
  AND duration_ms IS NULL
  AND (blocked_by IS NULL OR blocked_by <> 'warmup');
```
When every value in the batch for a column is NULL, the generated expression looks like:

```sql
CASE id WHEN 42 THEN NULL ELSE "provider_chain" END
```

PostgreSQL's type resolution for a bare `NULL` may not match the target column's type. Since the PR already applies explicit casts elsewhere, consider casting the NULL as well (e.g. `NULL::jsonb`).
The PR's root-cause analysis notes that implicit type inference caused data/constraint DB errors, yet the batch builder still emits untyped parameters:

```ts
cases.push(sql`WHEN ${update.id} THEN ${value}`);
```

PostgreSQL usually infers the correct type from the target column, but not always. Adding explicit casts here would make the SQL consistent with the stated fix strategy and prevent the same data-error recurrence on non-numeric columns.
- Added `idx_message_request_active_created_at_id` (`duration_ms IS NULL`) to support proxy-status and the orphan sweeper
- `buildBatchUpdateSql`: `NULL::jsonb` plus explicit `int`/`bigint`/`boolean` casts to reduce the risk of implicit-inference DB errors
- Generated the drizzle migration and amended it to `IF NOT EXISTS` (idempotent)
Additional Comments (4)
When the full and safe strategies fail with data errors, the item falls through to the minimal strategy; the drop log at line 1064 is then the only signal that the stripped non-terminal fields were lost.
When the pending queue reaches the batch size, the flush still waits for the timer. Consider short-circuiting the timer when the threshold is hit:

```ts
// Flush as soon as the batch threshold is reached, shortening the window
// in which durationMs stays empty
if (this.pending.size >= this.config.batchSize) {
  this.clearFlushTimer(); // cancel the just-scheduled timer
  void this.flush();
}
```
Additional Comments (5)
Both new index migrations use plain `CREATE INDEX`, which locks the table; the same issue affects the other migration. To avoid locking, use `CREATE INDEX CONCURRENTLY`:

```sql
CREATE INDEX CONCURRENTLY IF NOT EXISTS "idx_message_request_user_created_at_id_completed"
ON "message_request" USING btree ("user_id","created_at" DESC NULLS LAST,"id" DESC NULLS LAST)
WHERE "message_request"."deleted_at" IS NULL
  AND "message_request"."duration_ms" IS NOT NULL
  AND ("message_request"."blocked_by" IS NULL OR "message_request"."blocked_by" <> 'warmup');
```

Note that `CONCURRENTLY` cannot run inside a transaction block, so the migration must opt out of transactional execution.
The query starts from the `users` table, so every call pays the full per-user cost; the comment in the code acknowledges this limitation.
The WHERE clause selects any record where `duration_ms IS NULL`, including records that already received a response. For those records, sealing computes the duration from request creation to when the sweeper runs — potentially many minutes after the actual request completed — so the stored value is wildly inflated. Consider adding a heuristic guard — for example, preferring `ttfb_ms` when a response was observed.
In `flush`, the batch's IDs are removed from `nonTerminalIds` before the write, but the retry path only restores `pending`:

```ts
for (const item of batch) {
  this.nonTerminalIds.delete(item.id); // removed here
}
// ...
} catch (error) {
  this.requeueBatchForRetry(batch); // IDs re-added to pending, but not to nonTerminalIds
}
```

If a requeued patch is non-terminal, it is no longer tracked. This is a conservative fallback (protecting data), but it means the overflow logic may preserve stale non-terminal records and instead drop genuinely terminal ones. Consider re-adding non-terminal IDs in `requeueBatchForRetry`.
```ts
expect((built.sql.match(/created_at </g) ?? []).length).toBeGreaterThanOrEqual(2);
```

This asserts at least 2 occurrences of `created_at <` in the generated SQL, which is brittle against query reformatting.
Additional Comments (4)
The UPDATE's own WHERE clause cannot use the partial index directly. The fix is to select candidates through an index-friendly CTE and re-check the conditions in the UPDATE:

```sql
WITH candidates AS (
  SELECT id
  FROM message_request
  WHERE deleted_at IS NULL
    AND duration_ms IS NULL -- allows idx_message_request_active_created_at_id to be used
    AND status_code IS NULL
    AND created_at < ${threshold}
    AND (blocked_by IS NULL OR blocked_by <> 'warmup')
  ORDER BY created_at ASC
  LIMIT ${limit}
)
UPDATE message_request
SET ...
WHERE id IN (SELECT id FROM candidates)
  AND deleted_at IS NULL
  AND duration_ms IS NULL
  AND status_code IS NULL
  ...
```
If the cache is only written after the in-flight promise has been cleared, a caller arriving in that window sees neither a fresh cache nor an in-flight query — and will create a new in-flight query, defeating the deduplication. Move the cache write into the inner async function (before `finally` clears the in-flight slot):

```ts
this.statusSnapshotInFlight = (async () => {
  const [dbUsers, activeRequestRows, lastRequestRows] = await Promise.all([/* ... */]);
  const snapshot: StatusSnapshot = {
    expiresAt: Date.now() + PROXY_STATUS_SNAPSHOT_TTL_MS,
    dbUsers: dbUsers as unknown as DbUserRow[],
    activeRequestRows,
    lastRequestRows,
  };
  this.statusSnapshotCache = snapshot; // set before finally clears inflight
  return snapshot;
})().finally(() => {
  this.statusSnapshotInFlight = null;
});
return await this.statusSnapshotInFlight;
```
When a subset of strategies throw during SQL building rather than execution, the failure is currently indistinguishable from a DB error. Consider also checking whether all strategies that didn't throw produced no SQL (i.e., the failure was in a strategy that was never even attempted on the DB):

```ts
const allPatchesHadNoColumns =
  noSqlColumnsStrategies.length + buildFailureStrategies.length === patchStrategies.length &&
  noSqlColumnsStrategies.length > 0;

if (!lastFailure || allPatchesHadNoColumns) {
  logger.debug("[MessageRequestWriteBuffer] Skipping update with no writable columns", /* ... */);
  continue;
}
```

(This requires tracking build failures separately.)
```ts
const result = await db.execute(query);
return { sealedCount: result.rowCount ?? 0 };
```
Additional Comments (4)
Requeued items bypass the `maxPending` cap, so repeated flush failures grow the queue without bound. The comment acknowledges this ("允许暂时超过 maxPending" — temporarily allowed to exceed `maxPending`), but under a sustained DB outage combined with high traffic, the unbounded growth could contribute to the same OOM this PR is trying to prevent. Consider enforcing a hard cap in the requeue path — for example, logging at error level and dropping items once a hard ceiling is crossed.
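A hard cap on the requeue path could be sketched like this — the 2× ceiling, the terminal-first ordering, and the names are assumptions, not the buffer's actual policy:

```typescript
type Patch = { durationMs?: number; statusCode?: number; costUsd?: string };

const isTerminal = (p: Patch) => p.durationMs !== undefined || p.statusCode !== undefined;

// Re-queue failed items, but never let pending grow past a hard ceiling.
// Terminal patches are re-queued first, so if the cap bites, it falls on
// non-terminal ones.
function requeueWithHardCap(
  pending: Map<string, Patch>,
  batch: Array<[string, Patch]>,
  maxPending: number,
): { requeued: number; dropped: number } {
  const hardCap = maxPending * 2;
  let dropped = 0;
  const ordered = [...batch].sort(
    (a, b) => Number(isTerminal(b[1])) - Number(isTerminal(a[1])),
  );
  for (const [id, patch] of ordered) {
    if (pending.size >= hardCap && !pending.has(id)) {
      dropped += 1; // caller should log this at error level
      continue;
    }
    pending.set(id, patch);
  }
  return { requeued: batch.length - dropped, dropped };
}
```

Returning the drop count lets the caller emit one error-level log per flush cycle instead of one per item.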
The new sweeper only seals records where `status_code IS NULL`; at the same time, some write paths can persist `status_code` without `duration_ms`. While the most common call site writes both together, a simple mitigation is to widen the sweeper's condition to also seal records matching `status_code IS NOT NULL AND duration_ms IS NULL`.
The 60-second floor on the staleness window makes the sweeper awkward to exercise in tests. Consider making the floor configurable or bypassed in test environments:

```ts
// or accept an options.unsafe_override_min_stale_ms for test purposes
const staleAfterMs = options?._unsafe_override_min
  ? staleAfterMsCandidate
  : Math.max(60_000, staleAfterMsCandidate);
```

Alternatively, document the 60-second minimum explicitly in the function's JSDoc so callers understand the constraint.
When the sweeper times out, the underlying seal query keeps running in the background. Multiple concurrent calls to `sealOrphanedMessageRequests` can then pile up against the database. Consider tracking whether a timeout-orphaned promise is still running and skipping the next interval if so:

```ts
let orphanedPromise: Promise<unknown> | null = null;
const PENDING = Symbol("pending");

if (result.timedOut) {
  orphanedPromise = sealPromise.catch(() => {});
  // ...
}

// At the start of runOnce:
if (orphanedPromise) {
  const settled = await Promise.race([orphanedPromise, Promise.resolve(PENDING)]);
  if (settled === PENDING) return; // still running
  orphanedPromise = null;
}
```
Additional Comments (4)
This means a single DB hiccup will cause every dashboard polling request to receive a 500 error simultaneously, rather than the previous (slightly stale) data. Consider serving the stale cache as a degraded fallback instead of propagating the error:

```ts
this.statusSnapshotInFlight = (async () => {
  try {
    const [dbUsers, activeRequestRows, lastRequestRows] = await Promise.all([/* ... */]);
    const snapshot: StatusSnapshot = { /* ... */ };
    this.statusSnapshotCache = snapshot;
    return snapshot;
  } catch (error) {
    logger.error("[ProxyStatusTracker] Failed to refresh status snapshot", { error: /* ... */ });
    // Serve stale data if available; only throw when there is nothing to serve
    if (this.statusSnapshotCache) {
      return this.statusSnapshotCache;
    }
    throw error;
  }
})().finally(() => {
  this.statusSnapshotInFlight = null;
});
```
Records that received a `status_code` but never got `duration_ms` stay unsealed forever. These partially-terminal records won't appear as "active" either, so nothing ever closes them. Consider adding a separate maintenance step, or widening the sweeper condition to also handle records where the response arrived but the duration wasn't persisted:

```sql
-- Seal records where response arrived but duration wasn't persisted
UPDATE message_request
SET duration_ms = LEAST(2147483647, GREATEST(0, (EXTRACT(EPOCH FROM (NOW() - created_at)) * 1000)::int)),
    updated_at = NOW()
WHERE deleted_at IS NULL
  AND duration_ms IS NULL
  AND status_code IS NOT NULL
  AND created_at < ${threshold}
  AND (blocked_by IS NULL OR blocked_by <> 'warmup')
```
An out-of-range value is currently dropped with no signal. Consider emitting a throttled warning here similar to the existing clamp-log patterns:

```ts
if (parsed < 0 || parsed >= 1_000_000) {
  // throttled warn – same pattern as sanitizeInt32 / sanitizeSafeInt clamp warnings
  logger.warn("[MessageRequestWriteBuffer] costUsd out of accepted range, skipping", {
    value: trimmed.slice(0, 32),
    min: 0,
    max: 1_000_000,
  });
  return undefined;
}
```
```ts
const sealedCount = Array.isArray(result) ? result.length : Array.from(result).length;
```

For postgres.js (likely the production driver) this works, because the result is array-like. However, if the driver is ever switched to node-postgres, `db.execute` returns an object whose rows live under a `rows` property, and `Array.from(result)` would not yield them. A driver-agnostic form:

```ts
const rows = Array.isArray(result) ? result : ((result as any).rows ?? Array.from(result));
const sealedCount = rows.length;
```
- ProxyStatusTracker: on refresh failure, serve the expired cache and extend its TTL, avoiding an error storm from dashboard polling
- sealOrphanedMessageRequests: now also covers records where `status_code` was written but `duration_ms` is missing, and handles `rowCount`-style results
- message-write-buffer: throttled warning for out-of-range `costUsd`
- Added unit test coverage
Additional Comments (4)
When a request has already received a response, sealing with the wall-clock sweep time overstates its duration. A safer fallback uses `ttfb_ms` when available:

```sql
duration_ms = CASE
  WHEN status_code IS NULL THEN (
    LEAST(
      2147483647,
      GREATEST(0, (EXTRACT(EPOCH FROM (NOW() - created_at)) * 1000))
    )::int
  )
  ELSE (
    LEAST(
      2147483647,
      GREATEST(0, COALESCE(ttfb_ms, (EXTRACT(EPOCH FROM (NOW() - created_at)) * 1000)))
    )::int
  )
END,
```

This uses the time-to-first-byte as a lower-bound estimate for records that already received a response.
When refreshes keep failing, extending the stale cache's TTL by a fixed amount retries at a constant rate against a struggling database. Consider a simple doubling strategy, for example:

```ts
// Track consecutive failures to implement exponential backoff
private statusSnapshotRefreshFailures = 0;

// In the catch block:
const backoffMs = Math.min(
  PROXY_STATUS_SNAPSHOT_TTL_MS * Math.pow(2, this.statusSnapshotRefreshFailures),
  60_000
);
this.statusSnapshotRefreshFailures++;
this.statusSnapshotCache = {
  ...this.statusSnapshotCache,
  expiresAt: Date.now() + backoffMs,
};

// On successful refresh, reset:
this.statusSnapshotRefreshFailures = 0;
```

This bounds the retry rate without completely blocking status updates when the DB recovers.
When the value is NULL, the generated arm carries no cast:

```sql
WHEN <id> THEN $n -- $n = NULL, no cast
```

If every row in the batch has NULL for that column, there is no typed arm for PostgreSQL to infer the parameter type from.
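A cast-aware builder could attach the column's SQL type to every arm, NULLs included. A sketch using plain string assembly rather than drizzle's `sql` tag; the column/type names and placeholder convention are illustrative:

```typescript
type ColumnType = "int" | "bigint" | "boolean" | "jsonb" | "text";

// Build one CASE expression for a column, casting every arm (NULL included)
// so PostgreSQL never has to infer the parameter type.
function buildCaseForColumn(
  column: string,
  type: ColumnType,
  arms: Array<{ id: number; placeholder: string | null }>, // placeholder like "$3"; null means SQL NULL
): string {
  const whenClauses = arms
    .map((a) => {
      const typed = a.placeholder === null ? `NULL::${type}` : `${a.placeholder}::${type}`;
      return `WHEN ${a.id} THEN ${typed}`;
    })
    .join(" ");
  // ELSE keeps the existing value for rows outside this batch's arms.
  return `"${column}" = CASE id ${whenClauses} ELSE "${column}" END`;
}
```

An all-NULL batch then emits `NULL::jsonb` arms, which resolves the bare-`NULL` inference problem described above.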
The function tries two loops to find a non-terminal candidate to drop before falling back to rejecting the newcomer. The problem arises in the second loop's fallback: if `currentId` itself is the only remaining non-terminal entry, it is never considered. A fix:

```ts
// After first loop fails to find a non-currentId non-terminal:
if (droppedId === undefined) {
  // currentId is the only remaining non-terminal option
  const candidatePatch = currentId !== undefined ? this.pending.get(currentId) : undefined;
  if (currentId !== undefined && candidatePatch && !isTerminalPatch(candidatePatch)) {
    droppedId = currentId;
    droppedPatch = candidatePatch;
  }
}
```
**Update (2026-03-03)**

**Background**

Issue #854: the Dashboard log showed `message_request` entries stuck in "requesting" indefinitely, with tokens/cost/performance fields empty or at defaults, eventually crashing and restarting the process.

**Root causes (possibly compounding)**

- With `MESSAGE_REQUEST_WRITE_MODE=async`, terminal state is first written to an in-memory queue and flushed to the DB in batches; when the DB write hit a data/constraint error or a transient failure, the old implementation could wedge the queue or drop the trailing updates, leaving `duration_ms`/`status_code` empty indefinitely.

**Fixes**

- Async batch writes: added validation and explicit type casts; on data-class errors, degrade to a safer write strategy; added a `maxPending` cap whose overflow policy prefers dropping non-terminal patches, preserving duration/status where possible
- Best-effort flush of the write queue on process exit; a periodic sweeper seals historical orphaned records
- proxy-status now aggregates from the database and limits the number of active requests returned, so anomalies cannot accumulate and exhaust memory or blow up the response body

**Hardening based on review feedback**

- New partial-index migration for the proxy-status lastRequest LATERAL query (completed requests only), avoiding deep backtracking when orphans accumulate
- Added `idx_message_request_active_created_at_id` (`duration_ms IS NULL`) to prevent scan degradation
- `sealOrphanedMessageRequests` threshold now based on `created_at`, so non-terminal updates can no longer postpone sealing via `updated_at`
- `updateMessageRequestDetails` returns immediately on `rejected_invalid`, avoiding duplicate sanitize/warnings
- Explicit `::int`/`::bigint`/`::boolean` and `NULL::jsonb` casts to reduce type-inference DB errors
- `flushBatchPerItem` records `isTerminal` when dropping a patch and escalates the log level for terminal patches, making loss of terminal state easy to monitor
- Extracted `tests/helpers/env.ts` to avoid test duplication and drift

**Tests**

- `bun run build`
- `bun run lint`
- `bun run lint:fix`
- `bun run typecheck`
- `bun run test`

Closes #854