Parallel Syncing #524
Conversation
Walkthrough
Adds a parallel sync path with bucketed workers and a new …

Changes

Sequence Diagram(s)
sequenceDiagram
participant CLI as CLI
participant Runner as ConnectorRunner
participant TaskMgr as C1TaskManager
participant FullSync as FullSyncHandler
participant Base as SequentialSyncer
participant Parallel as ParallelSyncer
CLI->>Runner: run with --parallel-sync
Runner->>Runner: append WithParallelSyncEnabled() to opts
Runner->>TaskMgr: NewC1TaskManager(..., parallelSync=true)
TaskMgr->>FullSync: newFullSyncTaskHandler(parallelSync=true)
FullSync->>Base: NewSyncer(...)
alt parallelSync == true
FullSync->>Parallel: NewParallelSyncer(Base, config)
Parallel->>Parallel: start workers & buckets
FullSync->>Parallel: Sync(ctx)
Parallel->>FullSync: Sync complete
FullSync->>Parallel: Close(ctx)
else
FullSync->>Base: Sync(ctx)
Base->>FullSync: Sync complete
FullSync->>Base: Close(ctx)
end
sequenceDiagram
participant Init as NewC1File/C1Z
participant C1File as C1File (store)
participant DB as SQLite
participant Ticker as Checkpoint Goroutine
Init->>C1File: WithC1FWALCheckpoint(true)
C1File->>DB: PRAGMA journal_mode?
alt DB in WAL mode
C1File->>Ticker: startWALCheckpointing()
loop every interval
Ticker->>C1File: acquireCheckpointLock()
Ticker->>DB: PRAGMA wal_checkpoint(TRUNCATE)
Ticker->>C1File: releaseCheckpointLock()
end
end
C1File->>C1File: Close() -> stop ticker, wait for done
sequenceDiagram
participant Caller as sql_helpers
participant Lock as CheckpointLock
participant DB as SQLite
Caller->>Lock: acquireCheckpointLock()
Caller->>DB: begin tx, execute chunk insert
alt success
DB->>Caller: commit
Caller->>Lock: releaseCheckpointLock()
else SQLITE_BUSY or transient
Caller->>Caller: retry with backoff (new tx each attempt)
loop until maxRetries
Caller->>DB: begin tx, execute
end
Caller->>Lock: releaseCheckpointLock()
else error
DB->>Caller: rollback
Caller->>Lock: releaseCheckpointLock()
end
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Files and areas needing extra attention:
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/dotc1z/c1file.go (1)
351-352: Typo in error message. "datbase" → "database".
- return fmt.Errorf("c1file: datbase has not been opened")
+ return fmt.Errorf("c1file: database has not been opened")
🧹 Nitpick comments (9)
pkg/dotc1z/manager/s3/s3.go (1)
21-28: WAL checkpoint option propagation is correct. Option plumbing and conditional C1Z option are consistent. Consider a small debug log when enabled to aid troubleshooting.
@@ func (s *s3Manager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) {
 	if len(s.decoderOptions) > 0 {
 		opts = append(opts, dotc1z.WithDecoderOptions(s.decoderOptions...))
 	}
 	if s.enableWALCheckpoint {
+		l.Debug("enabling WAL checkpointing for S3-managed C1Z", zap.String("file_path", s.fileName))
 		opts = append(opts, dotc1z.WithWALCheckpoint(true))
 	}
Also applies to: 44-48, 140-142
pkg/dotc1z/manager/local/local.go (1)
18-24: Local WAL checkpoint option wiring LGTM; optional observability. The option and propagation are sound. Add a debug log when enabled to simplify field debugging.
@@ func (l *localManager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) {
 	if len(l.decoderOptions) > 0 {
 		opts = append(opts, dotc1z.WithDecoderOptions(l.decoderOptions...))
 	}
 	if l.enableWALCheckpoint {
+		log.Debug("enabling WAL checkpointing for local C1Z",
+			zap.String("file_path", l.filePath),
+			zap.String("temp_path", l.tmpPath),
+		)
 		opts = append(opts, dotc1z.WithWALCheckpoint(true))
 	}
Also applies to: 40-44, 122-124
pkg/dotc1z/sql_helpers.go (4)
313-316: SQLite BUSY detection via string match is brittle; prefer driver error codes. If feasible with the chosen driver, detect BUSY using the driver's error type/code (e.g., sqlite3.Error{Code: sqlite3.ErrBusy}) and fall back to string matching.
317-346: Respect context cancellation between chunks. Break out early if ctx is cancelled to avoid long waits under backpressure.
 for i := 0; i < chunks; i++ {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+	}
 	start := i * chunkSize
 	...
 }
348-427: Improve retry backoff to exponential with jitter. Linear backoff can synchronize retries and increase contention. Use exponential backoff with jitter, and cap the max delay.
-	maxRetries := 5
-	baseDelay := 10 * time.Millisecond
+	maxRetries := 5
+	baseDelay := 10 * time.Millisecond
+	maxDelay := 500 * time.Millisecond
@@
-	case <-time.After(time.Duration(attempt+1) * baseDelay):
+	// exponential backoff with jitter
+	delay := baseDelay * (1 << attempt)
+	if delay > maxDelay {
+		delay = maxDelay
+	}
+	// add small jitter
+	jitter := time.Duration(int64(delay) / 5)
+	sleep := delay + time.Duration(rand.Int63n(int64(jitter+1)))
+	case <-time.After(sleep):
 		continue
Note: add the math/rand import (seeded elsewhere or acceptable default).
+	"math/rand"
95-273: Apply checkpoint lock consistently to read paths. listConnectorObjects now guards reads; consider the same for getResourceObject/getConnectorObject to avoid WAL checkpoint races during heavy load.
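As a rough illustration of that suggestion, the guard could look like the sketch below. The method name and query are hypothetical; only the acquire/release pattern around the read is the point.

// Sketch only: wrap a hot-path read in the checkpoint read lock so a WAL
// checkpoint cannot run while the query executes. Table/column names here
// are illustrative, not the actual getResourceObject implementation.
func (c *C1File) countResourcesGuarded(ctx context.Context, syncID, resourceTypeID string) (int64, error) {
	c.acquireCheckpointLock()
	defer c.releaseCheckpointLock()

	return c.db.From(resources.Name()).
		Where(goqu.C("resource_type_id").Eq(resourceTypeID)).
		Where(goqu.C("sync_id").Eq(syncID)).
		CountContext(ctx)
}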
pkg/sync/parallel_syncer.go (3)
354-367: Hot-path log level too noisy; downgrade to Debug. Per-task Add logs at Info will flood logs.
-	l.Info("task added to queue",
+	l.Debug("task added to queue",
1289-1294: Duplicate branch bodies; simplify logging. Both branches log progress identically. Collapse branches.
-	if len(resp.List) > 0 {
-		ps.syncer.counts.LogResourcesProgress(ctx, resourceTypeId)
-	} else {
-		// Even with no resources, we should log progress
-		ps.syncer.counts.LogResourcesProgress(ctx, resourceTypeId)
-	}
+	ps.syncer.counts.LogResourcesProgress(ctx, resourceTypeId)
(Apply similarly in collectTasks.)
Also applies to: 1435-1439
1719-1727: String literal repeated; make it a const. Replace "next_page" with a package-level const to avoid drift.
+const actionNextPage = "next_page"
@@
-	for decision.ShouldContinue && decision.Action == "next_page" {
+	for decision.ShouldContinue && decision.Action == actionNextPage {
@@
-	Action: "next_page",
+	Action: actionNextPage,
@@
-	Action: "next_page",
+	Action: actionNextPage,
Also applies to: 1996-2004, 2053-2059
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
pb/c1/connector/v2/resource.pb.go is excluded by !**/*.pb.go
pb/c1/connector/v2/resource_protoopaque.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (16)
pb/c1/connector/v2/resource.pb.validate.go (1 hunks)
pkg/cli/commands.go (1 hunks)
pkg/connectorrunner/runner.go (4 hunks)
pkg/dotc1z/c1file.go (9 hunks)
pkg/dotc1z/manager/local/local.go (3 hunks)
pkg/dotc1z/manager/manager.go (4 hunks)
pkg/dotc1z/manager/s3/s3.go (3 hunks)
pkg/dotc1z/sql_helpers.go (4 hunks)
pkg/field/defaults.go (2 hunks)
pkg/sync/parallel_syncer.go (1 hunks)
pkg/sync/state.go (3 hunks)
pkg/sync/syncer.go (53 hunks)
pkg/tasks/c1api/full_sync.go (4 hunks)
pkg/tasks/c1api/manager.go (4 hunks)
pkg/tasks/local/syncer.go (4 hunks)
proto/c1/connector/v2/resource.proto (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (12)
pkg/cli/commands.go (1)
pkg/connectorrunner/runner.go (1)
WithParallelSyncEnabled(558-563)
pkg/dotc1z/manager/manager.go (4)
pkg/dotc1z/decoder.go (1)
DecoderOption(57-57)
pkg/dotc1z/c1file.go (1)
WithWALCheckpoint(151-155)
pkg/dotc1z/manager/local/local.go (1)
WithWALCheckpoint(40-44)
pkg/dotc1z/manager/s3/s3.go (1)
WithWALCheckpoint(44-48)
pkg/tasks/local/syncer.go (3)
pkg/connectorrunner/runner.go (2)
WithParallelSyncEnabled(558-563)
Option(254-254)
pkg/sync/syncer.go (2)
Syncer(53-56)
NewSyncer(3003-3029)
pkg/sync/parallel_syncer.go (2)
DefaultParallelSyncConfig(244-249)
NewParallelSyncer(818-830)
pkg/dotc1z/manager/s3/s3.go (4)
pkg/dotc1z/decoder.go (1)
DecoderOption(57-57)
pkg/dotc1z/c1file.go (1)
WithWALCheckpoint(151-155)
pkg/dotc1z/manager/local/local.go (2)
WithWALCheckpoint(40-44)
Option(26-26)
pkg/dotc1z/manager/manager.go (1)
WithWALCheckpoint(40-44)
pkg/dotc1z/sql_helpers.go (1)
pkg/dotc1z/c1file.go (1)
C1File(36-60)
pkg/tasks/c1api/full_sync.go (2)
pkg/sync/syncer.go (2)
Syncer(53-56)
NewSyncer(3003-3029)
pkg/sync/parallel_syncer.go (2)
DefaultParallelSyncConfig(244-249)
NewParallelSyncer(818-830)
pkg/field/defaults.go (3)
pb/c1/config/v1/config.pb.go (2)
BoolField(1034-1040)
BoolField(1053-1053)
pkg/field/fields.go (1)
BoolField(179-199)
pkg/field/field_options.go (4)
WithDescription(47-53)
WithPersistent(129-135)
WithExportTarget(103-111)
ExportTargetNone(97-97)
pkg/dotc1z/manager/local/local.go (4)
pkg/dotc1z/decoder.go (1)
DecoderOption(57-57)
pkg/dotc1z/c1file.go (1)
WithWALCheckpoint(151-155)
pkg/dotc1z/manager/manager.go (1)
WithWALCheckpoint(40-44)
pkg/dotc1z/manager/s3/s3.go (2)
WithWALCheckpoint(44-48)
Option(30-30)
pkg/connectorrunner/runner.go (2)
pkg/tasks/local/syncer.go (2)
WithParallelSyncEnabled(61-65)
Option(29-29)
pkg/tasks/c1api/manager.go (1)
NewC1TaskManager(307-332)
pkg/dotc1z/c1file.go (3)
pkg/dotc1z/manager/local/local.go (1)
WithWALCheckpoint(40-44)
pkg/dotc1z/manager/manager.go (1)
WithWALCheckpoint(40-44)
pkg/dotc1z/manager/s3/s3.go (1)
WithWALCheckpoint(44-48)
pkg/sync/syncer.go (3)
pkg/sync/expand/graph.go (1)
EntitlementGraphAction(14-20)
pkg/sync/state.go (1)
Action(130-137)
pkg/dotc1z/manager/manager.go (4)
ManagerOption(26-26)
WithTmpDir(28-32)
WithWALCheckpoint(40-44)
New(50-83)
pkg/sync/parallel_syncer.go (5)
pkg/sync/syncer.go (2)
Syncer(53-56)
SequentialSyncer(314-341)
pkg/dotc1z/manager/manager.go (1)
New(50-83)
pkg/sync/state.go (7)
Action(130-137)
CollectEntitlementsAndGrantsTasksOp(126-126)
SyncEntitlementsOp(119-119)
SyncGrantsOp(121-121)
SyncResourcesOp(118-118)
SyncGrantExpansionOp(124-124)
SyncExternalResourcesOp(122-122)
pb/c1/connector/v2/resource.pb.go (6)
ResourceId(2652-2659)
ResourceId(2672-2672)
ResourceType(126-141)
ResourceType(154-154)
Resource(2737-2749)
Resource(2762-2762)
pkg/annotations/annotations.go (1)
Annotations(12-12)
🪛 GitHub Check: go-lint
pkg/dotc1z/sql_helpers.go
[failure] 396-396:
Error return value of tx.Rollback is not checked (errcheck)
[failure] 389-389:
Error return value of tx.Rollback is not checked (errcheck)
[failure] 381-381:
Error return value of tx.Rollback is not checked (errcheck)
pkg/dotc1z/c1file.go
[failure] 483-483:
Comment should end in a period (godot)
[failure] 466-466:
Comment should end in a period (godot)
[failure] 456-456:
Comment should end in a period (godot)
[failure] 514-514:
use of fmt.Printf forbidden by pattern ^(fmt\.Print(|f|ln)|print|println)$ (forbidigo)
pkg/sync/parallel_syncer.go
[failure] 1435-1435:
dupBranchBody: both branches in if statement have same body (gocritic)
[failure] 1289-1289:
dupBranchBody: both branches in if statement have same body (gocritic)
[failure] 1667-1667:
string next_page has 4 occurrences, make it a constant (goconst)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: go-test (1.25.2, ubuntu-latest)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (18)
pkg/dotc1z/sql_helpers.go (1)
228-231: Checkpoint lock correctly implements RLock for normal DB operations and exclusive Lock for checkpoint.
Verified: acquireCheckpointLock() uses RLock() at pkg/dotc1z/c1file.go:486, releaseCheckpointLock() uses RUnlock() at pkg/dotc1z/c1file.go:493, and the checkpoint operation uses exclusive Lock() at pkg/dotc1z/c1file.go:503. This is the correct pattern—no contention issues.
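For reference, a minimal sketch of what those helpers amount to, assuming a sync.RWMutex field and the checkpointEnabled flag mentioned later in this review (the real field names may differ):

// Sketch only: conditional read lock for normal DB operations; the checkpoint
// path takes the exclusive write lock on the same mutex.
func (c *C1File) acquireCheckpointLock() {
	if c.checkpointEnabled {
		c.checkpointMu.RLock()
	}
}

func (c *C1File) releaseCheckpointLock() {
	if c.checkpointEnabled {
		c.checkpointMu.RUnlock()
	}
}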
pkg/field/defaults.go (1)
87-87: End-to-end wiring confirmed; changes approved. The parallel-sync field is properly wired throughout the codebase:
- CLI reads the flag and creates the appropriate option
- Runner passes it to both local syncer and manager
- Both paths use it to conditionally instantiate the parallel syncer
- Field persistence and export settings are correct
pkg/dotc1z/manager/manager.go (1)
20-24: WAL checkpoint toggle properly propagated across all manager variants.
Verified that WithWALCheckpoint is correctly defined at line 40, applied to the managerOptions struct (lines 20-24), and propagated to both S3Manager (lines 66-68) and LocalManager (lines 78-80) constructors in the New() function. Usage in syncer.go is consistent with the option pattern. No issues identified.
pb/c1/connector/v2/resource.pb.validate.go (1)
168-169: Generated file note. Comment-only change; no issues. Ensure generator inputs (proto + protoc-gen-validate) are the source of truth.
pkg/sync/parallel_syncer.go (1)
1769-1774: No issues found with progress increment semantics.
Verification confirms the code correctly increments by 1 per resource processed (not per items). This is consistent throughout the codebase: both AddGrantsProgress and AddEntitlementsProgress calls at lines 1964 and 2049 use hardcoded count=1, matching the pattern in syncer.go. The logging functions track progress indexed by resourceType, confirming 1 unit = 1 resource processed, which is the intended sequential semantics.
pkg/cli/commands.go (1)
332-334: LGTM! The parallel sync flag is properly integrated into the command options flow. The placement after the main switch and before resource validation checks is appropriate.
pkg/tasks/c1api/full_sync.go (1)
86-98: LGTM! The conditional wrapping pattern correctly creates a base syncer and then wraps it with a parallel syncer when enabled. The worker count configuration (10 workers) is explicitly set, and error handling is properly maintained.
proto/c1/connector/v2/resource.proto (1)
47-52: LGTM! The sync_bucket field is well-documented and appropriately optional, enabling gradual adoption of parallel sync bucketing. The field semantics are clear: same bucket = sequential, different buckets = parallel.
pkg/tasks/c1api/manager.go (2)
56-56: LGTM! The parallelSync field is appropriately added to the manager struct and propagated through to the full sync task handler.
307-331: LGTM! The constructor signature is correctly updated to accept the parallelSync parameter and properly initializes the field.
pkg/connectorrunner/runner.go (3)
558-563: LGTM! The parallel sync option follows the established pattern and correctly propagates the flag to downstream components.
780-780: LGTM! The parallel sync flag is correctly passed to the local syncer for on-demand sync paths.
793-802: LGTM! The parallel sync flag is properly passed to the C1 task manager for daemon mode sync paths.
pkg/sync/syncer.go (5)
66-82: LGTM! Thread-safety additions to ProgressCounts are well-designed. The sequentialMode flag (defaulting to true) maintains backward compatibility while enabling concurrent access when needed.
278-311: LGTM! The thread-safe methods for ProgressCounts follow proper mutex locking patterns. These methods enable safe concurrent updates from parallel sync workers.
314-341: LGTM! Renaming syncer to SequentialSyncer and making it public is appropriate for distinguishing it from the new parallel syncer. The enableWALCheckpoint field is properly added to support the parallel sync infrastructure.
2817-2849: LGTM! The WAL checkpoint option is properly propagated to the manager when enableWALCheckpoint is true. This integration supports the parallel sync feature by preventing checkpoint failures under high concurrency.
2886-3029: LGTM! All SyncOpt functions are consistently updated to operate on *SequentialSyncer, and the NewSyncer constructor correctly returns the new public type. The changes maintain backward compatibility in behavior while enabling the new parallel sync infrastructure.
// Enable WAL checkpointing for parallel sync to prevent checkpoint failures under high concurrency
baseSyncer.enableWALCheckpoint = true
ParallelSyncer unconditionally enables WAL checkpointing on base syncer.
This overrides caller intent. Consider honoring existing setting or making it explicit in ParallelSyncConfig.
- // Enable WAL checkpointing for parallel sync to prevent checkpoint failures under high concurrency
- baseSyncer.enableWALCheckpoint = true
+ // Enable WAL checkpointing if not explicitly set by caller.
+ if !baseSyncer.enableWALCheckpoint {
+ baseSyncer.enableWALCheckpoint = true
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// Enable WAL checkpointing if not explicitly set by caller.
if !baseSyncer.enableWALCheckpoint {
	baseSyncer.enableWALCheckpoint = true
}
For parallel sync, we always want wal checkpointing enabled.
@jirwin, understood! Thank you for clarifying. Since parallel sync always requires WAL checkpointing due to the high concurrency, the unconditional enablement makes sense.
The existing comment does explain why, but if you'd like to make the requirement even more explicit, you could consider enhancing it slightly to:
// Enable WAL checkpointing for parallel sync - this is required (not optional) to prevent
// checkpoint failures under high concurrency
baseSyncer.enableWALCheckpoint = true
Though the current comment is clear enough. I've noted this design requirement for future reference.
✏️ Learnings added
Learnt from: jirwin
PR: ConductorOne/baton-sdk#524
File: pkg/sync/parallel_syncer.go:827-829
Timestamp: 2025-10-21T23:58:13.698Z
Learning: In the parallel syncer implementation (pkg/sync/parallel_syncer.go), WAL checkpointing must always be enabled for parallel sync operations. The `NewParallelSyncer` function unconditionally sets `baseSyncer.enableWALCheckpoint = true` because high concurrency in parallel sync requires WAL checkpointing to prevent checkpoint failures. This is a design requirement, not a configurable option.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
pkg/sync/parallel_syncer.go
Outdated
// Skip the default bucket (used for final tasks)
if bucketName == "resource-type-" {
	continue
}
Bucket name equality check is ineffective.
Comparing for equality with "resource-type-" never matches actual default bucket names (e.g., "resource-type-<id>"). If you meant to skip default buckets, use HasPrefix.
- if bucketName == "resource-type-" {
+ if strings.HasPrefix(bucketName, "resource-type-") {
continue
 }
(Ensure strings is imported at top if not already.)
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In pkg/sync/parallel_syncer.go around lines 1098 to 1101, the code compares
bucketName to the literal "resource-type-" which never matches default buckets
like "resource-type-<id>"; change the check to use strings.HasPrefix(bucketName,
"resource-type-") to skip those default buckets, and add/import the "strings"
package at the file top if it’s not already imported.
case CollectEntitlementsAndGrantsTasksOp:
	return "collect-entitlements-and-grants-tasks"
default:
🧩 Analysis chain
New operation wired correctly; add JSON round‑trip tests.
Add unit tests to verify MarshalJSON/UnmarshalJSON round‑trip for CollectEntitlementsAndGrantsTasksOp and token serialization.
Also confirm whether ListResourcesForEntitlementsOp intentionally lacks a String()/newActionOp mapping.
Also applies to: 107-109, 126-127
🏁 Script executed:
#!/bin/bash
# Quick grep to validate string mappings for all ActionOp variants
rg -nP 'type ActionOp|const \(|String\(\)|newActionOp\(' -n
Length of output: 268504
🏁 Script executed:
#!/bin/bash
# Locate and examine ActionOp enum, String(), and newActionOp in pkg/sync/state.go
fd -t f 'state\.go' pkg/sync/ | head -5
Length of output: 17
🏁 Script executed:
#!/bin/bash
# Display full pkg/sync/state.go to understand ActionOp structure
cat pkg/sync/state.go | head -150
Length of output: 4272
🏁 Script executed:
#!/bin/bash
# Search for String() method, newActionOp function, and ListResourcesForEntitlementsOp
rg -n 'func.*String\(\)|newActionOp|ListResourcesForEntitlementsOp' pkg/sync/state.go
Length of output: 237
🏁 Script executed:
#!/bin/bash
# Look for test files related to state.go
fd -t f 'state.*test\.go|test.*state' pkg/sync/
Length of output: 22
🏁 Script executed:
#!/bin/bash
# Examine state_test.go for marshal/unmarshal test coverage
cat pkg/sync/state_test.go
Length of output: 3242
🏁 Script executed:
#!/bin/bash
# Search for specific test coverage of ActionOp marshalling
rg -n 'MarshalJSON|UnmarshalJSON|CollectEntitlementsAndGrantsTasksOp|ListResourcesForEntitlementsOp' pkg/sync/state_test.go
Length of output: 0
Add JSON round-trip tests for the new CollectEntitlementsAndGrantsTasksOp operation.
The code change correctly implements String() and newActionOp() mappings for CollectEntitlementsAndGrantsTasksOp, but TestSyncerTokenMarshalUnmarshal lacks coverage for this new operation. Extend the test to include this enum member to verify MarshalJSON/UnmarshalJSON work correctly.
Separately, ListResourcesForEntitlementsOp is confirmed to intentionally lack String()/newActionOp mappings (pre-existing behavior); if intentional, this is acceptable but may warrant a comment.
Actionable comments posted: 9
♻️ Duplicate comments (7)
pkg/dotc1z/c1file.go (3)
271-348: Stats() method still lacks checkpoint read locks.
This is the same critical issue raised in the past review comment. The DB read operations at lines 319-322, 330-332, and 338-340 (CountContext calls) are not wrapped with acquireCheckpointLock()/releaseCheckpointLock(), making the checkpoint coordination mechanism ineffective. When performWALCheckpoint() acquires the write lock, these hot-path reads can still execute, potentially causing checkpoint starvation.
Add checkpoint read locks around each CountContext call, following the pattern used in sql_helpers.go. Example for the first DB call:
+	c.acquireCheckpointLock()
 	resourceCount, err := c.db.From(resources.Name()).
 		Where(goqu.C("resource_type_id").Eq(rt.Id)).
 		Where(goqu.C("sync_id").Eq(syncId)).
 		CountContext(ctx)
+	c.releaseCheckpointLock()
 	if err != nil {
 		return nil, err
 	}
Apply similar wrapping for the CountContext calls at lines 330-332 and 338-340.
399-456: GrantStats() method still lacks checkpoint read locks.
This is the same critical issue raised in the past review comment. The DB read operation at lines 444-447 (CountContext call) is not wrapped with acquireCheckpointLock()/releaseCheckpointLock(), making the checkpoint coordination mechanism ineffective.
Add checkpoint read locks around the CountContext call:
+	c.acquireCheckpointLock()
 	grantsCount, err := c.db.From(grants.Name()).
 		Where(goqu.C("sync_id").Eq(syncId)).
 		Where(goqu.C("resource_type_id").Eq(resourceType.Id)).
 		CountContext(ctx)
+	c.releaseCheckpointLock()
 	if err != nil {
 		return nil, err
 	}
458-466: isWALMode() still uses case-sensitive comparison without DB query fallback.
This is the same issue raised in the past review comment. The current implementation could miss cases where journal_mode is set with different casing or configured outside the pragmas slice.
Consider implementing the suggested case-insensitive comparison with DB query fallback for robustness.
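A hedged sketch of that suggestion, assuming access to a raw *sql.DB handle (the rawDb field name is hypothetical) and a pragmas slice of name/value pairs:

// Sketch only: case-insensitive pragma check with a PRAGMA journal_mode
// fallback query, so WAL enabled outside the pragmas slice is still detected.
func (c *C1File) isWALMode(ctx context.Context) bool {
	for _, p := range c.pragmas {
		if strings.EqualFold(p.name, "journal_mode") && strings.EqualFold(p.value, "wal") {
			return true
		}
	}
	var mode string
	if err := c.rawDb.QueryRowContext(ctx, "PRAGMA journal_mode").Scan(&mode); err != nil {
		return false
	}
	return strings.EqualFold(mode, "wal")
}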
pkg/sync/parallel_syncer.go (4)
75-89: Panic on task enqueue failure can crash the sync.
Line 86 panics when task re-enqueuing fails, which will terminate the entire sync process. This is a critical failure mode already noted in past review.
379-463: Unsafe dynamic channel replacement can lose tasks and deadlock producers.
expandQueueAndRetry replaces a bucket's channel while producers may still hold references to the old channel. Producers blocked on the old channel will never complete, and the old channel is never drained by workers after replacement. This critical concurrency bug was already flagged in past review.
579-590: Closing bucket channels risks panic if producers still send.
Close() closes all per-bucket channels while producers may still attempt to send. This will cause a panic. This critical issue was already noted in past review—channels should not be closed; instead, mark queue as closed and signal workers via context.
822-835: Unconditionally overrides WAL checkpoint setting.
Line 829 unconditionally sets baseSyncer.enableWALCheckpoint = true, overriding any caller configuration. This issue was already noted in past review comments.
🧹 Nitpick comments (3)
pkg/dotc1z/sql_helpers.go (2)
314-316: Consider checking SQLite error codes instead of string matching.
String matching error messages is fragile and may produce false positives. If using mattn/go-sqlite3 or a similar driver, prefer checking the error code directly.
Example refactor:
 func isSQLiteBusy(err error) bool {
-	return strings.Contains(err.Error(), "database is locked") || strings.Contains(err.Error(), "SQLITE_BUSY")
+	var sqliteErr sqlite3.Error
+	if errors.As(err, &sqliteErr) {
+		return sqliteErr.Code == sqlite3.ErrBusy || sqliteErr.ExtendedCode == sqlite3.ErrBusySnapshot
+	}
+	return false
 }
349-437: LGTM: Retry logic and error handling are well-implemented.
The function correctly handles rollback errors (using errors.Join), properly manages the checkpoint lock lifecycle across all code paths, and addresses the previous review comment regarding errcheck warnings. The retry logic with context cancellation is sound.
Consider adding debug logging when retries occur to improve observability during production issues:
 if isSQLiteBusy(err) && attempt < maxRetries-1 {
+	ctxzap.Extract(ctx).Debug("retrying chunk insert due to SQLITE_BUSY",
+		zap.Int("attempt", attempt+1),
+		zap.Int("maxRetries", maxRetries),
+	)
 	select {
pkg/sync/parallel_syncer.go (1)
2144-2152: Type assertion in NewParallelSyncerFromSyncer could be more helpful.
Lines 2147-2148 return a generic error when the type assertion fails. Consider providing more context about what types are supported and why the assertion failed.
 func NewParallelSyncerFromSyncer(s Syncer, config *ParallelSyncConfig) (*parallelSyncer, error) {
 	// Try to cast to the concrete syncer type
 	if baseSyncer, ok := s.(*SequentialSyncer); ok {
 		return NewParallelSyncer(baseSyncer, config), nil
 	}
-	return nil, fmt.Errorf("cannot create parallel syncer from syncer type: %T", s)
+	return nil, fmt.Errorf("cannot create parallel syncer from syncer type %T; only *SequentialSyncer is supported", s)
 }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
pkg/dotc1z/c1file.go (10 hunks)
pkg/dotc1z/sql_helpers.go (4 hunks)
pkg/sync/parallel_syncer.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
pkg/dotc1z/sql_helpers.go (1)
pkg/dotc1z/c1file.go (1)
C1File(38-62)
pkg/dotc1z/c1file.go (3)
pkg/dotc1z/manager/local/local.go (1)
WithWALCheckpoint(40-44)
pkg/dotc1z/manager/manager.go (1)
WithWALCheckpoint(40-44)
pkg/dotc1z/manager/s3/s3.go (1)
WithWALCheckpoint(44-48)
pkg/sync/parallel_syncer.go (5)
pkg/sync/syncer.go (2)
Syncer(53-56)
SequentialSyncer(314-341)
pkg/dotc1z/manager/manager.go (1)
New(50-83)
pkg/sync/state.go (1)
Action(130-137)
pb/c1/connector/v2/resource.pb.go (4)
ResourceType(126-141)
ResourceType(154-154)
Resource(2737-2749)
Resource(2762-2762)
pkg/annotations/annotations.go (1)
Annotations(12-12)
🪛 GitHub Check: go-lint
pkg/dotc1z/c1file.go
[failure] 499-499:
Comment should end in a period (godot)
pkg/sync/parallel_syncer.go
[failure] 1241-1241:
func (*parallelSyncer).syncResources is unused (unused)
[failure] 1077-1077:
SA4010: this result of append is never used, except maybe in other appends (staticcheck)
[failure] 37-37:
function min has same name as predeclared identifier (predeclared)
[failure] 1199-1199:
Comment should end in a period (godot)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: go-test (1.25.2, ubuntu-latest)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (16)
pkg/dotc1z/c1file.go (10)
14-15: LGTM! Correct imports added for structured logging (ctxzap and zap), addressing the past review comment about replacing fmt.Printf.
55-61: LGTM! WAL checkpoint fields are well-structured with proper synchronization primitives (RWMutex, channels, sync.Once) and clear documentation.
80-84: LGTM! Standard functional option pattern correctly implemented.
106-107: LGTM! Checkpoint channels properly initialized. Unbuffered channels are appropriate for stop signaling and done notification.
127-132: LGTM! C1Z-level WAL checkpoint option correctly implemented, following the established functional option pattern.
Also applies to: 153-157
178-180: LGTM! Correctly propagates WAL checkpoint flag from C1Z-level to C1F-level option.
205-212: LGTM! Checkpoint cleanup logic is correct and idempotent. The nil check, sync.Once protection for channel closure, and wait on checkpointDone ensure graceful shutdown even if Close() is called multiple times.
263-266: LGTM! Checkpoint initialization correctly gated by both the enabled flag and WAL mode check.
468-483: LGTM! Background checkpoint goroutine follows proper lifecycle patterns with correct cleanup (defer close on checkpointDone) and graceful shutdown via checkpointStop channel. 5-minute interval is reasonable for WAL checkpointing.
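A condensed sketch of that lifecycle, using the channel names and 5-minute interval noted above; the exact signature is an assumption:

// Sketch only: ticker-driven checkpointing with graceful shutdown. Close()
// would close checkpointStop (via sync.Once) and wait on checkpointDone.
func (c *C1File) startWALCheckpointing(ctx context.Context) {
	go func() {
		defer close(c.checkpointDone)
		ticker := time.NewTicker(5 * time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-c.checkpointStop:
				return
			case <-ticker.C:
				c.performWALCheckpoint(ctx)
			}
		}
	}()
}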
485-497: LGTM! Lock helper methods provide clean conditional locking based on the checkpointEnabled flag. Implementation is correct and straightforward.
pkg/dotc1z/sql_helpers.go (2)
229-231: LGTM: Checkpoint lock properly coordinates WAL operations.
The checkpoint lock acquisition correctly prevents WAL checkpoints during query execution and result iteration, avoiding potential consistency issues and WAL file growth under concurrent load.
339-346: LGTM: Clean separation of chunking and retry logic.
The refactoring to delegate per-chunk execution to executeChunkWithRetry improves code organization and testability.
pkg/sync/parallel_syncer.go (4)
170-237: Clean state abstraction for parallel execution.
The StateInterface, LocalStateContext, and ActionDecision types provide a well-designed abstraction that allows parallel workers to maintain independent state while reusing core sync logic. No concurrency concerns since each worker maintains its own LocalStateContext.
617-732: Worker loop and task dispatching logic is sound.
The worker's main loop (lines 617-732) properly handles context cancellation, implements bucket affinity with work-stealing, tracks consecutive failures, and manages rate limiting with bucket-aware backoff. The atomic flags for isProcessing and rateLimited are correctly used.
838-889: Parallel sync orchestration is well-structured.
The Sync() method properly orchestrates parallel execution: initializes sync, creates task queue, starts workers, generates initial tasks, waits for completion, then sequentially handles grant expansion and external resources. The lifecycle management and cleanup are correct.
1855-2006: Grant sync logic correctly handles pagination, etags, and state.
syncGrantsForResourceLogic properly fetches previous sync data, handles etags, processes grants with expansion/external resource detection, stores results, and returns pagination decisions. The state abstraction allows both parallel and sequential callers.
// processTaskImmediately processes a task directly without going through the queue.
func (w *worker) processTaskImmediately(task *task) error {
	l := ctxzap.Extract(w.ctx)
	l.Debug("processing task immediately",
		zap.String("operation", task.Action.Op.String()),
		zap.String("resource_type", task.Action.ResourceTypeID),
		zap.String("resource_id", task.Action.ResourceID))

	switch task.Action.Op {
	case CollectEntitlementsAndGrantsTasksOp:
		// For resource tasks, we need to collect any sub-tasks and process them immediately too
		tasks, err := w.syncer.collectEntitlementsAndGrantsTasks(w.ctx, task.Action)
		if err != nil {
			return err
		}
		// Process any collected tasks immediately
		for _, subTask := range tasks {
			if err := w.processTaskImmediately(subTask); err != nil {
				l.Error("failed to process sub-task immediately", zap.Error(err))
				return err
			}
		}
		return nil
	case SyncEntitlementsOp:
		if task.Action.ResourceID != "" {
			return w.syncer.syncEntitlementsForResource(w.ctx, task.Action)
		} else {
			return w.syncer.syncEntitlementsForResourceType(w.ctx, task.Action)
		}
	case SyncGrantsOp:
		if task.Action.ResourceID != "" {
			return w.syncer.syncGrantsForResource(w.ctx, task.Action)
		} else {
			return w.syncer.syncGrantsForResourceType(w.ctx, task.Action)
		}
	case SyncResourcesOp:
		// For resource tasks, we need to collect any sub-tasks and process them immediately too
		tasks, err := w.syncer.syncResourcesCollectTasks(w.ctx, task.Action)
		if err != nil {
			return err
		}
		// Process any collected tasks immediately
		for _, subTask := range tasks {
			if err := w.processTaskImmediately(subTask); err != nil {
				l.Error("failed to process sub-task immediately", zap.Error(err))
				return err
			}
		}
		return nil
	default:
		return fmt.Errorf("unsupported operation for immediate processing: %s", task.Action.Op.String())
	}
}
Recursive immediate processing risks stack overflow.
processTaskImmediately recursively calls itself for sub-tasks (lines 133, 159). Deep task chains (e.g., deeply nested child resources) could exhaust the stack. Consider iterative processing with a local queue or enforce a depth limit.
func (w *worker) processTaskImmediately(task *task) error {
l := ctxzap.Extract(w.ctx)
+ const maxDepth = 100
+
+ // Use a local stack to avoid stack overflow
+ taskStack := []*task{task}
+
+ for len(taskStack) > 0 {
+ if len(taskStack) > maxDepth {
+ return fmt.Errorf("task depth exceeded %d, possible infinite recursion", maxDepth)
+ }
+
+ currentTask := taskStack[len(taskStack)-1]
+ taskStack = taskStack[:len(taskStack)-1]
+
l.Debug("processing task immediately",
- zap.String("operation", task.Action.Op.String()),
- zap.String("resource_type", task.Action.ResourceTypeID),
- zap.String("resource_id", task.Action.ResourceID))
+ zap.String("operation", currentTask.Action.Op.String()),
+ zap.String("resource_type", currentTask.Action.ResourceTypeID),
+ zap.String("resource_id", currentTask.Action.ResourceID))
- switch task.Action.Op {
+ switch currentTask.Action.Op {
case CollectEntitlementsAndGrantsTasksOp:
- // For resource tasks, we need to collect any sub-tasks and process them immediately too
- tasks, err := w.syncer.collectEntitlementsAndGrantsTasks(w.ctx, task.Action)
+ tasks, err := w.syncer.collectEntitlementsAndGrantsTasks(w.ctx, currentTask.Action)
if err != nil {
return err
}
- // Process any collected tasks immediately
- for _, subTask := range tasks {
- if err := w.processTaskImmediately(subTask); err != nil {
- l.Error("failed to process sub-task immediately", zap.Error(err))
- return err
- }
- }
- return nil
+ // Add sub-tasks to stack for processing
+ taskStack = append(taskStack, tasks...)
// ... handle other cases similarly
+ }
+ }
+ return nil
}Committable suggestion skipped: line range outside the PR's diff.
func (q *taskQueue) GetTask(ctx context.Context) (*task, error) {
	q.mu.Lock() // Use write lock to make the operation atomic
	defer q.mu.Unlock()

	// Debug logging
	l := ctxzap.Extract(ctx)
	l.Debug("GetTask called",
		zap.Int("total_buckets", len(q.bucketQueues)),
		zap.Strings("bucket_names", getMapKeys(q.bucketQueues)))

	if len(q.bucketQueues) == 0 {
		l.Debug("no buckets available")
		return nil, errors.New("no buckets available")
	}

	// First, try to find a bucket with available tasks
	var availableBuckets []string
	for bucketName, queue := range q.bucketQueues {
		queueLen := len(queue)
		l.Debug("checking bucket", zap.String("bucket", bucketName), zap.Int("queue_length", queueLen))
		if queueLen > 0 {
			availableBuckets = append(availableBuckets, bucketName)
		}
	}

	l.Debug("available buckets", zap.Strings("buckets", availableBuckets))

	if len(availableBuckets) == 0 {
		l.Debug("no tasks available in any bucket")
		return nil, errors.New("no tasks available")
	}

	// Try to get a task from each available bucket in round-robin order
	// Use a more robust approach that handles the case where a queue becomes empty
	for _, bucketName := range availableBuckets {
		queue := q.bucketQueues[bucketName]

		// Double-check the queue still has items before trying to read
		if len(queue) == 0 {
			l.Debug("bucket queue became empty", zap.String("bucket", bucketName))
			continue
		}

		select {
		case t := <-queue:
			l.Debug("retrieved task from bucket", zap.String("bucket", bucketName))
			return t, nil
		default:
			l.Debug("bucket queue empty when trying to read", zap.String("bucket", bucketName))
			continue
		}
	}

	l.Debug("failed to get task from any available bucket")
	return nil, errors.New("no tasks available")
}
GetTask holds write lock for entire operation, causing contention.
Lines 483-484 acquire a write lock that's held while iterating buckets and reading from channels. This blocks all concurrent AddTask operations. Consider using a read lock for bucket discovery, then acquiring write lock only when actually removing a task from a channel, or use a lock-free approach.
func (q *taskQueue) GetTask(ctx context.Context) (*task, error) {
- q.mu.Lock() // Use write lock to make the operation atomic
- defer q.mu.Unlock()
+ q.mu.RLock()
// Debug logging
l := ctxzap.Extract(ctx)
@@ -494,6 +494,7 @@
return nil, errors.New("no buckets available")
}
+ q.mu.RUnlock()
// First, try to find a bucket with available tasks
var availableBuckets []string
+ q.mu.RLock()
for bucketName, queue := range q.bucketQueues {
@@ -506,6 +507,7 @@
}
l.Debug("available buckets", zap.Strings("buckets", availableBuckets))
+ q.mu.RUnlock()
if len(availableBuckets) == 0 {Note: This is a simplified sketch. A production fix should carefully handle the race between checking len(queue) and reading from it.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In pkg/sync/parallel_syncer.go around lines 482-537, the GetTask function
currently holds the write lock for the whole discovery+read operation which
blocks AddTask; change it to use a read lock (RLock) to discover non-empty
buckets and capture references to their channels and names, then release the
read lock before attempting non-blocking reads from those channels; only acquire
the write lock briefly if you need to modify q.bucketQueues or remove an empty
bucket (do not hold it while performing channel receives). Also remove
len(queue) checks that rely on the lock by using non-blocking select receives to
avoid races, retain debug logging, and ensure any mutation of the map is done
under the write lock only for the minimal window.
Actionable comments posted: 0
♻️ Duplicate comments (14)
pkg/dotc1z/c1file.go (3)
271-348: Stats method still lacks checkpoint read locks (duplicate concern).The past review comment flagged that Stats performs database reads without acquiring checkpoint read locks, making the checkpoint coordination ineffective. The current code still shows no calls to
acquireCheckpointLock()/releaseCheckpointLock()around the threeCountContext()operations (lines 319-322, 330-332, 338-340).Either the fix was reverted, or this code predates the addressed commit. Verify that checkpoint locks are properly acquired before these DB operations.
399-456: GrantStats method still lacks checkpoint read locks (duplicate concern).Similar to Stats, this method performs database reads (lines 444-447) without acquiring checkpoint read locks. The past review comment flagged this issue, but the current code does not show the fix. Verify that
acquireCheckpointLock()/releaseCheckpointLock()are properly used.
499-499: Fix comment punctuation to satisfy linter (duplicate concern).
The function comment on line 499 does not end with a period, which violates the godot linter rule. A past review comment flagged this same issue.
Apply this diff:
-// performWALCheckpoint performs a WAL checkpoint using SQLITE_CHECKPOINT_RESTART or SQLITE_CHECKPOINT_TRUNCATE
+// performWALCheckpoint performs a WAL checkpoint using SQLITE_CHECKPOINT_RESTART or SQLITE_CHECKPOINT_TRUNCATE.
pkg/sync/parallel_syncer.go (11)
85-87: Panic can crash the entire sync.Panicking here (line 86) will terminate the worker and potentially the entire sync process. Consider logging the error with a critical severity and either: (1) retrying with exponential backoff, (2) storing failed tasks for manual intervention, or (3) returning an error to gracefully shut down the worker.
116-168: Recursive processing risks stack overflow on deep task chains.Lines 132–136 and 158–162 recursively call
processTaskImmediatelyfor sub-tasks. Deeply nested child resources or long task chains could exhaust the call stack. Consider iterative processing with a local task queue or enforce a depth limit.
270-276: Priority field is unused.The
Priorityfield (line 274) is set throughout the code butGetTask()(lines 481–537) uses round-robin bucket selection and FIFO ordering, completely ignoring priority. Either remove this field or implement priority-based scheduling.
379-463: Unsafe dynamic channel resize can deadlock or lose tasks.Lines 400–463 attempt to "expand" a bucket channel by creating a new one and copying tasks. Producers that obtained the old channel reference (line 382) before the expansion can be left blocked forever (no reader drains the old channel) or may panic. Avoid dynamic resizing of buffered channels. Instead, return
errTaskQueueFull and rely on the caller's retry/backoff logic.
481-537: GetTask holds write lock for entire operation, blocking AddTask.Lines 483–484 acquire a write lock held for the entire bucket discovery and channel-read operation, blocking all concurrent
AddTaskcalls. Use a read lock (RLock) to discover non-empty buckets, release it, then use non-blocking select receives. Only acquire the write lock briefly if you need to modify the bucket map.
578-590: Closing bucket channels risks panic if producers still send.Lines 584–587 close per-bucket channels. If any producer still holds a channel reference obtained earlier and sends to it, the program will panic. Instead, set
q.closed = true under the lock and signal workers via context cancellation or a shared done channel. Do not close individual bucket channels.
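A minimal sketch of that alternative; the closed flag and cancel function are assumptions about taskQueue rather than fields that exist today:

// Sketch only: mark the queue closed and cancel the workers' context instead
// of closing the per-bucket channels, so late producers cannot panic.
func (q *taskQueue) Close() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	if q.cancel != nil {
		q.cancel() // workers observe ctx.Done() and drain/exit
	}
}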
778-789: Incorrect rate limit detection using sql.ErrConnDone.Line 788 treats
sql.ErrConnDone as a rate limit indicator, but this error means the database connection is closed, not that an API is rate-limited. Remove this check and rely solely on codes.ResourceExhausted for gRPC rate limiting.
 func (w *worker) isRateLimitError(err error) bool {
-	// Check for rate limit annotations in the error
 	if err == nil {
 		return false
 	}
-	// This is a simplified check - in practice, you'd want to check the actual
-	// error type returned by the connector for rate limiting
-	return status.Code(err) == codes.ResourceExhausted ||
-		errors.Is(err, sql.ErrConnDone) // Placeholder for rate limit detection
+	return status.Code(err) == codes.ResourceExhausted
 }
827-829: Unconditionally overrides WAL checkpoint setting.Line 828 unconditionally sets
baseSyncer.enableWALCheckpoint = true, overriding any caller configuration. This issue was already noted in past review comments.
-	// Enable WAL checkpointing for parallel sync to prevent checkpoint failures under high concurrency
-	baseSyncer.enableWALCheckpoint = true
+	// Enable WAL checkpointing if not already set by caller
+	if !baseSyncer.enableWALCheckpoint {
+		baseSyncer.enableWALCheckpoint = true
+	}
1071-1080: Unused activeBuckets slice.Lines 1073–1078 build an
activeBucketsslice but never use it (SA4010). Either log the active buckets withl.Debug("active buckets", zap.Strings("active_buckets", activeBuckets))or remove this dead code.
1199-1199: Missing period in comment.Line 1199 comment should end with a period per godot linter:
// finalizeSync performs final sync cleanup.
-// finalizeSync performs final sync cleanup
+// finalizeSync performs final sync cleanup.
 func (ps *parallelSyncer) finalizeSync(ctx context.Context) error {
71-71: Unreachable return statement.Line 71 is unreachable because the loop at lines 46–69 always returns within its body (lines 49, 58, or inside the select at lines 64–67). Remove this line or replace it with a panic/error to indicate an unexpected state.
-	return nil // This should never be reached
+	// Unreachable: loop above always returns
🧹 Nitpick comments (4)
pkg/dotc1z/c1file.go (1)
458-466: Make WAL mode detection more robust with case-insensitive comparison and fallback query.
The current implementation uses exact string matching for pragma name and value, which will fail if the journal mode is specified as "wal" (lowercase) or set through other means (connection string, environment variable, prior session). A past review suggested adding case-insensitive comparison and a fallback query to
PRAGMA journal_mode.Consider implementing the suggested approach for more robust detection.
pkg/sync/parallel_syncer.go (1)
1248-1255: Panic recovery logs but doesn't return error.
Lines 1248–1255 use defer with recover() to catch panics, but only log the panic without returning an error to the caller. This could mask serious issues. Either return an error from the deferred function (requires named return) or remove the recovery to let panics propagate naturally for debugging.
pkg/sync/syncer.go (2)
72-72: Remove outdated TODO comment.
Line 72's TODO comment states "use a mutex or a syncmap for when this code becomes parallel" but the mutex has now been implemented (lines 66-67). This comment should be removed or updated.
-// TODO: use a mutex or a syncmap for when this code becomes parallel
 func NewProgressCounts() *ProgressCounts {
90-177: Consider helper methods to reduce conditional locking duplication.
Lines 92–98, 108–118, 128–134, 148–154, 170–176, etc. all duplicate the conditional locking pattern. While functionally correct, consider extracting helpers like getResourceCount(resourceType string) int and updateLastLogTime(map, resourceType, time) to reduce duplication and improve maintainability.
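For example, one such helper might look like the sketch below; the ProgressCounts field names (mu, sequentialMode, Resources) are assumptions based on this review:

// Sketch only: centralize the "lock only when not sequential" pattern so each
// accessor does not repeat the conditional locking boilerplate.
func (p *ProgressCounts) getResourceCount(resourceType string) int {
	if !p.sequentialMode {
		p.mu.Lock()
		defer p.mu.Unlock()
	}
	return p.Resources[resourceType]
}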
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
pkg/dotc1z/c1file.go(10 hunks)pkg/sync/parallel_syncer.go(1 hunks)pkg/sync/syncer.go(53 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
pkg/dotc1z/c1file.go (3)
pkg/dotc1z/manager/local/local.go (1)
WithWALCheckpoint(40-44)pkg/dotc1z/manager/manager.go (1)
WithWALCheckpoint(40-44)pkg/dotc1z/manager/s3/s3.go (1)
WithWALCheckpoint(44-48)
pkg/sync/parallel_syncer.go (4)
pkg/sync/syncer.go (2)
Syncer(53-56)SequentialSyncer(314-341)pkg/dotc1z/manager/manager.go (1)
New(50-83)pkg/sync/state.go (1)
Action(130-137)pb/c1/connector/v2/resource.pb.go (10)
ResourceId(2652-2659)ResourceId(2672-2672)ResourceType(126-141)ResourceType(154-154)ResourceTypesServiceListResourceTypesRequest(275-284)ResourceTypesServiceListResourceTypesRequest(297-297)ResourcesServiceListResourcesRequest(2925-2935)ResourcesServiceListResourcesRequest(2948-2948)Resource(2737-2749)Resource(2762-2762)
pkg/sync/syncer.go (3)
pkg/sync/expand/graph.go (1)
EntitlementGraphAction(14-20)pkg/sync/state.go (1)
Action(130-137)pkg/dotc1z/manager/manager.go (4)
ManagerOption(26-26)WithTmpDir(28-32)WithWALCheckpoint(40-44)New(50-83)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: go-test (1.25.2, ubuntu-latest)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (7)
pkg/dotc1z/c1file.go (1)
500-517: Checkpoint implementation is solid. The WAL checkpoint logic correctly:
- Acquires write lock to pause all database operations
- Uses appropriate timeout (30s)
- Implements sensible fallback strategy (TRUNCATE → RESTART)
- Logs errors without failing the operation
- Properly releases the lock via defer
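Putting those points together, the checkpoint path is roughly the following sketch; handle and field names such as rawDb and checkpointMu are assumptions, not the exact implementation:

// Sketch only: exclusive lock, bounded timeout, TRUNCATE with RESTART
// fallback, and errors logged rather than propagated.
func (c *C1File) performWALCheckpoint(ctx context.Context) {
	l := ctxzap.Extract(ctx)
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	c.checkpointMu.Lock()
	defer c.checkpointMu.Unlock()

	if _, err := c.rawDb.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
		l.Warn("TRUNCATE checkpoint failed, retrying with RESTART", zap.Error(err))
		if _, err := c.rawDb.ExecContext(ctx, "PRAGMA wal_checkpoint(RESTART)"); err != nil {
			l.Warn("RESTART checkpoint failed", zap.Error(err))
		}
	}
}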
pkg/sync/syncer.go (6)
278-311: Thread-safe mutation methods are well-designed.
The new mutation methods (lines 280–311) always acquire the mutex for safety, while the read/log methods conditionally lock based on sequentialMode. This is a sound design: mutations are always safe, and reads optimize for the sequential case.
313-341: SequentialSyncer export and WAL checkpoint field enable parallel wrapping.
Renaming to SequentialSyncer (line 314) and adding enableWALCheckpoint (line 340) are appropriate changes that enable the parallel syncer to wrap and configure the sequential implementation. The public API surface is well-defined.
1263-1269: Conditional progress update is correct.
Lines 1263–1269 correctly use conditional locking based on sequentialMode when updating EntitlementsProgress, maintaining consistency with the established pattern.
2817-2849: WAL checkpoint propagation to manager is correct.
Lines 2826–2830 correctly check enableWALCheckpoint and conditionally append the WithWALCheckpoint option to the manager, enabling the parallel syncer to control this behavior.
2886-3000: SyncOpt API update is consistent and correct.
All option functions (lines 2886–3000) have been correctly updated to target *SequentialSyncer instead of the internal *syncer type, maintaining API consistency after the rename.
3002-3029: NewSyncer correctly returns exported SequentialSyncer.
Lines 3003–3004 correctly update the constructor to return *SequentialSyncer instead of the internal type, completing the public API transition.
Force-pushed from 2446b88 to 1bccadf (Compare)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/dotc1z/c1file.go (1)
272-349: Add checkpoint read locks to Stats() and GrantStats() database operations.
Verification confirms the checkpoint locks are missing. The pattern is established in sql_helpers.go (lines 230-231 and 361-365), wrapping database operations with c.acquireCheckpointLock() and defer c.releaseCheckpointLock(). Both Stats() and GrantStats() execute hot-path CountContext queries without these locks:
- Stats() (lines 320-323, 331-333, 339-341): three CountContext calls need lock protection
- GrantStats() (lines 445-448): one CountContext call needs lock protection
Add the checkpoint lock pattern around each database count operation to coordinate with WAL checkpointing, matching the established pattern in sql_helpers.go.
♻️ Duplicate comments (6)
pkg/dotc1z/sql_helpers.go (1)
348-437: Unreachable code after retry loop; past review for rollback handling properly addressed.
The rollback error handling from the past review has been correctly implemented using errors.Join (lines 382-385, 393-397, 403-406). However, lines 436-437 are unreachable: every iteration of the retry loop either returns (on success, non-retryable error, or final retry exhaustion) or continues (on retryable SQLITE_BUSY with remaining retries). The loop cannot complete naturally.
Remove the unreachable error return:
 		c.releaseCheckpointLock()
 		return nil
 	}
-
-	return fmt.Errorf("failed to execute chunk after %d retries", maxRetries)
 }
pkg/sync/state.go (1)
62-64: Enum/string mappings LGTM; add JSON round‑trip tests for the new op.
Mappings for CollectEntitlementsAndGrantsTasksOp are correct. Please extend tests to cover marshal/unmarshal round‑trip for this op. If ListResourcesForEntitlementsOp intentionally lacks a mapping (pre‑existing), add a brief comment to document intent.
Also applies to: 107-109, 126-127
pkg/sync/parallel_syncer.go (4)
71-71: Unreachable return in addTaskWithRetry.The final
return nil is unreachable. Return an explicit error for unexpected fallthrough.
-	}
-	}
-
-	return nil // This should never be reached
+	}
+	}
+	// Unreachable: defensive error to aid diagnostics
+	return fmt.Errorf("unexpected: addTaskWithRetry exited loop without returning")
379-397: Do not “resize” bucket channels; risks deadlock/lost tasks.Replacing a chan while producers still hold the old reference can wedge or lose tasks. Return queue‑full and rely on caller backoff, or redesign with a mutex+cond queue.
 func (q *taskQueue) AddTaskWithTimeout(ctx context.Context, t *task, timeout time.Duration) error {
@@
-	case <-time.After(timeout):
-		// Queue is full, try to expand it
-		return q.expandQueueAndRetry(bucket, t, timeout)
+	case <-time.After(timeout):
+		return errTaskQueueFull
@@
 }
-// expandQueueAndRetry attempts to expand the queue and retry adding the task.
-func (q *taskQueue) expandQueueAndRetry(bucket string, t *task, timeout time.Duration) error {
-	q.mu.Lock()
-	defer q.mu.Unlock()
-	...
-	return errTaskQueueFull
-}
+// Removed: dynamic queue expansion (unsafe for concurrent producers).
Also applies to: 399-463
483-537: GetTask holds write lock while probing/receiving; high contention and potential stalls.Use RLock to snapshot buckets, release lock before non‑blocking receives; take write lock only when mutating the map. Avoid reading from chans under the write lock.
117-169: Recursive immediate processing risks deep recursion/stack overflow.
processTaskImmediately recursively processes sub‑tasks; deep trees can overflow. Switch to an explicit stack/queue with a max‑depth guard.
-func (w *worker) processTaskImmediately(task *task) error {
-	...
-	switch task.Action.Op {
+func (w *worker) processTaskImmediately(root *task) error {
+	const maxDepth = 100
+	stack := []*task{root}
+	for len(stack) > 0 {
+		if len(stack) > maxDepth {
+			return fmt.Errorf("immediate processing depth exceeded %d", maxDepth)
+		}
+		task := stack[len(stack)-1]
+		stack = stack[:len(stack)-1]
+		switch task.Action.Op {
 		case CollectEntitlementsAndGrantsTasksOp:
 			tasks, err := w.syncer.collectEntitlementsAndGrantsTasks(w.ctx, task.Action)
 			if err != nil {
 				return err
 			}
-			for _, subTask := range tasks { if err := w.processTaskImmediately(subTask); err != nil { ... } }
-			return nil
+			stack = append(stack, tasks...)
+			continue
 		case SyncResourcesOp:
 			tasks, err := w.syncer.syncResourcesCollectTasks(w.ctx, task.Action)
 			if err != nil {
 				return err
 			}
-			for _, subTask := range tasks { if err := w.processTaskImmediately(subTask); err != nil { ... } }
-			return nil
+			stack = append(stack, tasks...)
+			continue
 		...
-	}
+		}
+	}
+	return nil
 }
🧹 Nitpick comments (4)
pkg/tasks/c1api/manager.go (1)
57-58: Plumbing looks correct; consider Options to avoid constructor bloat.
- Field/threading LGTM; `parallelSync` is passed only to the FullSync handler as intended.
- The ctor now has 10+ positional params. Prefer functional options for maintainability and binary‑compat upgrades.
Also applies to: 260-261, 310-320, 334-335
pkg/sync/parallel_syncer.go (1)
548-564: Avoid receiving from bucket under write lock in GetTaskFromBucket.
Same locking concern as `GetTask`. Snapshot the channel under RLock, unlock, then perform the non‑blocking receive.
pkg/tasks/local/syncer.go (1)
68-72: Local parallel wrapper wiring LGTM; avoid hard‑coding worker count.
The builder use is correct now. Consider exposing worker count as an option rather than a constant 10 to let callers tune concurrency.
Also applies to: 101-121, 116-121
pkg/connectorrunner/runner.go (1)
559-564: Option to enable parallel sync: LGTM.
Simple flag is fine. You may also consider a `WithParallelWorkers(n)` option to surface worker count tuning from CLI.
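If that knob is wanted, a sketch of what it could look like; the `Option` signature and the `parallelWorkers` field are assumptions here, mirroring how `WithParallelSyncEnabled` is described rather than the actual runner code:

```go
// WithParallelWorkers is a hypothetical companion option to
// WithParallelSyncEnabled that would let the CLI set the worker count.
func WithParallelWorkers(n int) Option {
	return func(ctx context.Context, cfg *runnerConfig) error {
		cfg.parallelSync = true
		cfg.parallelWorkers = n // assumed new field on runnerConfig
		return nil
	}
}
```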
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
pb/c1/connector/v2/resource.pb.gois excluded by!**/*.pb.gopb/c1/connector/v2/resource_protoopaque.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (16)
pb/c1/connector/v2/resource.pb.validate.go(1 hunks)pkg/cli/commands.go(1 hunks)pkg/connectorrunner/runner.go(5 hunks)pkg/dotc1z/c1file.go(10 hunks)pkg/dotc1z/manager/local/local.go(3 hunks)pkg/dotc1z/manager/manager.go(4 hunks)pkg/dotc1z/manager/s3/s3.go(3 hunks)pkg/dotc1z/sql_helpers.go(4 hunks)pkg/field/defaults.go(2 hunks)pkg/sync/parallel_syncer.go(1 hunks)pkg/sync/state.go(3 hunks)pkg/sync/syncer.go(53 hunks)pkg/tasks/c1api/full_sync.go(4 hunks)pkg/tasks/c1api/manager.go(4 hunks)pkg/tasks/local/syncer.go(4 hunks)proto/c1/connector/v2/resource.proto(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (8)
- pkg/tasks/c1api/full_sync.go
- pkg/cli/commands.go
- pb/c1/connector/v2/resource.pb.validate.go
- pkg/dotc1z/manager/s3/s3.go
- pkg/dotc1z/manager/local/local.go
- pkg/field/defaults.go
- proto/c1/connector/v2/resource.proto
- pkg/dotc1z/manager/manager.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-21T23:58:13.698Z
Learnt from: jirwin
PR: ConductorOne/baton-sdk#524
File: pkg/sync/parallel_syncer.go:827-829
Timestamp: 2025-10-21T23:58:13.698Z
Learning: In the parallel syncer implementation (pkg/sync/parallel_syncer.go), WAL checkpointing must always be enabled for parallel sync operations. The `NewParallelSyncer` function unconditionally sets `baseSyncer.enableWALCheckpoint = true` because high concurrency in parallel sync requires WAL checkpointing to prevent checkpoint failures. This is a design requirement, not a configurable option.
Applied to files:
pkg/sync/parallel_syncer.go, pkg/sync/syncer.go
🧬 Code graph analysis (6)
pkg/tasks/local/syncer.go (3)
pkg/connectorrunner/runner.go (2)
WithParallelSyncEnabled(559-564)Option(254-254)pkg/sync/syncer.go (2)
Syncer(53-56)NewSyncer(3063-3089)pkg/sync/parallel_syncer.go (2)
DefaultParallelSyncConfig(250-255)NewParallelSyncer(833-845)
pkg/connectorrunner/runner.go (2)
pkg/tasks/local/syncer.go (2)
WithParallelSyncEnabled(68-72)Option(30-30)pkg/tasks/c1api/manager.go (1)
NewC1TaskManager(309-336)
pkg/dotc1z/sql_helpers.go (1)
pkg/dotc1z/c1file.go (1)
C1File(39-63)
pkg/dotc1z/c1file.go (3)
pkg/dotc1z/manager/local/local.go (1)
WithWALCheckpoint(40-44)pkg/dotc1z/manager/manager.go (1)
WithWALCheckpoint(40-44)pkg/dotc1z/manager/s3/s3.go (1)
WithWALCheckpoint(44-48)
pkg/sync/parallel_syncer.go (5)
pkg/sync/syncer.go (2)
Syncer(53-56)SequentialSyncer(314-342)pkg/dotc1z/manager/manager.go (1)
New(50-83)pkg/sync/state.go (7)
Action(130-137)CollectEntitlementsAndGrantsTasksOp(126-126)SyncEntitlementsOp(119-119)SyncGrantsOp(121-121)SyncResourcesOp(118-118)SyncGrantExpansionOp(124-124)SyncExternalResourcesOp(122-122)pb/c1/connector/v2/resource.pb.go (4)
ResourceType(126-141)ResourceType(154-154)Resource(2737-2749)Resource(2762-2762)pkg/annotations/annotations.go (1)
Annotations(12-12)
pkg/sync/syncer.go (1)
pkg/dotc1z/manager/manager.go (4)
ManagerOption(26-26)WithTmpDir(28-32)WithWALCheckpoint(40-44)New(50-83)
🪛 GitHub Check: go-lint
pkg/dotc1z/c1file.go
[failure] 468-468:
(*database/sql.DB).QueryRow must not be called. use (*database/sql.DB).QueryRowContext (noctx)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (14)
pkg/dotc1z/sql_helpers.go (2)
314-316: LGTM: Simple but effective SQLite busy detection.
The string-matching approach is appropriate for SQLite error detection in Go. While slightly brittle, it covers the common error messages for database lock contention.
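For reference, a small self-contained sketch of the string-matching idea; the helper name and exact substrings are assumptions based on this discussion, not necessarily the PR's code:

```go
package dotc1z

import "strings"

// isSQLiteBusyErr reports whether err looks like SQLite lock contention.
func isSQLiteBusyErr(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "SQLITE_BUSY") || strings.Contains(msg, "database is locked")
}
```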
229-231: LGTM: Checkpoint lock properly coordinates read operations.
The checkpoint lock is correctly acquired before query execution and released via defer, ensuring the entire query and row iteration are protected from concurrent WAL checkpoints.
pkg/dotc1z/c1file.go (8)
81-85: LGTM: Clean option function for WAL checkpoint control.
154-158: LGTM: Option correctly propagates through C1Z wrapper.
179-181: LGTM: Proper propagation of WAL checkpoint option to C1File.
206-213: LGTM: Checkpoint lifecycle properly coordinated with sync.Once and channel signaling.
The shutdown sequence correctly stops the ticker, closes the stop channel exactly once, and waits for goroutine termination.
264-267: LGTM: Checkpoint initialization properly gated.
Starting checkpointing only when both enabled and in WAL mode is the correct guard.
475-490: LGTM: Background checkpoint loop is cleanly structured.
The goroutine properly closes `checkpointDone` on exit, and the select handles both periodic ticks and shutdown signaling.
492-504: LGTM: Read lock helpers correctly guard DB operations.
These helpers allow normal DB operations (readers) to proceed concurrently but block when `performWALCheckpoint` acquires the write lock.
506-524: LGTM: Checkpoint logic is sound with proper fallback strategy.
The write lock correctly pauses all DB activity during checkpointing. The TRUNCATE → RESTART fallback and structured error logging are appropriate.
pkg/sync/parallel_syncer.go (1)
838-841: WAL checkpointing always-on for parallel: acknowledged.
Unconditional `enableWALCheckpoint = true` matches the stated design requirement for parallel sync. Consider a clarifying comment that this is mandatory, not configurable.
pkg/sync/syncer.go (2)
66-68: Thread‑safe ProgressCounts: good separation; ensure mode set before concurrency.
Mutex‑guarded helpers and the `sequentialMode` gate look solid. Confirm `SetSequentialMode(false)` is called before starting workers (it is in `parallelSyncer.initializeSync`). No further action.
Also applies to: 81-82, 280-312
2879-2886: WAL checkpoint plumbing LGTM.
`loadStore` honors `enableWALCheckpoint` via manager options. Matches the parallel wrapper behavior.
Also applies to: 2897-2903
pkg/connectorrunner/runner.go (1)
806-818: End‑to‑end flag propagation verified.
`parallelSync` is threaded to both local and C1 task managers. Looks consistent.
Also applies to: 793-795
```go
// StateInterface defines the minimal interface needed by helper methods.
// This allows helper methods to work with either the sequential syncer's state machine
// or the parallel syncer's local state context.
type StateInterface interface {
	PageToken(ctx context.Context) string
	NextPage(ctx context.Context, pageToken string) error
	SetNeedsExpansion()
	SetHasExternalResourcesGrants()
	ShouldFetchRelatedResources() bool
}
```
🧩 Analysis chain
LocalStateContext cannot ever set ShouldFetchRelatedResources; parity gap vs sequential.
StateInterface lacks SetShouldFetchRelatedResources(), and LocalStateContext’s shouldFetchRelated is never set. Parallel paths will never fetch related resources even when sequential would (e.g., targeted syncs).
Suggested fix: add a setter to StateInterface and LocalStateContext, and thread the flag from the initiating context (e.g., when targeted sync is enabled).
```diff
 type StateInterface interface {
 	PageToken(ctx context.Context) string
 	NextPage(ctx context.Context, pageToken string) error
 	SetNeedsExpansion()
 	SetHasExternalResourcesGrants()
 	ShouldFetchRelatedResources() bool
+	SetShouldFetchRelatedResources()
 }
 @@
 type LocalStateContext struct {
 	resourceID           *v2.ResourceId
 	pageToken            string
 	needsExpansion       bool
 	hasExternalResources bool
 	shouldFetchRelated   bool
 }
 @@
 func (lsc *LocalStateContext) ShouldFetchRelatedResources() bool {
 	return lsc.shouldFetchRelated
 }
+
+func (lsc *LocalStateContext) SetShouldFetchRelatedResources() {
+	lsc.shouldFetchRelated = true
+}
```
Please confirm intended behavior for partial/targeted parallel syncs.
Also applies to: 193-201
I'll verify this review comment by examining the codebase to confirm whether the claimed parity gap actually exists.
---
🏁 Script executed:
```shell
#!/bin/bash
# Find StateInterface definition and examine its methods
rg -n "type StateInterface interface" -A 10 pkg/sync/parallel_syncer.go
```
Length of output: 420
🏁 Script executed:
```shell
#!/bin/bash
# Find LocalStateContext definition
rg -n "type LocalStateContext struct" -A 10 pkg/sync/parallel_syncer.go
```
Length of output: 401
🏁 Script executed:
```shell
#!/bin/bash
# Search for all references to shouldFetchRelated in the parallel syncer
rg -n "shouldFetchRelated" pkg/sync/parallel_syncer.go -B 2 -A 2
```
Length of output: 1025
🏁 Script executed:
```shell
#!/bin/bash
# Search for SetShouldFetchRelatedResources method calls or definitions
rg -n "SetShouldFetchRelatedResources" pkg/sync/
```
Length of output: 196
🏁 Script executed:
```shell
#!/bin/bash
# Find sequential syncer and check how it handles ShouldFetchRelatedResources
rg -n "ShouldFetchRelatedResources" pkg/sync/ -B 3 -A 3
```
Length of output: 4752
I need to verify whether the parallel syncer has targeted sync support and how LocalStateContext is instantiated:
---
🏁 Script executed:
```shell
#!/bin/bash
# Search for targeted sync references in parallel syncer
rg -n "targeted\|partial" pkg/sync/parallel_syncer.go -i
```
Length of output: 0
🏁 Script executed:
```shell
#!/bin/bash
# Find where LocalStateContext is instantiated/created in parallel syncer
rg -n "NewLocalStateContext\|&LocalStateContext" pkg/sync/parallel_syncer.go -B 3 -A 3
```
Length of output: 0
🏁 Script executed:
```shell
#!/bin/bash
# Check the main parallel sync entry point for any state initialization
rg -n "func.*Sync.*Parallel\|func.*SyncResource" pkg/sync/parallel_syncer.go | head -20
```
Length of output: 0
🏁 Script executed:
```shell
#!/bin/bash
# Look for where LocalStateContext might receive shouldFetchRelated value
rg -n "func.*LocalState" pkg/sync/parallel_syncer.go -A 5
```
Length of output: 1245
I need to determine if the parallel syncer is even invoked during targeted/partial syncs:
---
🏁 Script executed:
```shell
#!/bin/bash
# Search for where parallel syncer helper functions are called
rg -n "ProcessGrantsHelper\|processGrantsHelper" pkg/sync/ -B 5 -A 2
```
Length of output: 0
🏁 Script executed:
```shell
#!/bin/bash
# Check sequential syncer to see if it conditionally uses parallel syncer based on sync type
rg -n "parallel\|Parallel" pkg/sync/syncer.go -i -B 2 -A 2 | head -40
```
Length of output: 0
🏁 Script executed:
```shell
#!/bin/bash
# Look for sync decision logic: when is parallel syncer used vs sequential
rg -n "func.*Sync\|shouldUseParallel\|useParallel" pkg/sync/syncer.go -i | head -20
```
Length of output: 0
🏁 Script executed:
```shell
#!/bin/bash
# Check if there are any comments or docs about parallel syncer limitations
rg -n "LocalStateContext\|parallel.*target\|partial.*sync" pkg/sync/parallel_syncer.go -B 2 -A 2 -i
```
Length of output: 0
---
🏁 Script executed:
```shell
#!/bin/bash
# Find all functions/methods that accept StateInterface parameter
rg -n "StateInterface" pkg/sync/parallel_syncer.go
```
Length of output: 509
🏁 Script executed:
```shell
#!/bin/bash
# List all exported functions in parallel_syncer (start with capital letter)
rg -n "^func \([A-Z]|^func \w+\(" pkg/sync/parallel_syncer.go | head -30
```
Length of output: 673
🏁 Script executed:
```shell
#!/bin/bash
# Look for helper method that uses StateInterface
ast-grep --pattern 'func $FNAME($$$state StateInterface$$$) {
  $$$
}'
```
Length of output: 0
🏁 Script executed:
```shell
#!/bin/bash
# Search what functions call ShouldFetchRelatedResources in parallel_syncer
rg -n "ShouldFetchRelatedResources\|shouldFetchRelated" pkg/sync/parallel_syncer.go -B 5 -A 5
```
Length of output: 0
SetShouldFetchRelatedResources() missing in parallel syncer—LocalStateContext cannot set the flag.
The review comment is correct. I've verified:
- `StateInterface` (lines 174–180) lacks `SetShouldFetchRelatedResources()`, while the sequential syncer's `State` interface (in state.go) includes it.
- `LocalStateContext` (lines 195–210) has the `shouldFetchRelated` field but only a getter; no setter method exists. The field is always initialized to `false`.
- Parity gap confirmed: the sequential syncer calls `SetShouldFetchRelatedResources()` at line 599 (syncer.go), enabling related resource fetching in targeted sync flows. The parallel syncer helper methods `syncGrantsForResourceLogic` and `syncEntitlementsForResourceLogic` read this flag (line 1771) but have no mechanism to set it, so it remains permanently `false`.
- The flag is only read and propagated through the result; it is never written in the parallel syncer code path.
This prevents the parallel syncer from handling targeted/partial syncs that should fetch related resources. The suggested fix (adding SetShouldFetchRelatedResources() to both StateInterface and LocalStateContext) is appropriate and aligns with the existing pattern used for SetNeedsExpansion() and SetHasExternalResourcesGrants().
🤖 Prompt for AI Agents
In pkg/sync/parallel_syncer.go around lines 171–181 and LocalStateContext
definition around lines 195–210, the StateInterface is missing
SetShouldFetchRelatedResources() and LocalStateContext lacks the setter for
shouldFetchRelated; add SetShouldFetchRelatedResources() to the StateInterface
and implement a corresponding method on LocalStateContext that sets the
shouldFetchRelated boolean (mirror the pattern used by SetNeedsExpansion and
SetHasExternalResourcesGrants), ensure the method has the same signature as the
sequential syncer's State (no args, no return), and run go vet/build to fix any
missing interface implementations elsewhere.
Actionable comments posted: 0
♻️ Duplicate comments (1)
pkg/dotc1z/c1file.go (1)
274-348: Stats() and GrantStats() still lack checkpoint read locks despite past review.
As comprehensively identified in the previous review on outdated commits, both `Stats()` and `GrantStats()` perform database `CountContext()` operations without acquiring checkpoint read locks, defeating the WAL checkpoint coordination mechanism.
Required fixes (a sketch of the locking pattern follows below):
- Stats(): wrap the three `CountContext()` calls (lines 320-323, 331-333, 339-341) with `acquireCheckpointLock()`/`releaseCheckpointLock()`
- GrantStats(): wrap the `CountContext()` call (lines 445-448) with `acquireCheckpointLock()`/`releaseCheckpointLock()`
This is marked as a duplicate because the issue was thoroughly documented in a previous review comment (marked as addressed in commit e68e083), but the checkpoint locks are not present in the current code.
Also applies to: 402-456
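The fix is mechanical; a sketch of the pattern inside pkg/dotc1z, assuming the no-argument `acquireCheckpointLock`/`releaseCheckpointLock` helpers described elsewhere in this review and goqu's `CountContext`; the wrapper name is illustrative only:

```go
// countWithCheckpointLock runs a count query while holding the checkpoint
// read lock, so a WAL checkpoint cannot truncate the log mid-query.
func (c *C1File) countWithCheckpointLock(ctx context.Context, ds *goqu.SelectDataset) (int64, error) {
	c.acquireCheckpointLock()
	defer c.releaseCheckpointLock()
	return ds.CountContext(ctx)
}
```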
🧹 Nitpick comments (1)
pkg/dotc1z/c1file.go (1)
459-473: Consider case-insensitive pragma name comparison for consistency.
The pragma value comparison uses `strings.EqualFold()` (case-insensitive), but the pragma name check on line 462 is case-sensitive. While this works correctly (the fallback query catches any case variations), making the pragma name check case-insensitive would be more consistent and avoid unnecessary DB queries when users provide different casing.
```diff
 func (c *C1File) isWALMode(ctx context.Context) bool {
 	for _, pragma := range c.pragmas {
-		if pragma.name == "journal_mode" && strings.EqualFold(pragma.value, "wal") {
+		if strings.EqualFold(pragma.name, "journal_mode") && strings.EqualFold(pragma.value, "wal") {
 			return true
 		}
 	}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
pkg/dotc1z/c1file.go(10 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
pkg/dotc1z/c1file.go (3)
pkg/dotc1z/manager/manager.go (1)
WithWALCheckpoint(40-44)pkg/dotc1z/manager/local/local.go (1)
WithWALCheckpoint(40-44)pkg/dotc1z/manager/s3/s3.go (1)
WithWALCheckpoint(44-48)
🔇 Additional comments (7)
pkg/dotc1z/c1file.go (7)
10-10: LGTM: Import additions support new WAL checkpointing functionality.
The `strings` import is used for case-insensitive comparison in `isWALMode()`, and the `ctxzap`/`zap` imports enable structured logging in `performWALCheckpoint()`.
Also applies to: 15-16
56-62: LGTM: WAL checkpointing fields are well-structured.
The field declarations support the checkpoint lifecycle (ticker, channels, sync.Once) and coordination (RWMutex). The comment on line 61 clearly explains the purpose of the mutex.
81-85: LGTM: WAL checkpoint options follow established patterns.
The option functions correctly propagate the WAL checkpoint setting from `C1ZFile` through to `C1File`, maintaining consistency with the existing option pattern in the codebase.
Also applies to: 154-158, 179-181
107-108: LGTM: WAL checkpoint lifecycle management is sound.
The lifecycle is properly managed:
- Channels initialized early in `NewC1File()`
- `Close()` safely stops the ticker, uses `sync.Once` to prevent double-closing the stop channel, and waits for goroutine completion
- `init()` defensively checks both the enabled flag and actual WAL mode before starting
Also applies to: 264-267
475-490: LGTM: Background checkpoint goroutine is correctly implemented.
The goroutine properly:
- Signals completion via the `checkpointDone` channel
- Responds to both ticker events and stop signals
- Uses a reasonable 5-minute checkpoint interval
492-504: LGTM: Checkpoint lock helpers are correctly implemented.
The `acquireCheckpointLock()` and `releaseCheckpointLock()` helpers correctly acquire/release the RLock when checkpointing is enabled, allowing DB operations to coordinate with the checkpoint write lock.
506-524: LGTM: WAL checkpoint implementation is robust.
The checkpoint logic is sound:
- Acquires exclusive write lock to ensure coordination with DB operations
- Uses appropriate 30-second timeout
- Implements proper fallback strategy (TRUNCATE → RESTART)
- Uses structured logging for errors
- Correctly doesn't fail the operation on checkpoint errors
ggreer
left a comment
Mostly small potatoes, but I'm worried about the change in transaction behavior in executeChunkedInsert().
```go
		)
	}
	p.LastGrantLog[resourceType] = time.Now()
	if !p.sequentialMode {
```
Does it significantly hurt performance to always lock/unlock this mutex? Heck, since it's just logging, can we avoid the mutex entirely? Worst case we'll log an extra line here or there, right?
Also a nitpick: The conditionals could be reversed and the condition could be if p.sequentialMode { ....
```go
func WithC1FWALCheckpoint(enable bool) C1FOption {
	return func(o *C1File) {
		o.checkpointEnabled = enable
```
I think this should turn on WAL mode. Or NewC1File should error if checkpointEnabled is true but WAL isn't. If checkpoint is enabled but WAL isn't, that's almost certainly a mistake, and probably the source of a future bug. ("Why isn't this checkpointing? I turned it on.")
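One possible shape for that fail-fast check, written as a sketch against the fields and helpers discussed in this review (`checkpointEnabled`, `isWALMode`); where exactly `NewC1File` would call it is an assumption:

```go
// validateCheckpointConfig rejects the checkpoint-without-WAL combination so
// the mistake surfaces at construction time instead of as a silent no-op.
func (c *C1File) validateCheckpointConfig(ctx context.Context) error {
	if c.checkpointEnabled && !c.isWALMode(ctx) {
		return fmt.Errorf("c1file: WAL checkpointing enabled but journal_mode is not WAL")
	}
	return nil
}
```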
```go
		c1fopts = append(c1fopts, WithC1FPragma(pragma.name, pragma.value))
	}
	if options.enableWALCheckpoint {
		c1fopts = append(c1fopts, WithC1FWALCheckpoint(true))
```
I think we can remove the if statement and just have:
```diff
-	c1fopts = append(c1fopts, WithC1FWALCheckpoint(true))
+	c1fopts = append(c1fopts, WithC1FWALCheckpoint(options.enableWALCheckpoint))
```
That way if the default behavior ever changes, it's possible to disable WAL checkpointing.
```go
// startWALCheckpointing starts a background goroutine to perform WAL checkpoints every 5 minutes.
func (c *C1File) startWALCheckpointing() {
	c.checkpointTicker = time.NewTicker(5 * time.Minute)
```
Should WithWALCheckpointDuration(time.Duration) be the option instead of (or maybe in addition to) WithC1FWALCheckpoint(bool)? (If it's instead of, a duration of 0 would disable.) I'm guessing we'll want to tweak this duration based on what we learn from running it in prod.
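A sketch of that duration-based option, following the `C1FOption` pattern shown above; the `checkpointInterval` field does not exist in the PR and is assumed here, with zero meaning disabled:

```go
// WithC1FWALCheckpointDuration enables checkpointing with a caller-chosen
// interval; a zero or negative duration disables it entirely.
func WithC1FWALCheckpointDuration(interval time.Duration) C1FOption {
	return func(o *C1File) {
		o.checkpointEnabled = interval > 0
		o.checkpointInterval = interval // hypothetical field replacing the hard-coded 5 minutes
	}
}
```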
```go
// performWALCheckpoint performs a WAL checkpoint using SQLITE_CHECKPOINT_RESTART or SQLITE_CHECKPOINT_TRUNCATE.
func (c *C1File) performWALCheckpoint() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
```
I hate to add more configuration options, but this timeout also seems like a value we'd want to tweak based on what we learn from running parallel syncs. Maybe it's ok to add an environment variable to override the default value?
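A sketch of such an override; the environment variable name is made up for illustration:

```go
package dotc1z

import (
	"os"
	"time"
)

// checkpointTimeout returns the WAL checkpoint timeout, allowing an operator
// override via an environment variable and falling back to 30 seconds.
func checkpointTimeout() time.Duration {
	if v := os.Getenv("BATON_WAL_CHECKPOINT_TIMEOUT"); v != "" {
		if d, err := time.ParseDuration(v); err == nil && d > 0 {
			return d
		}
	}
	return 30 * time.Second
}
```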
```go
		opts = append(opts, dotc1z.WithDecoderOptions(l.decoderOptions...))
	}
	if l.enableWALCheckpoint {
		opts = append(opts, dotc1z.WithWALCheckpoint(true))
```
This can also have the if statement removed and just set it to true or false:
```diff
-	opts = append(opts, dotc1z.WithWALCheckpoint(true))
+	opts = append(opts, dotc1z.WithWALCheckpoint(l.enableWALCheckpoint))
```
```go
		chunkedRows := rows[start:end]

		// Create the base insert dataset
		err := executeChunkWithRetry(ctx, c, tableName, chunkedRows, buildQueryFn)
```
Previously this code inserted all the chunks in one big transaction. This change inserts each chunk as a transaction. I'm pretty sure we don't want that.
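For contrast, a rough sketch of the single-transaction shape being described: one transaction wraps every chunk, and a failure rolls the whole batch back together. The function and callback names are placeholders, not the PR's actual helpers:

```go
package dotc1z

import (
	"context"
	"database/sql"
	"errors"
)

// bulkInsertAllChunksTx inserts every chunk inside one transaction and only
// commits once all chunks have been written.
func bulkInsertAllChunksTx(
	ctx context.Context,
	db *sql.DB,
	rows []any,
	chunkSize int,
	insertChunk func(ctx context.Context, tx *sql.Tx, chunk []any) error,
) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	for start := 0; start < len(rows); start += chunkSize {
		end := min(start+chunkSize, len(rows))
		if err := insertChunk(ctx, tx, rows[start:end]); err != nil {
			return errors.Join(err, tx.Rollback())
		}
	}
	return tx.Commit()
}
```

With this shape, any SQLITE_BUSY retry would have to happen either inside the callback or by replaying the entire batch, which is the trade-off the per-chunk-transaction change was presumably avoiding.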
```go
}

// executeChunkWithRetry executes a single chunk with retry logic for SQLITE_BUSY errors.
func executeChunkWithRetry(
```
Do we need equivalent logic for the c1z backed session store?
```go
)

// min returns the smaller of two integers.
func minInt(a, b int) int {
```
I think there is a builtin for this now.
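Go 1.21 added built-in generic `min` and `max`, so the helper can be dropped; a tiny self-contained example:

```go
package main

import "fmt"

func main() {
	rows := make([]int, 42)
	start, chunkSize := 0, 100
	// min is a builtin since Go 1.21, so a minInt helper is unnecessary.
	end := min(start+chunkSize, len(rows))
	fmt.Println(end) // 42
}
```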
- Fix deadlock
- Play around with concurrent resource tasks
- A little log cleanup
- More log cleanup
- fix build
- Add task with retry and backoff
- default shouldFetchRelated to false
- Check context when waiting to add task
- Fix syntax error
- fix logging
- Fix deadlocks
- Don't exit the sync early if we aren't making enough progress.
- Call expand grants until it is complete
- Wire up parallel sync config option to service mode sync tasks.
- fix issue where only 10,000 entitlements are synced
- add CollectEntitlementsAndGrantsTasksOp
- set default worker count to 2
- apply lock when updating entitlements progress during parallel sync
- use exponential backoff in addTaskWithRetry
- refactor so that AddTask* has minimal locking
- fix entitlement and grant progress counts: we are tracking the number of resources processed, not the total number of entitlements and grants
- lock when expanding queues
- Use a transaction per chunk during bulk object put, also retry sqlite busy/dblocked errors.
- WAL checkpoint on 5 minute interval
- pause sqlite activity during checkpoint
- manual WAL checkpoints only apply to parallel sync
- update comment
- update type for WithSessionStore
- Don't check the bucket name when looking to see if tasks are complete.
- Check context when sleeping.
- Check context when sleeping.
- Lint fixes
- More lint fixes.
- Various review feedback fixes.
force-pushed from bc22584 to 3399982
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/tasks/c1api/full_sync.go (1)
41-90: skipFullSync is ignored when parallelSync is enabled.
`skipFullSync` is only applied as a `WithSkipFullSync` option to the underlying `SequentialSyncer`, whose `Sync` method short‑circuits to `SkipSync`. When `parallelSync` is true, you build a `SequentialSyncer` with `skipFullSync` set, then wrap it in a `parallelSyncer` and call `parallelSyncer.Sync`, which does not consult `skipFullSync` at all, so a "skip full sync" task will still run a full parallel sync.
If this combination is valid, you likely want to either:
- Short‑circuit in this handler when `skipFullSync && parallelSync` (run `baseSyncer.SkipSync(ctx)` directly), or
- Teach `parallelSyncer.Sync` to honor the underlying `skipFullSync` flag.
As written, behavior diverges from the sequential path for the same task flags.
♻️ Duplicate comments (4)
pkg/sync/parallel_syncer.go (4)
36-43: Unreachable return in addTaskWithRetry; tighten control flow.The
forloop inaddTaskWithRetryalways returns from inside the loop, so the finalreturn nil // This should never be reachedis indeed unreachable and misleading. Consider replacing it with an explicit error return if the loop ever exits unexpectedly.func (ps *parallelSyncer) addTaskWithRetry(ctx context.Context, task *task, maxRetries int) error { @@ - for attempt := 0; attempt <= maxRetries; attempt++ { + for attempt := 0; attempt <= maxRetries; attempt++ { err := ps.taskQueue.AddTask(ctx, task) @@ - } - } - - return nil // This should never be reached + } + } + + // Defensive: the loop should always return above. + return fmt.Errorf("addTaskWithRetry exited retry loop without enqueuing task") }Also applies to: 44-72
117-169: Recursive immediate processing risks stack overflow on deep task trees.
processTaskImmediatelyrecursively calls itself for bothCollectEntitlementsAndGrantsTasksOpandSyncResourcesOpsub-tasks. On deeply nested resource hierarchies or long chains of follow‑on work, this fallback path can exhaust the goroutine stack—precisely when the queue is already under pressure.Refactor this to use an explicit task stack/queue (iterative loop) and/or enforce a maximum depth, so immediate processing cannot recurse unboundedly.
379-397: Dynamic channel “resizing” in expandQueueAndRetry is unsafe and can strand tasks.
expandQueueAndRetryreplaces the bucket channel inbucketQueueswhile other goroutines may still hold references to the old channel. Producers that already capturedcurrentQueue(from priorgetOrCreateBucketChannelcalls) can continue sending to the old channel, but no consumer ever reads from it after the map is updated—leading to stuck sends and lost tasks. The len‑based drain loop also has a race and can exit while producers are still writing into the old queue.Safer pattern here is to avoid resizing buffered channels entirely and simply treat a full queue as
errTaskQueueFull, lettingaddTaskWithGuaranteefall back to longer timeouts or immediate processing.func (q *taskQueue) AddTaskWithTimeout(ctx context.Context, t *task, timeout time.Duration) error { @@ // Try to add the task select { case queue <- t: return nil case <-time.After(timeout): - // Queue is full, try to expand it - return q.expandQueueAndRetry(bucket, t, timeout) + // Queue is full; let caller decide how to handle backpressure. + return errTaskQueueFull case <-ctx.Done(): return ctx.Err() } } - -// expandQueueAndRetry attempts to expand the queue and retry adding the task. -func (q *taskQueue) expandQueueAndRetry(bucket string, t *task, timeout time.Duration) error { - // ... remove entire function ... -}Then delete
expandQueueAndRetryentirely and adjust any remaining references accordingly.Also applies to: 399-463
171-181: LocalStateContext can never enable ShouldFetchRelatedResources.
StateInterfaceexposesShouldFetchRelatedResources()but no setter, andLocalStateContextonly has ashouldFetchRelatedfield plus a getter. In the parallel path, all call sites (e.g.,syncGrantsForResourceLogic) read this flag from a freshly createdLocalStateContextthat is never mutated, soshouldFetchRelatedis always false and the “fetch related resources” logic is effectively disabled.If parallel sync is expected to support targeted/partial syncs that should fetch related resources (as the sequential syncer already does), you’ll need to mirror the sequential state API:
- Add
SetShouldFetchRelatedResources()toStateInterface.- Implement it on
LocalStateContext(setshouldFetchRelated = true).- Ensure the initiating code sets this flag when running targeted/partial parallel syncs.
Also applies to: 193-201, 235-237, 1719-1870
🧹 Nitpick comments (3)
pkg/field/defaults.go (1)
91-98: Parallel sync field wiring looks correct; consider minor doc/order tweaks.
The new `parallelSync` BoolField and its inclusion in `DefaultFields` look consistent with the existing pattern (persistent, `ExportTargetNone`, defaulting to false). No functional issues here.
Two small, optional nits you may want to consider:
- The description "This must be set to enable parallel sync" could be slightly more explicit, e.g. "Enable parallel synchronization of resources, entitlements, and grants," to better match the feature semantics.
- In `DefaultFields`, `parallelSync` is listed far from the other sync knobs (`skipFullSync`, `sync-resources`, `sync-resource-types`, etc.). If the order impacts CLI help readability, you might want to group it with those fields instead of near `ServerSessionStoreMaximumSizeField`.
The current behavior is fine as-is; these are just UX/clarity improvements. Based on learnings, WAL checkpointing is still enforced internally by the parallel syncer, so it's good that this flag does not try to expose or control that separately.
Also applies to: 257-307
pkg/sync/parallel_syncer.go (1)
822-830: Guard workers slice mutations if GetWorkerStatus is called concurrently.
`parallelSyncer` has an `mu` RWMutex and uses `RLock` in `areWorkersIdle` and `GetWorkerStatus`, but `startWorkers` and `stopWorkers` mutate `ps.workers` without taking the write lock. If `GetWorkerStatus` (or `areWorkersIdle`) is ever called from another goroutine while workers are being started or stopped, this will be a data race.
Consider wrapping `ps.workers` assignments and iteration in `startWorkers`/`stopWorkers` with `ps.mu.Lock`/`Unlock` to match the read‑side locking, as sketched below.
Also applies to: 961-972, 1960-1973
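A minimal sketch of the suggested guard, using the field names from the comment above (`ps.mu`, `ps.workers`); the setter helper is illustrative only:

```go
// setWorkers replaces the worker slice under the write lock so concurrent
// RLock readers (GetWorkerStatus, areWorkersIdle) never observe a torn update.
func (ps *parallelSyncer) setWorkers(workers []*worker) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.workers = workers
}
```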
pkg/tasks/local/syncer.go (1)
108-131: Parallel syncer wrapping and worker config look correct; worker count is a tunable policy.
The refactor to introduce `baseSyncer` plus a `syncer` interface and then conditionally wrap with:
```go
config := sdkSync.DefaultParallelSyncConfig().WithWorkerCount(10)
syncer = sdkSync.NewParallelSyncer(baseSyncer, config)
```
is functionally sound:
- Error handling and `Close` semantics are preserved around the final `syncer`.
- The builder call now correctly uses the returned config value.
- Sequential behavior remains unchanged when `m.parallelSync` is false.
If you expect different connectors or environments to want different concurrency levels, consider threading the worker count from configuration/flags into this layer rather than hard‑coding 10; otherwise this is fine as a default. Based on learnings, WAL checkpointing is handled internally by `NewParallelSyncer`, so no extra knob is needed here.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
pb/c1/connector/v2/resource.pb.gois excluded by!**/*.pb.gopb/c1/connector/v2/resource_protoopaque.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (16)
pb/c1/connector/v2/resource.pb.validate.go(1 hunks)pkg/cli/commands.go(1 hunks)pkg/connectorrunner/runner.go(5 hunks)pkg/dotc1z/c1file.go(10 hunks)pkg/dotc1z/manager/local/local.go(3 hunks)pkg/dotc1z/manager/manager.go(4 hunks)pkg/dotc1z/manager/s3/s3.go(3 hunks)pkg/dotc1z/sql_helpers.go(4 hunks)pkg/field/defaults.go(2 hunks)pkg/sync/parallel_syncer.go(1 hunks)pkg/sync/state.go(3 hunks)pkg/sync/syncer.go(56 hunks)pkg/tasks/c1api/full_sync.go(4 hunks)pkg/tasks/c1api/manager.go(4 hunks)pkg/tasks/local/syncer.go(4 hunks)proto/c1/connector/v2/resource.proto(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- pb/c1/connector/v2/resource.pb.validate.go
- pkg/tasks/c1api/manager.go
- pkg/cli/commands.go
- proto/c1/connector/v2/resource.proto
- pkg/sync/state.go
- pkg/dotc1z/manager/s3/s3.go
🧰 Additional context used
🧠 Learnings (7)
📓 Common learnings
Learnt from: jirwin
Repo: ConductorOne/baton-sdk PR: 524
File: pkg/sync/parallel_syncer.go:827-829
Timestamp: 2025-10-21T23:58:13.768Z
Learning: In the parallel syncer implementation (pkg/sync/parallel_syncer.go), WAL checkpointing must always be enabled for parallel sync operations. The `NewParallelSyncer` function unconditionally sets `baseSyncer.enableWALCheckpoint = true` because high concurrency in parallel sync requires WAL checkpointing to prevent checkpoint failures. This is a design requirement, not a configurable option.
📚 Learning: 2025-10-21T23:58:13.768Z
Learnt from: jirwin
Repo: ConductorOne/baton-sdk PR: 524
File: pkg/sync/parallel_syncer.go:827-829
Timestamp: 2025-10-21T23:58:13.768Z
Learning: In the parallel syncer implementation (pkg/sync/parallel_syncer.go), WAL checkpointing must always be enabled for parallel sync operations. The `NewParallelSyncer` function unconditionally sets `baseSyncer.enableWALCheckpoint = true` because high concurrency in parallel sync requires WAL checkpointing to prevent checkpoint failures. This is a design requirement, not a configurable option.
Applied to files:
pkg/tasks/c1api/full_sync.go, pkg/tasks/local/syncer.go, pkg/dotc1z/manager/local/local.go, pkg/dotc1z/c1file.go, pkg/field/defaults.go, pkg/sync/parallel_syncer.go, pkg/dotc1z/manager/manager.go, pkg/connectorrunner/runner.go, pkg/sync/syncer.go
📚 Learning: 2024-09-03T15:49:24.881Z
Learnt from: mchavez
Repo: ConductorOne/baton-sdk PR: 211
File: pkg/uhttp/dbcache.go:0-0
Timestamp: 2024-09-03T15:49:24.881Z
Learning: The user mchavez has fixed the issue related to improving error messages and adding comments in the `cleanup` method of the `DBCache` struct in the `pkg/uhttp/dbcache.go` file.
Applied to files:
pkg/dotc1z/sql_helpers.go
📚 Learning: 2024-10-08T21:29:30.695Z
Learnt from: mchavez
Repo: ConductorOne/baton-sdk PR: 211
File: pkg/uhttp/dbcache.go:0-0
Timestamp: 2024-10-08T21:29:30.695Z
Learning: The user mchavez has fixed the issue related to improving error messages and adding comments in the `DBCache` methods in the `pkg/uhttp/dbcache.go` file.
Applied to files:
pkg/dotc1z/sql_helpers.go
📚 Learning: 2024-09-03T15:53:47.572Z
Learnt from: mchavez
Repo: ConductorOne/baton-sdk PR: 211
File: pkg/uhttp/dbcache.go:0-0
Timestamp: 2024-09-03T15:53:47.572Z
Learning: The user mchavez has fixed the issue related to improving error messages and adding comments in the `NewDBCache` function of the `DBCache` struct in the `pkg/uhttp/dbcache.go` file.
Applied to files:
pkg/dotc1z/sql_helpers.go
📚 Learning: 2024-09-03T15:51:48.712Z
Learnt from: mchavez
Repo: ConductorOne/baton-sdk PR: 211
File: pkg/uhttp/dbcache.go:0-0
Timestamp: 2024-09-03T15:51:48.712Z
Learning: The user mchavez has fixed the issue related to improving error messages and adding comments in the `Get` method of the `DBCache` struct in the `pkg/uhttp/dbcache.go` file.
Applied to files:
pkg/dotc1z/sql_helpers.go
📚 Learning: 2024-10-08T21:29:30.695Z
Learnt from: mchavez
Repo: ConductorOne/baton-sdk PR: 211
File: pkg/uhttp/dbcache.go:0-0
Timestamp: 2024-10-08T21:29:30.695Z
Learning: The user mchavez has fixed the issue related to improving error messages and adding comments in the `Has` method of the `DBCache` struct in the `pkg/uhttp/dbcache.go` file.
Applied to files:
pkg/dotc1z/sql_helpers.go
🧬 Code graph analysis (8)
pkg/tasks/c1api/full_sync.go (3)
pkg/sync/syncer.go (2)
Syncer(54-57)NewSyncer(3297-3323)pkg/types/types.go (1)
ConnectorClient(30-46)pkg/sync/parallel_syncer.go (2)
DefaultParallelSyncConfig(250-255)NewParallelSyncer(833-845)
pkg/dotc1z/sql_helpers.go (2)
pkg/dotc1z/c1file.go (1)
C1File(39-63)vendor/github.com/doug-martin/goqu/v9/insert_dataset.go (2)
InsertDataset(12-18)Insert(33-35)
pkg/dotc1z/manager/local/local.go (4)
pkg/dotc1z/decoder.go (1)
DecoderOption(57-57)pkg/dotc1z/c1file.go (1)
WithWALCheckpoint(154-158)pkg/dotc1z/manager/manager.go (1)
WithWALCheckpoint(40-44)pkg/dotc1z/manager/s3/s3.go (2)
WithWALCheckpoint(44-48)Option(30-30)
pkg/dotc1z/c1file.go (4)
pkg/dotc1z/decoder.go (1)
DecoderOption(57-57)pkg/dotc1z/manager/local/local.go (1)
WithWALCheckpoint(40-44)pkg/dotc1z/manager/manager.go (1)
WithWALCheckpoint(40-44)pkg/dotc1z/manager/s3/s3.go (1)
WithWALCheckpoint(44-48)
pkg/field/defaults.go (3)
pb/c1/config/v1/config.pb.go (2)
BoolField(1048-1054)BoolField(1067-1067)pkg/field/fields.go (1)
BoolField(179-199)pkg/field/field_options.go (4)
WithDescription(47-53)WithPersistent(129-135)WithExportTarget(103-111)ExportTargetNone(97-97)
pkg/sync/parallel_syncer.go (3)
pkg/sync/state.go (1)
Action(144-151)pb/c1/connector/v2/resource_protoopaque.pb.go (10)
ResourceId(2626-2633)ResourceId(2646-2646)ResourceType(126-137)ResourceType(150-150)ResourceTypesServiceListResourceTypesRequest(273-282)ResourceTypesServiceListResourceTypesRequest(295-295)ResourcesServiceListResourcesRequest(2901-2911)ResourcesServiceListResourcesRequest(2924-2924)Resource(2711-2723)Resource(2736-2736)pb/c1/connector/v2/resource.pb.go (10)
ResourceId(2652-2659)ResourceId(2672-2672)ResourceType(126-141)ResourceType(154-154)ResourceTypesServiceListResourceTypesRequest(275-284)ResourceTypesServiceListResourceTypesRequest(297-297)ResourcesServiceListResourcesRequest(2925-2935)ResourcesServiceListResourcesRequest(2948-2948)Resource(2737-2749)Resource(2762-2762)
pkg/dotc1z/manager/manager.go (4)
pkg/dotc1z/decoder.go (1)
DecoderOption(57-57)pkg/dotc1z/c1file.go (1)
WithWALCheckpoint(154-158)pkg/dotc1z/manager/local/local.go (1)
WithWALCheckpoint(40-44)pkg/dotc1z/manager/s3/s3.go (1)
WithWALCheckpoint(44-48)
pkg/connectorrunner/runner.go (2)
pkg/tasks/local/syncer.go (2)
WithParallelSyncEnabled(75-79)Option(31-31)pkg/tasks/c1api/manager.go (1)
NewC1TaskManager(303-330)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (11)
pkg/dotc1z/manager/local/local.go (1)
18-24: Local WAL checkpoint option wiring looks correct.
`enableWALCheckpoint` is plumbed via `WithWALCheckpoint` and conditionally passed as `dotc1z.WithWALCheckpoint(true)` alongside `journal_mode=WAL`, which is consistent with the new C1Z/C1File options and doesn't introduce correctness issues.
Also applies to: 40-44, 115-125
pkg/dotc1z/manager/manager.go (1)
20-24: Manager-level WAL checkpoint flag is plumbed cleanly.
`WithWALCheckpoint` on `managerOptions` and the conditional propagation to S3 and local options are straightforward and backward compatible; they align with the new C1Z/C1File WAL options.
Also applies to: 40-44, 57-82
pkg/dotc1z/sql_helpers.go (1)
229-232: Checkpoint locking and SQLite BUSY retry logic look correct.
`listConnectorObjects` and `executeChunkWithRetry` now acquire the checkpoint read lock around DB activity, which correctly coordinates with the WAL checkpoint writer lock. The per‑chunk transaction flow rolls back and releases the lock on all error paths, retries only on `SQLITE_BUSY`/"database is locked", and uses upsert-style inserts so replays on retry are safe.
Also applies to: 314-317, 318-346, 348-437
pkg/dotc1z/c1file.go (1)
39-63: WAL checkpointing design and lifecycle are sound.
The new `checkpointEnabled` flag, options (`WithC1FWALCheckpoint`, `WithWALCheckpoint`), and `init`/`Close` wiring form a coherent WAL checkpointing story: checkpoints only start when enabled and the DB is truly in WAL mode, DB operations use `acquireCheckpointLock`/`releaseCheckpointLock` as readers, and `performWALCheckpoint` takes the write lock with bounded timeout and proper logging. `Close` cleanly stops the ticker, signals the goroutine, waits for `checkpointDone`, and only then closes the DB, which avoids background work on a closed handle.
Also applies to: 81-86, 99-109, 128-133, 154-159, 160-191, 201-213, 235-270, 459-524
pkg/sync/parallel_syncer.go (1)
832-845: WAL checkpoint unconditionally enabled for parallel syncer (matches design requirement).
`NewParallelSyncer` forces `baseSyncer.enableWALCheckpoint = true` before constructing the parallel syncer, ensuring WAL checkpointing is always on under high concurrency. This matches the documented design requirement for parallel sync and keeps the configuration non‑optional in this mode.
Based on learnings, parallel sync is required to enable WAL checkpointing to avoid checkpoint failures under heavy concurrency.
pkg/sync/syncer.go (2)
59-83: ProgressCounts locking and sequential/parallel split look correct.
Introducing `mu` and `sequentialMode`, plus the `Add*` helpers, cleanly separates the single‑threaded sequential case (no locking) from the parallel case (mutex‑protected). Logging methods correctly branch on `sequentialMode` and only take locks when needed; the direct map increments in the sequential paths (`SyncResourceTypes`, `SyncResources`, `SyncEntitlementsForResource`, `SyncGrantsForResource`) are guarded by `sequentialMode` checks or only used when `sequentialMode` is true, so they're safe.
Also applies to: 86-178, 181-255, 257-277, 279-312, 1399-1410, 2145-2147
314-344: SequentialSyncer's WAL checkpoint flag is wired into the manager as intended.
Adding `enableWALCheckpoint` to `SequentialSyncer` and using it in `loadStore` to append `manager.WithWALCheckpoint(true)` ensures that when parallel sync sets this flag, the C1Z manager (and ultimately C1File) is created with WAL checkpointing enabled. Sequential callers that never touch the flag retain the previous behavior.
Based on learnings, this is the right place to plumb the "always enable WAL checkpointing for parallel sync" requirement down into the storage layer.
Also applies to: 3099-3131
pkg/tasks/c1api/full_sync.go (1)
30-39: Parallel sync wiring from task handler config into Syncer is clear.
Propagating `parallelSync` into `fullSyncTaskHandler`, constructing a base `SequentialSyncer` with the existing options, and then conditionally wrapping it in a `parallelSyncer` (with a reasonable default worker count) is a clean way to switch between sequential and parallel paths without changing the task surface.
Also applies to: 198-218
pkg/tasks/local/syncer.go (1)
18-29: Parallel sync flag and local option are wired coherently.
Adding `parallelSync` to `localSyncer` and exposing it via `WithParallelSyncEnabled(parallel bool)` matches the existing option pattern (other booleans on the struct) and keeps the default behavior sequential when the option isn't used. I don't see correctness risks here.
Also applies to: 75-79
pkg/connectorrunner/runner.go (2)
318-352: runnerConfig.parallelSync and WithParallelSyncEnabled integrate cleanly.
The new `parallelSync` field on `runnerConfig` plus the parameterless `WithParallelSyncEnabled()` option fit well with the existing option style (boolean feature toggles). Defaults stay sequential, and enabling parallel sync is explicit and opt‑in. No issues here.
Also applies to: 556-561
806-815: End-to-end parallelSync plumbing into local and C1 task managers is consistent.
The additions:
- Passing `local.WithParallelSyncEnabled(cfg.parallelSync)` into `local.NewSyncer` for on-demand sync, and
- Extending `c1api.NewC1TaskManager(...)` to accept and receive `cfg.parallelSync`
ensure that the same flag controls parallel behavior in both local and C1-managed sync paths without changing behavior when the flag is unset. This matches the PR's intent and aligns with the local syncer's new option. Based on learnings, this also correctly routes calls through `NewParallelSyncer`, where WAL checkpointing is enforced when parallel mode is enabled.
Also applies to: 827-838
Introduces an option to sync resources, entitlements and grants in parallel. This is a second attempt, building on top of #456. We've successfully synced large connectors (>1 million users, >1 million groups, ~2.7M grants). For this large sync, we saw a full sync go from taking longer than 20 hours down to 5.5 hours.
By default, each resource type will be synced in parallel. This means that user and group resources can be listed and synced in parallel. Once the resources are synced, entitlements and grants for each resource will begin syncing in parallel.
Summary by CodeRabbit
New Features
Refactor