[APPROVALNOTIFIER] This PR is NOT APPROVED. Needs approval from an approver in each of these files. The full list of commands accepted by this bot can be found here.
📝 Walkthrough

Adds TopRU (resource-usage) collection and reporting across the TopSQL pipeline: new RU data models, a 15s-window aggregator, statement-level RU sampling, reporter ingestion/streaming, state toggles, metrics, executor call-site changes, and extensive tests and build updates.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Exec as Executor
    participant Stmts as StatementStats
    participant Agg as Aggregator
    participant Collector as RU Collector (Reporter)
    participant Window as RU WindowAggregator
    participant Reporter as RemoteReporter
    participant Sink as Report Sink
    Exec->>Stmts: OnExecutionBegin(ExecBeginInfo)
    Stmts->>Stmts: addRUOnBeginLocked() / set execCtx
    Exec->>Stmts: OnExecutionFinished(ExecFinishInfo)
    Stmts->>Stmts: addRUOnFinishLocked() / accumulate finishedRUBuffer
    Stmts-->>Agg: MergeRUInto() -> RUIncrementMap
    Agg->>Collector: CollectRUIncrements(RUIncrementMap)
    Collector->>Window: addBatchToBucket()
    Note over Window: 15s buckets rotated/compacted -> 60s window
    Window-->>Collector: []TopRURecord
    Collector->>Reporter: attach RURecords to ReportData
    Reporter->>Sink: sendTopRURecords(RURecords) (gated by TopRUEnabled)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ❌ 3 failed checks (2 warnings, 1 inconclusive)
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
pkg/util/topsql/reporter/BUILD.bazel (1)
42-54: ⚠️ Potential issue | 🔴 Critical

Missing test file in BUILD.bazel: `reporter_topru_integration_test.go`

The file `reporter_topru_integration_test.go` was added in this PR but is not listed in the `go_test` srcs. This will cause the test to be excluded from Bazel builds.

🔧 Suggested fix

```diff
 srcs = [
     "datamodel_test.go",
     "datasink_test.go",
     "main_test.go",
     "pubsub_test.go",
     "reporter_test.go",
+    "reporter_topru_integration_test.go",
     "ru_datamodel_test.go",
     "ru_window_aggregator_test.go",
     "single_target_test.go",
     "topru_case_runner_test.go",
     "topru_generated_cases_test.go",
     "topru_structured_test.go",
 ],
```

As per coding guidelines: "MUST run `make bazel_prepare` and include resulting Bazel metadata changes in the PR when adding/moving/renaming/removing Go files."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/BUILD.bazel` around lines 42 - 54: the BUILD.bazel srcs list for the go_test target in pkg/util/topsql/reporter is missing the newly added test file reporter_topru_integration_test.go; add "reporter_topru_integration_test.go" to the srcs array (alongside files like "reporter_test.go" and "topru_structured_test.go") and re-run make bazel_prepare so the generated Bazel metadata is included in the PR.

pkg/executor/adapter.go (1)
2231-2238: ⚠️ Potential issue | 🟠 Major

Use "begin ran" state, not the finish-time global, to decide whether to finish.

This change now calls `OnExecutionBegin` in the `!topProfiling` branch for non-fast plans, while fast plans can still skip begin entirely. `observeStmtFinishedForTopProfiling` still keys `OnExecutionFinished` off the current global switch, so you can end up with begin/no-finish when profiling stays off, or finish/no-begin if the switch flips on during a fast statement. Persist a per-statement flag/snapshot when begin bookkeeping runs and use that in the finish path.

Also applies to: 2256-2267, 2325-2342
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/executor/adapter.go` around lines 2231 - 2238, The current logic uses the global topsqlstate.TopProfilingEnabled() at finish time, causing mismatches between OnExecutionBegin and OnExecutionFinished; modify the code so that when you call OnExecutionBegin (e.g., in the branch where !topProfiling && !IsFastPlan(a.Plan) or its refactored equivalent) you store a per-statement boolean snapshot (e.g., stmt.topProfilingBegun) indicating that begin bookkeeping ran, and then have observeStmtFinishedForTopProfiling / the OnExecutionFinished path consult that per-statement flag instead of the current global topsqlstate.TopProfilingEnabled(); ensure IsFastPlan logic still avoids calling OnExecutionBegin for fast plans but that the finish path uses the stored flag to decide whether to call OnExecutionFinished to avoid begin/no-finish or finish/no-begin races.
🧹 Nitpick comments (7)
pkg/util/topsql/topsql_test.go (1)
411-415: Redundant conditional check.

Line 402 already asserts `require.True(t, topsqlstate.TopProfilingEnabled())`. If that passes, this condition is always true; if it fails, the test exits. The `if` block can be removed, executing the registration calls unconditionally.

♻️ Proposed fix

```diff
-	if topsqlstate.TopProfilingEnabled() {
-		topsql.AttachAndRegisterSQLInfo(ctx, sql, sqlDigest, false)
-		topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest)
-		topsql.RegisterPlan(plan, planDigest)
-	}
+	topsql.AttachAndRegisterSQLInfo(ctx, sql, sqlDigest, false)
+	topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest)
+	topsql.RegisterPlan(plan, planDigest)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/topsql_test.go` around lines 411 - 415: the conditional guard checking topsqlstate.TopProfilingEnabled() is redundant because the test already asserts require.True(t, topsqlstate.TopProfilingEnabled()); remove the if wrapper and invoke topsql.AttachAndRegisterSQLInfo(ctx, sql, sqlDigest, false), topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest) and topsql.RegisterPlan(plan, planDigest) unconditionally so the calls always run when the assertion passes.

pkg/util/topsql/reporter/reporter_test.go (1)
241-248: Make the internal-SQL fixture explicit instead of encoding it in `sqlID` parity.

`sqlID%2 == 0` is now a hidden convention for every caller of `newSQLCPUTimeRecord`. Renumbering a fixture in an unrelated test now silently changes `IsInternalSql`, which makes the test data less local and harder to reason about. Passing `isInternal` explicitly, or using a dedicated helper for internal cases, would keep these tests clearer and more stable.

Based on learnings: keep test changes minimal and deterministic; avoid broad golden/testdata churn unless required.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/reporter_test.go` around lines 241 - 248: the helper newSQLCPUTimeRecord currently encodes IsInternalSql via sqlID%2==0, which hides intent and couples tests to fixture numbering; change newSQLCPUTimeRecord to accept an explicit isInternal bool (or add a separate newInternalSQLCPUTimeRecord wrapper) and pass that through to tsr.RegisterSQL instead of using sqlID parity, updating callers to explicitly specify whether the SQL is internal so tests remain local and deterministic (refer to newSQLCPUTimeRecord and its use of tsr.RegisterSQL).

pkg/util/topsql/stmtstats/aggregator.go (1)
22-23: Avoid pulling reporter-layer metrics into `stmtstats`.

`RUCollector` is introduced here to decouple the producer from the sink, but importing `pkg/util/topsql/reporter/metrics` reverses that dependency again. Moving these counters into a shared topsql metrics package, or surfacing drop accounting through the collector path, would keep `stmtstats` independent of the reporter layer.

Also applies to: 135-143
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/stmtstats/aggregator.go` around lines 22 - 23: the stmtstats package currently imports reporter-layer metrics (reporter_metrics) to update drop counters, which reintroduces a dependency; remove the reporter_metrics import and instead either (A) move the counters into a shared topsql metrics package and import that shared package from both stmtstats and reporter, or (B) extend the RUCollector interface/implementation to expose methods for accounting drops (e.g., AddDropCount, IncDroppedRU/IncDroppedStmt) and call those from stmtstats so stmtstats only depends on RUCollector (update any sites that currently reference the reporter_metrics counters around the RUCollector usage, including the block that updates counters in the 135-143 region). Ensure all metrics updates occur via the shared package or collector methods and delete direct references to reporter_metrics from stmtstats.

pkg/util/topsql/reporter/reporter_topru_integration_test.go (1)
60-61: Worker goroutines started without synchronization barrier.

The `collectRUWorker` and `reportWorker` are started, but there is no guarantee they are fully initialized before `CollectRUIncrements` is called. While `require.Eventually` at line 75 mitigates this, consider adding a brief startup barrier or a comment explaining the timing assumption.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/reporter_topru_integration_test.go` around lines 60 - 61: the test starts the goroutines tsr.collectRUWorker() and tsr.reportWorker() without ensuring they are initialized before calling tsr.CollectRUIncrements; add a small startup barrier to guarantee readiness (e.g., a sync.WaitGroup or a ready channel that each worker signals when initialized) and wait for that signal in the test before invoking CollectRUIncrements, or at minimum add a comment above the goroutine starts explaining that require.Eventually is relied on to handle startup timing; reference the tsr.collectRUWorker, tsr.reportWorker and tsr.CollectRUIncrements symbols when making the change.

pkg/util/topsql/reporter/datasink_test.go (1)
44-56: Consider extracting the repeated state reset logic into a helper.The same pattern of resetting TopRU/TopSQL state appears in every test (lines 44-56, 84-96, 115-127, 170-182). Extracting this into a helper function would reduce duplication and improve maintainability.
♻️ Suggested helper
```go
func resetTopRUState(t *testing.T) {
	for topsqlstate.TopRUEnabled() {
		topsqlstate.DisableTopRU()
	}
	topsqlstate.DisableTopSQL()
	topsqlstate.ResetTopRUItemInterval()
	t.Cleanup(func() {
		for topsqlstate.TopRUEnabled() {
			topsqlstate.DisableTopRU()
		}
		topsqlstate.DisableTopSQL()
		topsqlstate.ResetTopRUItemInterval()
	})
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/datasink_test.go` around lines 44 - 56: extract the repeated TopRU/TopSQL teardown code into a helper named resetTopRUState(t *testing.T) that performs the loop disabling TopRU, calls topsqlstate.DisableTopSQL() and topsqlstate.ResetTopRUItemInterval(), and registers the same cleanup via t.Cleanup; then replace the duplicated blocks in TestDefaultDataSinkRegistererTopRUTwoSinksRefCountAndReset and the other tests with a single call to resetTopRUState(t) to remove duplication and centralize state reset logic.

pkg/util/topsql/reporter/datasink.go (1)
98-101: Item interval overwrite on each registration may use a suboptimal value.

When multiple sinks register with different `itemInterval` values, the last registration wins. Consider tracking the minimum interval across all registered sinks to ensure the finest granularity is used, or document that the current behavior is intentional.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/datasink.go` around lines 98 - 101: the current registration loop overwrites topsqlstate's item interval each time a pubSubDataSink registers (dataSink cast to *pubSubDataSink), so the last sink's itemInterval wins; change this to track the minimum itemInterval across all registered pubSubDataSink instances and call topsqlstate.SetTopRUItemInterval with that minimum (still call topsqlstate.EnableTopRU when any ds.enableTopRU is true), or explicitly document that overwriting is intentional; specifically, when iterating sinks look up pubSubDataSink.itemInterval and take the minimum of the current minimum and itemInterval before calling topsqlstate.SetTopRUItemInterval.

pkg/util/topsql/reporter/ru_datamodel.go (1)
124-133: Consider a map for O(1) timestamp lookup in high-frequency paths.

The linear scan over `r.items` for matching timestamps is O(n) per `add()` call. If items grow large (many timestamps per record), this could become a bottleneck. Since timestamps are unique keys, a `map[uint64]*ruItem` would provide O(1) lookups.

For now this is likely acceptable given the bounded bucket interval (15s windows), but worth noting for future optimization.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/ru_datamodel.go` around lines 124 - 133, The current ruRecord.add method does an O(n) linear scan over r.items to find a matching timestamp; change r.items to be paired with a map for O(1) lookup by timestamp (e.g., add a field like timestampIndex map[uint64]*ruItem on ruRecord). In ruRecord.add, first try a lookup via r.timestampIndex[timestamp]; if found, update that ruItem's totalRU, execCount, execDuration and r.totalRU; if not found, create a new ruItem, append it to r.items, insert it into r.timestampIndex, and update r.totalRU. Ensure any other code that modifies r.items (removals/rotations) keeps timestampIndex in sync to avoid stale entries.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/util/topsql/reporter/datasink_test.go`:
- Around line 146-150: The test spawns goroutines that call r.Register(ds) and
currently call t.Errorf from those goroutines; move error reporting out of
goroutines by creating a buffered error channel (e.g. errCh := make(chan error,
loops)) or a synchronized error collector and have each goroutine send any error
to that channel instead of calling t.Errorf; after the goroutines finish (use a
sync.WaitGroup or similar), close the channel and assert/fail from the main test
goroutine (e.g. check channel contents and call t.Fatalf or require.NoError for
each error) so r.Register, ds and loops remain the same but all test failures
are reported from the main test goroutine.
In `@pkg/util/topsql/reporter/datasink.go`:
- Around line 98-101: The code enabling TopRU in pubSubDataSink uses
topsqlstate.EnableTopRU()/SetTopRUItemInterval but lacks refcounting when sinks
deregister; add a global atomic counter (e.g., topRUSinkCount) similar to
topSQLSinkCount, increment it when a sink with enableTopRU registers (in the
pubSubDataSink registration path where EnableTopRU is called) and decrement it
when such a sink deregisters, and only call topsqlstate.DisableTopRU() when the
counter reaches zero; use atomic operations to update the counter and ensure
SetTopRUItemInterval is set on first enable.
In `@pkg/util/topsql/reporter/reporter_test.go`:
- Around line 204-215: The test only saves GlobalState.ReportIntervalSeconds but
fails to restore the original TopSQL and TopRU enabled states and the original
TopRU item interval; modify the setup to snapshot the current states (call
topsqlstate.TopSQLEnabled() or equivalent to capture whether TopSQL was enabled,
call topsqlstate.TopRUEnabled() to capture TopRU state, and capture the TopRU
item interval via the existing ResetTopRUItemInterval accessor or its Load
method), then in the t.Cleanup restore the ReportIntervalSeconds (using
GlobalState.ReportIntervalSeconds.Store), re-enable/disable TopSQL and TopRU to
their original states using
topsqlstate.DisableTopSQL()/topsqlstate.EnableTopSQL() and
topsqlstate.DisableTopRU()/topsqlstate.EnableTopRU() as appropriate, and restore
the TopRU item interval rather than always resetting it; reference
topsqlstate.DisableTopSQL, topsqlstate.TopRUEnabled, topsqlstate.DisableTopRU,
topsqlstate.GlobalState.ReportIntervalSeconds.Store, and
topsqlstate.ResetTopRUItemInterval when making changes.
In `@pkg/util/topsql/reporter/reporter.go`:
- Around line 241-250: collectRUWorker currently timestamps RU batches when
dequeued, causing races with takeReportRecords which closes RU windows on
another goroutine; preserve enqueue ordering by carrying the original enqueue
timestamp through the channel (attach timestamp to items sent to
collectRUIncrementsChan) or by ensuring flushing of any pending items before
takeReportRecords runs. Modify the code paths around collectRUWorker,
collectRUIncrementsChan, ruAggregator.addBatchToBucket and takeReportRecords so
that either (a) messages on collectRUIncrementsChan include the enqueue
timestamp and that timestamp is used when calling ruAggregator.addBatchToBucket,
or (b) add a drain step in the goroutine that calls takeReportRecords to consume
and apply all pending collectRUIncrementsChan items to ruAggregator (or run RU
ingest/reporting on the same goroutine) before closing windows, ensuring no
batches cross window boundaries and eliminating late/drop races.
In `@pkg/util/topsql/reporter/ru_window_aggregator_test.go`:
- Around line 205-208: Remove the unused accumulator totalDropped and simply
drain the done channel without assigning its values: delete the declaration of
totalDropped and change the loop that currently does totalDropped += <-done to
just <-done so the test consumes results from done (referencing the numWriters
loop and done channel in ru_window_aggregator_test.go).
- Around line 73-80: The test currently sets numUsers and numSQLsPerUser to 200
(in the test using makeRUBatch and agg.addBatchToBucket calls), producing
200*200=40,000 keys rather than the intended 10,000; either reduce numUsers
and/or numSQLsPerUser to 100 (e.g., set numUsers, numSQLsPerUser = 100, 100) so
makeRUBatch generates 10k keys for
BenchmarkTakeReportRecords_Large_60s_At10kKeys, or update the comment/benchmark
name to accurately reflect 40k keys if the larger load is intended. Ensure
changes reference the constants numUsers and numSQLsPerUser used when
constructing batch via makeRUBatch.
In `@pkg/util/topsql/reporter/ru_window_aggregator.go`:
- Around line 163-177: The function buildReportRecords doesn't validate
itemInterval against ruBaseBucketSeconds (15) so if itemInterval <
ruBaseBucketSeconds then bucketsPerInterval computes to 0 and downstream caps
become zero; inside buildReportRecords validate itemInterval (the parameter) to
ensure it's one of allowed values (15, 30, 60) or at minimum set itemInterval =
ruBaseBucketSeconds when smaller, and return an error or fallback before
computing bucketsPerInterval; update the logic that computes bucketsPerInterval,
intervalPreCapUsers, intervalPreCapSQLsPerUser and the creation of mergedOutput
(newRUCollectingWithCaps) to use the validated/defaulted itemInterval so caps
are never zero.
In `@pkg/util/topsql/reporter/single_target_test.go`:
- Around line 135-137: The test mutates global config.TopSQL.ReceiverAddress via
config.UpdateGlobal and doesn't restore it, causing later tests to see a dead
mock address; capture the original value before calling config.UpdateGlobal and
register a t.Cleanup that calls config.UpdateGlobal to reset
conf.TopSQL.ReceiverAddress back to the saved original (use the same
config.UpdateGlobal function), ensuring the mock server.Address() set in this
test is reverted when the test finishes.
In `@pkg/util/topsql/reporter/topru_generated_cases_test.go`:
- Around line 1-5: The generated test file topru_generated_cases_test.go is
missing the standard TiDB license header; update the generator script
ai/projects/topru-ai/testgen/generate_topru_cases.py to prepend the canonical
TiDB copyright + Apache-2.0 header to all generated .go files (use an existing
nearby file as the source template and update the year if needed) so
topru_generated_cases_test.go and any future outputs include the required
license block.
In `@pkg/util/topsql/state/BUILD.bazel`:
- Line 21: Remove the flaky = True attribute from the BUILD.bazel target (the
line "flaky = True") and investigate the root cause of the intermittent failures
in the pkg/util/topsql/state tests: run the test target repeatedly and under
race/stress settings locally and in CI, inspect the state package tests for race
conditions or shared/global state, add proper isolation/setup/teardown or
synchronization where needed, and if flakiness is a known, tracked issue leave a
brief comment in BUILD.bazel referencing the issue number instead of keeping
flaky = True.
In `@pkg/util/topsql/state/state_test.go`:
- Around line 23-48: This test mutates process-global state
(GlobalState.ruConsumerCount and the TopRU item interval) and doesn't restore
it, so wrap a snapshot-and-restore in t.Cleanup: before mutating, capture
current GlobalState.ruConsumerCount.Load() and current GetTopRUItemInterval(),
then call t.Cleanup to restore the count (via
GlobalState.ruConsumerCount.Store(saved)) and reset the interval (via
SetTopRUItemInterval(saved) or ResetTopRUItemInterval() as appropriate); ensure
this pattern is applied to TestTopRUEnableDisableAndResetInterval and the other
tests referenced (lines 50-63, 65-83) and use the existing helpers
ResetTopRUItemInterval, SetTopRUItemInterval, GetTopRUItemInterval, EnableTopRU,
DisableTopRU, and TopRUEnabled to restore invariant state.
In `@pkg/util/topsql/state/state.go`:
- Around line 94-116: The decrement in DisableTopRU() can race with concurrent
EnableTopRU() + SetTopRUItemInterval(), letting ResetTopRUItemInterval()
overwrite a new subscriber's interval; serialize the refcount+interval update by
protecting the trio of operations with a single lock or atomic composite update:
add a mutex (e.g., GlobalState.topRUMu) and wrap EnableTopRU, DisableTopRU,
SetTopRUItemInterval and ResetTopRUItemInterval (or instead replace separate
atomics with a single CAS on a struct containing {count, interval}) so the
transition from 1->0 and any concurrent 0->1+set are performed as one
atomic/locked critical section and cannot interleave while calling
ResetTopRUItemInterval; update callers in state.go lines around
EnableTopRU/DisableTopRU and the other affected block (137-156) to use the same
protection.
---
Outside diff comments:
In `@pkg/executor/adapter.go`:
- Around line 2231-2238: The current logic uses the global
topsqlstate.TopProfilingEnabled() at finish time, causing mismatches between
OnExecutionBegin and OnExecutionFinished; modify the code so that when you call
OnExecutionBegin (e.g., in the branch where !topProfiling && !IsFastPlan(a.Plan)
or its refactored equivalent) you store a per-statement boolean snapshot (e.g.,
stmt.topProfilingBegun) indicating that begin bookkeeping ran, and then have
observeStmtFinishedForTopProfiling / the OnExecutionFinished path consult that
per-statement flag instead of the current global
topsqlstate.TopProfilingEnabled(); ensure IsFastPlan logic still avoids calling
OnExecutionBegin for fast plans but that the finish path uses the stored flag to
decide whether to call OnExecutionFinished to avoid begin/no-finish or
finish/no-begin races.
In `@pkg/util/topsql/reporter/BUILD.bazel`:
- Around line 42-54: The BUILD.bazel srcs list for the go_test target in
pkg/util/topsql/reporter is missing the newly added test file
reporter_topru_integration_test.go; add "reporter_topru_integration_test.go" to
the srcs array (alongside files like "reporter_test.go",
"topru_structured_test.go") and re-run make bazel_prepare so the generated Bazel
metadata is included in the PR.
---
Nitpick comments:
In `@pkg/util/topsql/reporter/datasink_test.go`:
- Around line 44-56: Extract the repeated TopRU/TopSQL teardown code into a
helper named resetTopRUState(t *testing.T) that performs the loop disabling
TopRU, calls topsqlstate.DisableTopSQL(), and
topsqlstate.ResetTopRUItemInterval(), and registers the same cleanup via
t.Cleanup; then replace the duplicated blocks in
TestDefaultDataSinkRegistererTopRUTwoSinksRefCountAndReset and the other tests
with a single call to resetTopRUState(t) to remove duplication and centralize
state reset logic.
In `@pkg/util/topsql/reporter/datasink.go`:
- Around line 98-101: The current registration loop overwrites topsqlstate's
item interval each time a pubSubDataSink registers (dataSink cast to
*pubSubDataSink) so the last sink's itemInterval wins; change this to track the
minimum itemInterval across all registered pubSubDataSink instances and call
topsqlstate.SetTopRUItemInterval with that minimum (still call
topsqlstate.EnableTopRU when any ds.enableTopRU is true), or explicitly document
that overwriting is intentional; specifically, when iterating sinks look up
pubSubDataSink.itemInterval and take math.Min(currentMin, itemInterval) before
calling topsqlstate.SetTopRUItemInterval.
In `@pkg/util/topsql/reporter/reporter_test.go`:
- Around line 241-248: The helper newSQLCPUTimeRecord currently encodes
IsInternalSql via sqlID%2==0 which hides intent and couples tests to fixture
numbering; change newSQLCPUTimeRecord to accept an explicit isInternal bool (or
add a separate newInternalSQLCPUTimeRecord wrapper) and pass that through to
tsr.RegisterSQL instead of using sqlID parity, updating callers to explicitly
specify whether the SQL is internal so tests remain local and deterministic
(refer to newSQLCPUTimeRecord and its use of tsr.RegisterSQL).
In `@pkg/util/topsql/reporter/reporter_topru_integration_test.go`:
- Around line 60-61: The test starts the goroutines tsr.collectRUWorker() and
tsr.reportWorker() without ensuring they are initialized before calling
tsr.CollectRUIncrements; add a small startup barrier to guarantee readiness
(e.g., a sync.WaitGroup or a ready channel that each worker signals when
initialized) and wait for that signal in the test before invoking
CollectRUIncrements, or at minimum add a comment above the goroutine starts
explaining that require.Eventually is relied on to handle startup timing;
reference the tsr.collectRUWorker, tsr.reportWorker and tsr.CollectRUIncrements
symbols when making the change.
In `@pkg/util/topsql/reporter/ru_datamodel.go`:
- Around line 124-133: The current ruRecord.add method does an O(n) linear scan
over r.items to find a matching timestamp; change r.items to be paired with a
map for O(1) lookup by timestamp (e.g., add a field like timestampIndex
map[uint64]*ruItem on ruRecord). In ruRecord.add, first try a lookup via
r.timestampIndex[timestamp]; if found, update that ruItem's totalRU, execCount,
execDuration and r.totalRU; if not found, create a new ruItem, append it to
r.items, insert it into r.timestampIndex, and update r.totalRU. Ensure any other
code that modifies r.items (removals/rotations) keeps timestampIndex in sync to
avoid stale entries.
In `@pkg/util/topsql/stmtstats/aggregator.go`:
- Around line 22-23: The stmtstats package currently imports reporter-layer
metrics (reporter_metrics) to update drop counters, which reintroduces a
dependency; remove the reporter_metrics import and instead either (A) move the
counters into a shared topsql metrics package and import that shared package
from both stmtstats and reporter, or (B) extend the RUCollector
interface/implementation to expose methods for accounting drops (e.g.,
AddDropCount, IncDroppedRU/IncDroppedStmt) and call those from stmtstats so
stmtstats only depends on RUCollector (update any sites that currently reference
the reporter_metrics counters around the RUCollector usage, including the block
that updates counters in the 135-143 region). Ensure all metrics updates occur
via the shared package or collector methods and delete direct references to
reporter_metrics from stmtstats.
In `@pkg/util/topsql/topsql_test.go`:
- Around line 411-415: The conditional guard checking
topsqlstate.TopProfilingEnabled() is redundant because the test already asserts
require.True(t, topsqlstate.TopProfilingEnabled()); remove the if (...) { ... }
wrapper and invoke topsql.AttachAndRegisterSQLInfo(ctx, sql, sqlDigest, false),
topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest) and
topsql.RegisterPlan(plan, planDigest) unconditionally so the calls always run
when the assertion passes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 9e36f95f-f291-46cb-80c1-d5e6596d1f02
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (37)
- DEPS.bzl
- go.mod
- pkg/executor/BUILD.bazel
- pkg/executor/adapter.go
- pkg/executor/adapter_internal_test.go
- pkg/util/topsql/BUILD.bazel
- pkg/util/topsql/reporter/BUILD.bazel
- pkg/util/topsql/reporter/datasink.go
- pkg/util/topsql/reporter/datasink_test.go
- pkg/util/topsql/reporter/metrics/metrics.go
- pkg/util/topsql/reporter/mock/server.go
- pkg/util/topsql/reporter/pubsub.go
- pkg/util/topsql/reporter/pubsub_test.go
- pkg/util/topsql/reporter/reporter.go
- pkg/util/topsql/reporter/reporter_test.go
- pkg/util/topsql/reporter/reporter_topru_integration_test.go
- pkg/util/topsql/reporter/ru_datamodel.go
- pkg/util/topsql/reporter/ru_datamodel_test.go
- pkg/util/topsql/reporter/ru_window_aggregator.go
- pkg/util/topsql/reporter/ru_window_aggregator_test.go
- pkg/util/topsql/reporter/single_target.go
- pkg/util/topsql/reporter/single_target_test.go
- pkg/util/topsql/reporter/topru_case_runner_test.go
- pkg/util/topsql/reporter/topru_generated_cases_test.go
- pkg/util/topsql/reporter/topru_structured_test.go
- pkg/util/topsql/state/BUILD.bazel
- pkg/util/topsql/state/state.go
- pkg/util/topsql/state/state_test.go
- pkg/util/topsql/stmtstats/BUILD.bazel
- pkg/util/topsql/stmtstats/aggregator.go
- pkg/util/topsql/stmtstats/aggregator_bench_test.go
- pkg/util/topsql/stmtstats/aggregator_test.go
- pkg/util/topsql/stmtstats/rustats.go
- pkg/util/topsql/stmtstats/stmtstats.go
- pkg/util/topsql/stmtstats/stmtstats_test.go
- pkg/util/topsql/topsql.go
- pkg/util/topsql/topsql_test.go
```go
for i := 0; i < loops; i++ {
	if err := r.Register(ds); err != nil {
		t.Errorf("register failed: %v", err)
		return
	}
```
Avoid t.Errorf inside goroutines; use a shared error channel or t.Fatal guard.
Calling t.Errorf from a goroutine other than the test goroutine can lead to races and may not properly fail the test if the main goroutine exits before the error is recorded. Consider capturing errors in a channel or using require with proper synchronization.
🔧 Suggested fix

```diff
+	errCh := make(chan error, goroutines*loops)
 	for g := 0; g < goroutines; g++ {
 		sink := sinks[g%len(sinks)]
 		go func(ds *pubSubDataSink) {
 			defer wg.Done()
 			for i := 0; i < loops; i++ {
 				if err := r.Register(ds); err != nil {
-					t.Errorf("register failed: %v", err)
-					return
+					errCh <- err
+					continue
 				}
 				r.Deregister(ds)
 			}
 		}(sink)
 	}
 	wg.Wait()
+	close(errCh)
+	for err := range errCh {
+		require.NoError(t, err, "register failed during concurrent test")
+	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/reporter/datasink_test.go` around lines 146 - 150, The test
spawns goroutines that call r.Register(ds) and currently call t.Errorf from
those goroutines; move error reporting out of goroutines by creating a buffered
error channel (e.g. errCh := make(chan error, loops)) or a synchronized error
collector and have each goroutine send any error to that channel instead of
calling t.Errorf; after the goroutines finish (use a sync.WaitGroup or similar),
close the channel and assert/fail from the main test goroutine (e.g. check
channel contents and call t.Fatalf or require.NoError for each error) so
r.Register, ds and loops remain the same but all test failures are reported from
the main test goroutine.
```go
if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopRU {
	topsqlstate.EnableTopRU()
	topsqlstate.SetTopRUItemInterval(ds.itemInterval)
}
```
TopRU lacks refcounting while TopSQL uses it—potential early-disable bug.
TopSQL uses topSQLSinkCount to ensure it's only disabled when the last sink deregisters. However, TopRU calls DisableTopRU() immediately when any sink with enableTopRU deregisters (line 137), which could prematurely disable TopRU if multiple sinks have it enabled.
If two sinks both enable TopRU, deregistering the first will disable TopRU globally while the second sink still expects it.
🔧 Suggested fix: Add refcounting for TopRU
```diff
 type DefaultDataSinkRegisterer struct {
 	ctx       context.Context
 	dataSinks map[DataSink]struct{}
 	// topSQLSinkCount tracks the number of sinks that require TopSQL enabled.
 	topSQLSinkCount int
+	// topRUSinkCount tracks the number of sinks that require TopRU enabled.
+	topRUSinkCount int
 	sync.Mutex
 }

 // In Register:
 if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopRU {
 	topsqlstate.EnableTopRU()
 	topsqlstate.SetTopRUItemInterval(ds.itemInterval)
+	r.topRUSinkCount++
 }

 // In Deregister:
 if ds, ok := dataSink.(*pubSubDataSink); ok && ds.enableTopRU {
-	topsqlstate.DisableTopRU()
+	if r.topRUSinkCount > 0 {
+		r.topRUSinkCount--
+	}
+	if r.topRUSinkCount == 0 {
+		topsqlstate.DisableTopRU()
+	}
 }
```

Also applies to: 136-138
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/reporter/datasink.go` around lines 98 - 101, The code
enabling TopRU in pubSubDataSink uses
topsqlstate.EnableTopRU()/SetTopRUItemInterval but lacks refcounting when sinks
deregister; add a global atomic counter (e.g., topRUSinkCount) similar to
topSQLSinkCount, increment it when a sink with enableTopRU registers (in the
pubSubDataSink registration path where EnableTopRU is called) and decrement it
when such a sink deregisters, and only call topsqlstate.DisableTopRU() when the
counter reaches zero; use atomic operations to update the counter and ensure
SetTopRUItemInterval is set on first enable.
```go
topsqlstate.DisableTopSQL()
for topsqlstate.TopRUEnabled() {
	topsqlstate.DisableTopRU()
}
origTopSQLInterval := topsqlstate.GlobalState.ReportIntervalSeconds.Load()
t.Cleanup(func() {
	topsqlstate.GlobalState.ReportIntervalSeconds.Store(origTopSQLInterval)
	for topsqlstate.TopRUEnabled() {
		topsqlstate.DisableTopRU()
	}
	topsqlstate.ResetTopRUItemInterval()
})
```
Restore the original TopSQL/TopRU state, not just the report interval.
Line 204 disables TopSQL, and the cleanup at Lines 209-215 force-disables TopRU and resets its interval to defaults, but the test only snapshots ReportIntervalSeconds. That leaks global state into later tests if either feature was already enabled or TopRU had a non-default interval, which makes this package order-dependent.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/reporter/reporter_test.go` around lines 204 - 215, The test
only saves GlobalState.ReportIntervalSeconds but fails to restore the original
TopSQL and TopRU enabled states and the original TopRU item interval; modify the
setup to snapshot the current states (call topsqlstate.TopSQLEnabled() or
equivalent to capture whether TopSQL was enabled, call
topsqlstate.TopRUEnabled() to capture TopRU state, and capture the TopRU item
interval via the existing ResetTopRUItemInterval accessor or its Load method),
then in the t.Cleanup restore the ReportIntervalSeconds (using
GlobalState.ReportIntervalSeconds.Store), re-enable/disable TopSQL and TopRU to
their original states using
topsqlstate.DisableTopSQL()/topsqlstate.EnableTopSQL() and
topsqlstate.DisableTopRU()/topsqlstate.EnableTopRU() as appropriate, and restore
the TopRU item interval rather than always resetting it; reference
topsqlstate.DisableTopSQL, topsqlstate.TopRUEnabled, topsqlstate.DisableTopRU,
topsqlstate.GlobalState.ReportIntervalSeconds.Store, and
topsqlstate.ResetTopRUItemInterval when making changes.
```go
func (tsr *RemoteTopSQLReporter) collectRUWorker() {
	defer util.Recover("top-sql", "collectRUWorker", nil, false)

	for {
		select {
		case <-tsr.ctx.Done():
			return
		case data := <-tsr.collectRUIncrementsChan:
			timestamp := uint64(nowFunc().Unix())
			tsr.ruAggregator.addBatchToBucket(timestamp, data)
```
Drain pending RU batches before closing a window.
collectRUWorker timestamps RU maps when it dequeues them, but takeReportRecords closes RU windows on a different goroutine. If a batch sits in collectRUIncrementsChan across a 60s boundary, the old window can be emitted first and that batch is then shifted into the next window or dropped as late data. This path needs a single ordering point: preserve the enqueue timestamp and flush pending RU batches before calling takeReportRecords, or keep RU ingest/reporting on the same goroutine.
Also applies to: 345-350
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/reporter/reporter.go` around lines 241 - 250, collectRUWorker
currently timestamps RU batches when dequeued, causing races with
takeReportRecords which closes RU windows on another goroutine; preserve enqueue
ordering by carrying the original enqueue timestamp through the channel (attach
timestamp to items sent to collectRUIncrementsChan) or by ensuring flushing of
any pending items before takeReportRecords runs. Modify the code paths around
collectRUWorker, collectRUIncrementsChan, ruAggregator.addBatchToBucket and
takeReportRecords so that either (a) messages on collectRUIncrementsChan include
the enqueue timestamp and that timestamp is used when calling
ruAggregator.addBatchToBucket, or (b) add a drain step in the goroutine that
calls takeReportRecords to consume and apply all pending collectRUIncrementsChan
items to ruAggregator (or run RU ingest/reporting on the same goroutine) before
closing windows, ensuring no batches cross window boundaries and eliminating
late/drop races.
```go
const numUsers, numSQLsPerUser = 200, 200 // 10k keys
batch := makeRUBatch(numUsers, numSQLsPerUser)
agg.addBatchToBucket(1, batch)
agg.addBatchToBucket(16, batch)
agg.addBatchToBucket(31, batch)
agg.addBatchToBucket(46, batch)
agg.addBatchToBucket(61, batch) // triggers rotation; 4 buckets compact to 200×200 (with ≤100×100 data); bucket 60 has 10k keys
}
```
The “10k keys” helper is currently generating 40k keys.
200 * 200 yields 40,000 entries, so BenchmarkTakeReportRecords_Large_60s_At10kKeys is benchmarking 4× the advertised load and the comparison in the comment becomes misleading.
🛠️ Proposed fix
```diff
-func fillAggregatorSteadyState60sAt10kKeys(agg *ruWindowAggregator) {
-	const numUsers, numSQLsPerUser = 200, 200 // 10k keys
+func fillAggregatorSteadyState60sAt10kKeys(agg *ruWindowAggregator) {
+	const numUsers, numSQLsPerUser = 100, 100 // 10k keys
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func fillAggregatorSteadyState60sAt10kKeys(agg *ruWindowAggregator) {
	const numUsers, numSQLsPerUser = 100, 100 // 10k keys
	batch := makeRUBatch(numUsers, numSQLsPerUser)
	agg.addBatchToBucket(1, batch)
	agg.addBatchToBucket(16, batch)
	agg.addBatchToBucket(31, batch)
	agg.addBatchToBucket(46, batch)
	agg.addBatchToBucket(61, batch) // triggers rotation; 4 buckets compact; bucket 60 has 10k keys
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/reporter/ru_window_aggregator_test.go` around lines 73 - 80,
The test currently sets numUsers and numSQLsPerUser to 200 (in the test using
makeRUBatch and agg.addBatchToBucket calls), producing 200*200=40,000 keys
rather than the intended 10,000; either reduce numUsers and/or numSQLsPerUser to
100 (e.g., set numUsers, numSQLsPerUser = 100, 100) so makeRUBatch generates 10k
keys for BenchmarkTakeReportRecords_Large_60s_At10kKeys, or update the
comment/benchmark name to accurately reflect 40k keys if the larger load is
intended. Ensure changes reference the constants numUsers and numSQLsPerUser
used when constructing batch via makeRUBatch.
```go
config.UpdateGlobal(func(conf *config.Config) {
	conf.TopSQL.ReceiverAddress = server.Address()
})
```
Restore the receiver address in t.Cleanup.
config.UpdateGlobal() mutates process-wide state, and this test leaves TopSQL.ReceiverAddress pointing at a mock server that is stopped in cleanup. A later reporter test can inherit that dead address and fail depending on execution order.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/reporter/single_target_test.go` around lines 135 - 137, The
test mutates global config.TopSQL.ReceiverAddress via config.UpdateGlobal and
doesn't restore it, causing later tests to see a dead mock address; capture the
original value before calling config.UpdateGlobal and register a t.Cleanup that
calls config.UpdateGlobal to reset conf.TopSQL.ReceiverAddress back to the saved
original (use the same config.UpdateGlobal function), ensuring the mock
server.Address() set in this test is reverted when the test finishes.
```go
// Code generated by ai/projects/topru-ai/testgen/generate_topru_cases.py. DO NOT EDIT.

package reporter

import "testing"
```
Generated file missing license header.
Even generated files should include the standard TiDB license header. The generator script should be updated to include it.
As per coding guidelines: "For new source files (for example *.go), include the standard TiDB license header (copyright + Apache 2.0) by copying from a nearby file and updating year if needed"
🧰 Tools
🪛 golangci-lint (2.5.0)
[error] 1-1: : loading compiled Go files from cache: reading srcfiles list: cache entry not found: open /tmp/go/.go-cache/9f/9f7b2cb20653407db1b3a2fc93ab30edde8fbd5d52d47bb4cfe145fd386cb108-a: no such file or directory
(typecheck)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/reporter/topru_generated_cases_test.go` around lines 1 - 5,
The generated test file topru_generated_cases_test.go is missing the standard
TiDB license header; update the generator script
ai/projects/topru-ai/testgen/generate_topru_cases.py to prepend the canonical
TiDB copyright + Apache-2.0 header to all generated .go files (use an existing
nearby file as the source template and update the year if needed) so
topru_generated_cases_test.go and any future outputs include the required
license block.
```
timeout = "short",
srcs = ["state_test.go"],
embed = [":state"],
flaky = True,
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Check for flaky markers in topsql BUILD files and examine the test file
echo "=== Flaky markers in topsql BUILD files ==="
rg -n "flaky\s*=" pkg/util/topsql/ --include='*.bazel'
echo -e "\n=== Examining state_test.go for potential issues ==="
if [ -f "pkg/util/topsql/state/state_test.go" ]; then
  wc -l pkg/util/topsql/state/state_test.go
  head -150 pkg/util/topsql/state/state_test.go
fi
echo -e "\n=== Checking BUILD.bazel for context around flaky marker ==="
if [ -f "pkg/util/topsql/state/BUILD.bazel" ]; then
  cat -n pkg/util/topsql/state/BUILD.bazel | sed -n '15,25p'
fi
```

Repository: pingcap/tidb
Length of output: 3541
🏁 Script executed:
```shell
#!/bin/bash
# Check git history for this file and flaky marker
echo "=== Recent commits touching BUILD.bazel ==="
git log --oneline -10 -- pkg/util/topsql/state/BUILD.bazel
echo -e "\n=== Full git blame for flaky line ==="
git blame -L 21,21 pkg/util/topsql/state/BUILD.bazel
echo -e "\n=== Check commit message for flaky addition ==="
git log -p --all -S "flaky = True" -- pkg/util/topsql/state/BUILD.bazel | head -50
echo -e "\n=== Search for issue references in comments ==="
cat pkg/util/topsql/state/BUILD.bazel
```

Repository: pingcap/tidb
Length of output: 989
🏁 Script executed:
```shell
#!/bin/bash
# Check git history for this file and flaky marker
echo "=== Recent commits touching BUILD.bazel ==="
git log --oneline -10 -- pkg/util/topsql/state/BUILD.bazel
echo -e "\n=== Full git blame for flaky line ==="
git blame -L 21,21 pkg/util/topsql/state/BUILD.bazel
echo -e "\n=== Check commit message for flaky addition ==="
git log -p --all -S "flaky = True" -- pkg/util/topsql/state/BUILD.bazel | head -80
echo -e "\n=== Full BUILD.bazel file to check for comments ==="
cat pkg/util/topsql/state/BUILD.bazel
```

Repository: pingcap/tidb
Length of output: 993
Remove flaky = True and investigate the root cause of test instability.
The test file contains straightforward, deterministic unit tests with proper atomic state management. There is no evidence in the commit history or test code explaining why this flag is necessary. Marking tests as flaky masks intermittent failures by allowing Bazel to retry and pass on subsequent attempts, rather than addressing the underlying issue—typically race conditions, timing dependencies, or improper test isolation.
If the flakiness is known and tracked, please reference the issue number in a BUILD.bazel comment.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/state/BUILD.bazel` at line 21, Remove the flaky = True
attribute from the BUILD.bazel target (the line "flaky = True") and investigate
the root cause of the intermittent failures in the pkg/util/topsql/state tests:
run the test target repeatedly and under race/stress settings locally and in CI,
inspect the state package tests for race conditions or shared/global state, add
proper isolation/setup/teardown or synchronization where needed, and if
flakiness is a known, tracked issue leave a brief comment in BUILD.bazel
referencing the issue number instead of keeping flaky = True.
```go
func TestTopRUEnableDisableAndResetInterval(t *testing.T) {
	// Contract: TopRU enable is ref-counted; interval resets only when count reaches 0.
	GlobalState.ruConsumerCount.Store(0)
	ResetTopRUItemInterval()

	require.False(t, TopRUEnabled())
	EnableTopRU()
	require.True(t, TopRUEnabled())
	EnableTopRU()
	require.True(t, TopRUEnabled())

	SetTopRUItemInterval(15)
	require.Equal(t, int64(15), GetTopRUItemInterval())

	DisableTopRU()
	require.True(t, TopRUEnabled())
	require.Equal(t, int64(15), GetTopRUItemInterval())

	DisableTopRU()
	require.False(t, TopRUEnabled())
	require.Equal(t, int64(DefTiDBTopRUItemIntervalSeconds), GetTopRUItemInterval())

	// Defensive extra disable should not underflow.
	DisableTopRU()
	require.False(t, TopRUEnabled())
}
```
Restore GlobalState after each test.
These cases mutate process-global state directly. TestTopRUItemIntervalLastWriteWins exits with a 15s interval, so any later test in this package inherits non-default state unless it remembers to reset it first. Snapshot the current count and interval and restore them in t.Cleanup.
Also applies to: 50-63, 65-83
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/state/state_test.go` around lines 23 - 48, This test mutates
process-global state (GlobalState.ruConsumerCount and the TopRU item interval)
and doesn't restore it, so wrap a snapshot-and-restore in t.Cleanup: before
mutating, capture current GlobalState.ruConsumerCount.Load() and current
GetTopRUItemInterval(), then call t.Cleanup to restore the count (via
GlobalState.ruConsumerCount.Store(saved)) and reset the interval (via
SetTopRUItemInterval(saved) or ResetTopRUItemInterval() as appropriate); ensure
this pattern is applied to TestTopRUEnableDisableAndResetInterval and the other
tests referenced (lines 50-63, 65-83) and use the existing helpers
ResetTopRUItemInterval, SetTopRUItemInterval, GetTopRUItemInterval, EnableTopRU,
DisableTopRU, and TopRUEnabled to restore invariant state.
```go
func EnableTopRU() {
	GlobalState.ruConsumerCount.Inc()
}

// DisableTopRU decrements the TopRU consumer count.
// When the count reaches 0, ResetTopRUItemInterval is called.
func DisableTopRU() {
	for {
		prevCount := GlobalState.ruConsumerCount.Load()
		if prevCount <= 0 {
			// Already at 0, nothing to decrement (defensive guard)
			return
		}

		if GlobalState.ruConsumerCount.CAS(prevCount, prevCount-1) {
			// If this was the last subscriber, reset report interval to default
			if prevCount == 1 {
				ResetTopRUItemInterval()
			}
			return
		}
		// CAS failed, retry
	}
```
Serialize the refcount and interval updates.
If the last DisableTopRU() wins the 1 -> 0 transition while a new subscriber concurrently does EnableTopRU() plus SetTopRUItemInterval(), the late ResetTopRUItemInterval() can overwrite the new subscriber's interval. That leaves TopRUEnabled() true with the default cadence instead of the active subscriber's requested one. These three operations need to be one state transition, not separate atomic writes.
Also applies to: 137-156
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/state/state.go` around lines 94 - 116, The decrement in
DisableTopRU() can race with concurrent EnableTopRU() + SetTopRUItemInterval(),
letting ResetTopRUItemInterval() overwrite a new subscriber's interval;
serialize the refcount+interval update by protecting the trio of operations with
a single lock or atomic composite update: add a mutex (e.g.,
GlobalState.topRUMu) and wrap EnableTopRU, DisableTopRU, SetTopRUItemInterval
and ResetTopRUItemInterval (or instead replace separate atomics with a single
CAS on a struct containing {count, interval}) so the transition from 1->0 and
any concurrent 0->1+set are performed as one atomic/locked critical section and
cannot interleave while calling ResetTopRUItemInterval; update callers in
state.go lines around EnableTopRU/DisableTopRU and the other affected block
(137-156) to use the same protection.
🧹 Nitpick comments (1)
pkg/util/topsql/stmtstats/aggregator.go (1)
140-145: Shared map reference passed to multiple collectors. The same `RUIncrementMap` instance is passed to all `RUCollector` implementations. If any collector modifies the map (adds/deletes keys or mutates values), subsequent collectors will see the altered state. This mirrors the existing `Collector` pattern; ensure all `RUCollector` implementations treat the map as read-only.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/stmtstats/aggregator.go` around lines 140 - 145, The shared RUIncrementMap instance (total) is being passed to every RUCollector via CollectRUIncrements which allows later collectors to observe mutations by earlier ones; change the call site in the m.ruCollectors.Range loop to pass an independent copy for each collector (e.g., clone/allocate a new RUIncrementMap and copy entries) before calling RUCollector.CollectRUIncrements so each RUCollector receives a read-only snapshot and cannot affect other collectors; reference m.ruCollectors.Range, RUCollector, CollectRUIncrements, and the total RUIncrementMap when implementing the clone.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@pkg/util/topsql/stmtstats/aggregator.go`:
- Around line 140-145: The shared RUIncrementMap instance (total) is being
passed to every RUCollector via CollectRUIncrements which allows later
collectors to observe mutations by earlier ones; change the call site in the
m.ruCollectors.Range loop to pass an independent copy for each collector (e.g.,
clone/allocate a new RUIncrementMap and copy entries) before calling
RUCollector.CollectRUIncrements so each RUCollector receives a read-only
snapshot and cannot affect other collectors; reference m.ruCollectors.Range,
RUCollector, CollectRUIncrements, and the total RUIncrementMap when implementing
the clone.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 06b8db9e-c219-4078-b8e5-59857eb39706
📒 Files selected for processing (3)
pkg/util/topsql/stmtstats/aggregator.gopkg/util/topsql/stmtstats/aggregator_test.gopkg/util/topsql/topsql.go
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/util/topsql/topsql.go
- pkg/util/topsql/stmtstats/aggregator_test.go
|
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. Notice: To remove the For example:
📖 For more info, you can check the "Contribute Code" section in the development guide. |
Actionable comments posted: 2
♻️ Duplicate comments (1)
pkg/util/topsql/reporter/single_target_test.go (1)
70-72: ⚠️ Potential issue | 🟡 Minor — Restore `TopSQL.ReceiverAddress` in `t.Cleanup`.
`config.UpdateGlobal` mutates process-wide state. Leaving `TopSQL.ReceiverAddress` pointed at a mock server that gets stopped at test end makes later reporter tests order-dependent.
Suggested fix

```diff
+origReceiverAddress := config.GetGlobalConfig().TopSQL.ReceiverAddress
+t.Cleanup(func() {
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TopSQL.ReceiverAddress = origReceiverAddress
+	})
+})
+
 config.UpdateGlobal(func(conf *config.Config) {
 	conf.TopSQL.ReceiverAddress = server.Address()
 })
```
Also applies to: 133-135
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/util/topsql/reporter/single_target_test.go` around lines 70 - 72, The test mutates global config via config.UpdateGlobal setting conf.TopSQL.ReceiverAddress to server.Address() but never restores it, causing order-dependent failures; wrap the UpdateGlobal call so you capture the prior value (e.g., old := conf.TopSQL.ReceiverAddress) and register a t.Cleanup that restores conf.TopSQL.ReceiverAddress to the old value (using config.UpdateGlobal inside the cleanup) after the test, ensuring the global state modified by config.UpdateGlobal is reverted; reference the config.UpdateGlobal call and the TopSQL.ReceiverAddress field when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/util/topsql/reporter/single_target.go`:
- Around line 274-279: The sendBatchTopRURecord method currently drops TopRU
records by returning nil; update it to actually forward records via the existing
RPC path: when len(records)>0 call the SingleTargetDataSink helper that performs
the RU RPC (sendTopRURecords) and return any error it produces so errors aren't
silently swallowed; ensure you reference and call sendTopRURecords(ctx, records)
from sendBatchTopRURecord and propagate its returned error.
- Around line 290-296: The CloseAndRecv error handling currently clears any
prior send error by unconditionally setting retErr = nil when status.Code(cErr)
== codes.Unimplemented; change this so Unimplemented is treated as non-fatal
only if there was no previous error from stream.Send. In the
stream.CloseAndRecv() handling block, do not overwrite a non-nil retErr: if
status.Code(cErr) == codes.Unimplemented then return without modifying retErr,
but if retErr is nil you may treat Unimplemented as non-fatal (leave retErr nil)
and return; ensure symbols referenced are stream.CloseAndRecv, stream.Send, and
the retErr variable.
---
Duplicate comments:
In `@pkg/util/topsql/reporter/single_target_test.go`:
- Around line 70-72: The test mutates global config via config.UpdateGlobal
setting conf.TopSQL.ReceiverAddress to server.Address() but never restores it,
causing order-dependent failures; wrap the UpdateGlobal call so you capture the
prior value (e.g., old := conf.TopSQL.ReceiverAddress) and register a t.Cleanup
that restores conf.TopSQL.ReceiverAddress to the old value (using
config.UpdateGlobal inside the cleanup) after the test, ensuring the global
state modified by config.UpdateGlobal is reverted; reference the
config.UpdateGlobal call and the TopSQL.ReceiverAddress field when making the
change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 80a9695c-d398-4ba3-836d-6ac831056563
📒 Files selected for processing (4)
pkg/util/topsql/reporter/datasink.gopkg/util/topsql/reporter/datasink_test.gopkg/util/topsql/reporter/single_target.gopkg/util/topsql/reporter/single_target_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/util/topsql/reporter/datasink_test.go
```go
func (ds *SingleTargetDataSink) sendBatchTopRURecord(ctx context.Context, records []tipb.TopRURecord) (err error) {
	_ = ctx
	if len(records) == 0 {
		return nil
	}
	return nil
```
TopRU records are still dropped on the SingleTarget path.
doSend now forwards task.data.RURecords, but this method still returns success without opening the RU RPC or using sendTopRURecords. Upstream already materializes real RURecords, so SingleTarget silently loses TopRU data here.
🔧 Proposed fix
```diff
 func (ds *SingleTargetDataSink) sendBatchTopRURecord(ctx context.Context, records []tipb.TopRURecord) (err error) {
-	_ = ctx
 	if len(records) == 0 {
 		return nil
 	}
-	return nil
+
+	client := tipb.NewTopSQLAgentClient(ds.conn)
+	stream, err := client.ReportTopRURecords(ctx)
+	if err != nil {
+		if status.Code(err) == codes.Unimplemented {
+			return nil
+		}
+		return err
+	}
+
+	_, err = sendTopRURecords(stream, records)
+	return err
 }
```

As per coding guidelines, "Keep error handling actionable and contextual; avoid silently swallowing errors in Go code".
📝 Committable suggestion
```go
func (ds *SingleTargetDataSink) sendBatchTopRURecord(ctx context.Context, records []tipb.TopRURecord) (err error) {
	if len(records) == 0 {
		return nil
	}

	client := tipb.NewTopSQLAgentClient(ds.conn)
	stream, err := client.ReportTopRURecords(ctx)
	if err != nil {
		if status.Code(err) == codes.Unimplemented {
			return nil
		}
		return err
	}

	_, err = sendTopRURecords(stream, records)
	return err
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/reporter/single_target.go` around lines 274 - 279, The
sendBatchTopRURecord method currently drops TopRU records by returning nil;
update it to actually forward records via the existing RPC path: when
len(records)>0 call the SingleTargetDataSink helper that performs the RU RPC
(sendTopRURecords) and return any error it produces so errors aren't silently
swallowed; ensure you reference and call sendTopRURecords(ctx, records) from
sendBatchTopRURecord and propagate its returned error.
```go
if _, cErr := stream.CloseAndRecv(); cErr != nil {
	if status.Code(cErr) == codes.Unimplemented {
		retErr = nil
		return
	}
	if retErr == nil {
		retErr = cErr
```
Don't clear an existing send failure on CloseAndRecv.
If stream.Send already failed with a non-Unimplemented error, Line 292 still resets retErr to nil when CloseAndRecv returns Unimplemented. That can incorrectly report success after a real delivery failure.
🔧 Proposed fix
```diff
 if _, cErr := stream.CloseAndRecv(); cErr != nil {
 	if status.Code(cErr) == codes.Unimplemented {
-		retErr = nil
 		return
 	}
 	if retErr == nil {
 		retErr = cErr
 	}
```

📝 Committable suggestion
```go
if _, cErr := stream.CloseAndRecv(); cErr != nil {
	if status.Code(cErr) == codes.Unimplemented {
		return
	}
	if retErr == nil {
		retErr = cErr
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/util/topsql/reporter/single_target.go` around lines 290 - 296, The
CloseAndRecv error handling currently clears any prior send error by
unconditionally setting retErr = nil when status.Code(cErr) ==
codes.Unimplemented; change this so Unimplemented is treated as non-fatal only
if there was no previous error from stream.Send. In the stream.CloseAndRecv()
handling block, do not overwrite a non-nil retErr: if status.Code(cErr) ==
codes.Unimplemented then return without modifying retErr, but if retErr is nil
you may treat Unimplemented as non-fatal (leave retErr nil) and return; ensure
symbols referenced are stream.CloseAndRecv, stream.Send, and the retErr
variable.
|
@zimulala: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
What problem does this PR solve?
Issue Number: close #xxx
Problem Summary:
What changed and how does it work?
Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.
Summary by CodeRabbit
New Features
Chores
Tests
Metrics