
Conversation

@jirwin jirwin (Contributor) commented Oct 20, 2025

Introduces an option to sync resources, entitlements, and grants in parallel. This is a second attempt, building on top of #456. We've successfully synced large connectors (>1 million users, >1 million groups, ~2.7M grants). For this large sync, a full sync went from taking longer than 20 hours down to 5.5 hours.

By default, each resource type will be synced in parallel. This means that user and group resources can be listed and synced in parallel. Once the resources are synced, entitlements and grants for each resource will begin syncing in parallel.
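Conceptually, the fan-out looks like the sketch below. This is an illustration of the model only, not the SDK's parallel syncer: syncResourceType is a hypothetical helper, and errgroup is used here just to show bounded per-resource-type concurrency.

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// syncAllResourceTypes fans out one goroutine per resource type, bounded by a
// worker limit. Within each goroutine, entitlements and grants for that type
// are synced after its resources are listed.
func syncAllResourceTypes(ctx context.Context, resourceTypes []string, workers int) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(workers)
	for _, rt := range resourceTypes {
		rt := rt // capture loop variable (pre-Go 1.22)
		g.Go(func() error {
			return syncResourceType(ctx, rt) // hypothetical helper
		})
	}
	return g.Wait()
}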

Summary by CodeRabbit

  • New Features

    • Parallel synchronization mode via a new "parallel-sync" flag with configurable workers and default bucket.
    • Resource-type bucketing to enforce per-bucket sequencing during parallel runs.
    • Optional WAL checkpointing (opt-in) to improve DB stability during syncs.
    • Enhanced telemetry and runtime visibility for parallel sync operations.
  • Refactor

    • Sync orchestration and progress tracking reworked to support both parallel and sequential modes.


@coderabbitai coderabbitai bot commented Oct 20, 2025

Walkthrough

Adds a parallel sync path with bucketed workers and a new sync_bucket proto field; refactors internal syncer into exported SequentialSyncer; adds WAL checkpointing with coordinated locks/retries for SQLite; and threads a parallel-sync flag through CLI, runner, task-manager, and local syncer wiring.

Changes

Cohort / File(s) Summary
Proto & validator
`proto/c1/connector/v2/resource.proto`, `pb/c1/connector/v2/resource.pb.validate.go`
Adds string sync_bucket = 7 to ResourceType; validator file updated with a comment noting no validation rules for SyncBucket (no functional validation added).
CLI / runner / flags
`pkg/cli/commands.go`, `pkg/connectorrunner/runner.go`, `pkg/field/defaults.go`
Adds persisted boolean parallel-sync flag handling in CLI; introduces WithParallelSyncEnabled() Option and threads parallelSync through runnerConfig and runner option construction.
Task manager & handlers
`pkg/tasks/c1api/manager.go`, `pkg/tasks/c1api/full_sync.go`, `pkg/tasks/local/syncer.go`
Adds parallelSync field/parameter to C1 task manager and handlers; full-sync and local task runner conditionally wrap the base syncer with a parallel syncer when enabled.
Parallel sync core
`pkg/sync/parallel_syncer.go`
New parallel syncer implementation: bucketed task queues, worker pool, orchestration, public config/API (ParallelSyncConfig, constructors, stats/status, Close), state abstractions, and task lifecycle/expansion logic.
Sequential sync refactor
`pkg/sync/syncer.go`
Internal syncer exported as SequentialSyncer; ProgressCounts gains mutex/sequentialMode and thread-safe helpers; many receiver signatures updated; WAL checkpoint toggle integrated.
Sync state enum
`pkg/sync/state.go`
Adds CollectEntitlementsAndGrantsTasksOp to ActionOp enum with string and parser support.
WAL checkpointing (store & manager)
`pkg/dotc1z/c1file.go`, `pkg/dotc1z/manager/manager.go`, `pkg/dotc1z/manager/local/local.go`, `pkg/dotc1z/manager/s3/s3.go`
Adds WithC1FWALCheckpoint / WithWALCheckpoint options; detects WAL mode, adds checkpoint goroutine (ticker), checkpoint lock lifecycle, PRAGMA wal_checkpoint(TRUNCATE)/RESTART fallback, and propagates option through managers.
SQL helpers & retries
`pkg/dotc1z/sql_helpers.go`
Coordinates checkpoint lock around queries; extracts executeChunkWithRetry with retry/backoff for SQLITE_BUSY, integrates lock/tx lifecycle and improved error handling.
Field defaults
`pkg/field/defaults.go`
Adds unexported persisted boolean field parallelSync to default fields list (affects default ordering).

Sequence Diagram(s)

sequenceDiagram
    participant CLI as CLI
    participant Runner as ConnectorRunner
    participant TaskMgr as C1TaskManager
    participant FullSync as FullSyncHandler
    participant Base as SequentialSyncer
    participant Parallel as ParallelSyncer

    CLI->>Runner: run with --parallel-sync
    Runner->>Runner: append WithParallelSyncEnabled() to opts
    Runner->>TaskMgr: NewC1TaskManager(..., parallelSync=true)
    TaskMgr->>FullSync: newFullSyncTaskHandler(parallelSync=true)
    FullSync->>Base: NewSyncer(...)
    alt parallelSync == true
        FullSync->>Parallel: NewParallelSyncer(Base, config)
        Parallel->>Parallel: start workers & buckets
        FullSync->>Parallel: Sync(ctx)
        Parallel->>FullSync: Sync complete
        FullSync->>Parallel: Close(ctx)
    else
        FullSync->>Base: Sync(ctx)
        Base->>FullSync: Sync complete
        FullSync->>Base: Close(ctx)
    end
sequenceDiagram
    participant Init as NewC1File/C1Z
    participant C1File as C1File (store)
    participant DB as SQLite
    participant Ticker as Checkpoint Goroutine

    Init->>C1File: WithC1FWALCheckpoint(true)
    C1File->>DB: PRAGMA journal_mode?
    alt DB in WAL mode
        C1File->>Ticker: startWALCheckpointing()
        loop every interval
            Ticker->>C1File: acquireCheckpointLock()
            Ticker->>DB: PRAGMA wal_checkpoint(TRUNCATE)
            Ticker->>C1File: releaseCheckpointLock()
        end
    end
    C1File->>C1File: Close() -> stop ticker, wait for done
sequenceDiagram
    participant Caller as sql_helpers
    participant Lock as CheckpointLock
    participant DB as SQLite

    Caller->>Lock: acquireCheckpointLock()
    Caller->>DB: begin tx, execute chunk insert
    alt success
        DB->>Caller: commit
        Caller->>Lock: releaseCheckpointLock()
    else SQLITE_BUSY or transient
        Caller->>Caller: retry with backoff (new tx each attempt)
        loop until maxRetries
            Caller->>DB: begin tx, execute
        end
        Caller->>Lock: releaseCheckpointLock()
    else error
        DB->>Caller: rollback
        Caller->>Lock: releaseCheckpointLock()
    end

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Files and areas needing extra attention:

  • pkg/sync/parallel_syncer.go — large, complex concurrency, queueing, retry, and tracing logic.
  • pkg/sync/syncer.go — broad API/receiver changes; ensure no callsites remain using old types/signatures.
  • WAL checkpoint interactions across pkg/dotc1z/* and pkg/dotc1z/sql_helpers.go — correctness of lock lifecycle, retry/backoff, and DB PRAGMA semantics.
  • Task manager wiring (pkg/tasks/c1api/*, pkg/tasks/local/syncer.go, pkg/connectorrunner/runner.go) — verify flag propagation and lifecycle (Sync/Close) for wrapped parallel syncer.

Possibly related PRs

Suggested reviewers

  • laurenleach
  • kans

Poem

🐰
Buckets hum and workers hop,
Checkpoints tap—the DB won't stop,
Tasks split lanes and race in cheer,
Sequential roots keep order near,
A rabbit nods: the sync is here! 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 33.90%, which is below the required threshold of 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Description Check ✅ Passed: Check skipped because CodeRabbit’s high-level summary is enabled.
  • Title check ✅ Passed: The title 'Parallel Syncing' is concise and directly reflects the main feature being introduced across the entire changeset. It captures the core change without unnecessary detail.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch jallers/jirwin/psync-deadlock-fix

Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/dotc1z/c1file.go (1)

351-352: Typo in error message.

“datbase” → “database”.

-    return fmt.Errorf("c1file: datbase has not been opened")
+    return fmt.Errorf("c1file: database has not been opened")
🧹 Nitpick comments (9)
pkg/dotc1z/manager/s3/s3.go (1)

21-28: WAL checkpoint option propagation is correct.

Option plumbing and conditional C1Z option are consistent. Consider a small debug log when enabled to aid troubleshooting.

@@ func (s *s3Manager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) {
   if len(s.decoderOptions) > 0 {
     opts = append(opts, dotc1z.WithDecoderOptions(s.decoderOptions...))
   }
   if s.enableWALCheckpoint {
+    l.Debug("enabling WAL checkpointing for S3-managed C1Z", zap.String("file_path", s.fileName))
     opts = append(opts, dotc1z.WithWALCheckpoint(true))
   }

Also applies to: 44-48, 140-142

pkg/dotc1z/manager/local/local.go (1)

18-24: Local WAL checkpoint option wiring LGTM; optional observability.

The option and propagation are sound. Add a debug log when enabled to simplify field debugging.

@@ func (l *localManager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) {
   if len(l.decoderOptions) > 0 {
     opts = append(opts, dotc1z.WithDecoderOptions(l.decoderOptions...))
   }
   if l.enableWALCheckpoint {
+    log.Debug("enabling WAL checkpointing for local C1Z",
+      zap.String("file_path", l.filePath),
+      zap.String("temp_path", l.tmpPath),
+    )
     opts = append(opts, dotc1z.WithWALCheckpoint(true))
   }

Also applies to: 40-44, 122-124

pkg/dotc1z/sql_helpers.go (4)

313-316: SQLite BUSY detection via string match is brittle; prefer driver error codes.

If feasible with the chosen driver, detect BUSY using the driver’s error type/code (e.g., sqlite3.Error{Code: sqlite3.ErrBusy}) and fallback to string match.


317-346: Respect context cancellation between chunks.

Break out early if ctx is cancelled to avoid long waits under backpressure.

 for i := 0; i < chunks; i++ {
+  select {
+  case <-ctx.Done():
+    return ctx.Err()
+  default:
+  }
   start := i * chunkSize
   ...
 }

348-427: Improve retry backoff to exponential with jitter.

Linear backoff can synchronize retries and increase contention. Use exponential backoff with jitter, and cap the max delay.

- maxRetries := 5
- baseDelay := 10 * time.Millisecond
+ maxRetries := 5
+ baseDelay := 10 * time.Millisecond
+ maxDelay := 500 * time.Millisecond
@@
- case <-time.After(time.Duration(attempt+1) * baseDelay):
+ // exponential backoff with jitter
+ delay := baseDelay * (1 << attempt)
+ if delay > maxDelay {
+   delay = maxDelay
+ }
+ // add small jitter
+ jitter := time.Duration(int64(delay) / 5)
+ sleep := delay + time.Duration(rand.Int63n(int64(jitter+1)))
+ case <-time.After(sleep):
   continue

Note: add math/rand import (seeded elsewhere or acceptable default).

+ "math/rand"

95-273: Apply checkpoint lock consistently to read paths.

listConnectorObjects now guards reads; consider the same for getResourceObject/getConnectorObject to avoid WAL checkpoint races during heavy load.

pkg/sync/parallel_syncer.go (3)

354-367: Hot-path log level too noisy; downgrade to Debug.

Per-task Add logs at Info will flood logs.

-    l.Info("task added to queue",
+    l.Debug("task added to queue",

1289-1294: Duplicate branch bodies; simplify logging.

Both branches log progress identically. Collapse branches.

-  if len(resp.List) > 0 {
-    ps.syncer.counts.LogResourcesProgress(ctx, resourceTypeId)
-  } else {
-    // Even with no resources, we should log progress
-    ps.syncer.counts.LogResourcesProgress(ctx, resourceTypeId)
-  }
+  ps.syncer.counts.LogResourcesProgress(ctx, resourceTypeId)

(Apply similarly in collectTasks.)

Also applies to: 1435-1439


1719-1727: String literal repeated; make it a const.

Replace "next_page" with a package-level const to avoid drift.

+const actionNextPage = "next_page"
@@
-  for decision.ShouldContinue && decision.Action == "next_page" {
+  for decision.ShouldContinue && decision.Action == actionNextPage {
@@
-      Action:               "next_page",
+      Action:               actionNextPage,
@@
-      Action:         "next_page",
+      Action:         actionNextPage,

Also applies to: 1996-2004, 2053-2059

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 099b856 and 045b6f9.

⛔ Files ignored due to path filters (2)
  • pb/c1/connector/v2/resource.pb.go is excluded by !**/*.pb.go
  • pb/c1/connector/v2/resource_protoopaque.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (16)
  • pb/c1/connector/v2/resource.pb.validate.go (1 hunks)
  • pkg/cli/commands.go (1 hunks)
  • pkg/connectorrunner/runner.go (4 hunks)
  • pkg/dotc1z/c1file.go (9 hunks)
  • pkg/dotc1z/manager/local/local.go (3 hunks)
  • pkg/dotc1z/manager/manager.go (4 hunks)
  • pkg/dotc1z/manager/s3/s3.go (3 hunks)
  • pkg/dotc1z/sql_helpers.go (4 hunks)
  • pkg/field/defaults.go (2 hunks)
  • pkg/sync/parallel_syncer.go (1 hunks)
  • pkg/sync/state.go (3 hunks)
  • pkg/sync/syncer.go (53 hunks)
  • pkg/tasks/c1api/full_sync.go (4 hunks)
  • pkg/tasks/c1api/manager.go (4 hunks)
  • pkg/tasks/local/syncer.go (4 hunks)
  • proto/c1/connector/v2/resource.proto (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (12)
pkg/cli/commands.go (1)
pkg/connectorrunner/runner.go (1)
  • WithParallelSyncEnabled (558-563)
pkg/dotc1z/manager/manager.go (4)
pkg/dotc1z/decoder.go (1)
  • DecoderOption (57-57)
pkg/dotc1z/c1file.go (1)
  • WithWALCheckpoint (151-155)
pkg/dotc1z/manager/local/local.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/s3/s3.go (1)
  • WithWALCheckpoint (44-48)
pkg/tasks/local/syncer.go (3)
pkg/connectorrunner/runner.go (2)
  • WithParallelSyncEnabled (558-563)
  • Option (254-254)
pkg/sync/syncer.go (2)
  • Syncer (53-56)
  • NewSyncer (3003-3029)
pkg/sync/parallel_syncer.go (2)
  • DefaultParallelSyncConfig (244-249)
  • NewParallelSyncer (818-830)
pkg/dotc1z/manager/s3/s3.go (4)
pkg/dotc1z/decoder.go (1)
  • DecoderOption (57-57)
pkg/dotc1z/c1file.go (1)
  • WithWALCheckpoint (151-155)
pkg/dotc1z/manager/local/local.go (2)
  • WithWALCheckpoint (40-44)
  • Option (26-26)
pkg/dotc1z/manager/manager.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/sql_helpers.go (1)
pkg/dotc1z/c1file.go (1)
  • C1File (36-60)
pkg/tasks/c1api/full_sync.go (2)
pkg/sync/syncer.go (2)
  • Syncer (53-56)
  • NewSyncer (3003-3029)
pkg/sync/parallel_syncer.go (2)
  • DefaultParallelSyncConfig (244-249)
  • NewParallelSyncer (818-830)
pkg/field/defaults.go (3)
pb/c1/config/v1/config.pb.go (2)
  • BoolField (1034-1040)
  • BoolField (1053-1053)
pkg/field/fields.go (1)
  • BoolField (179-199)
pkg/field/field_options.go (4)
  • WithDescription (47-53)
  • WithPersistent (129-135)
  • WithExportTarget (103-111)
  • ExportTargetNone (97-97)
pkg/dotc1z/manager/local/local.go (4)
pkg/dotc1z/decoder.go (1)
  • DecoderOption (57-57)
pkg/dotc1z/c1file.go (1)
  • WithWALCheckpoint (151-155)
pkg/dotc1z/manager/manager.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/s3/s3.go (2)
  • WithWALCheckpoint (44-48)
  • Option (30-30)
pkg/connectorrunner/runner.go (2)
pkg/tasks/local/syncer.go (2)
  • WithParallelSyncEnabled (61-65)
  • Option (29-29)
pkg/tasks/c1api/manager.go (1)
  • NewC1TaskManager (307-332)
pkg/dotc1z/c1file.go (3)
pkg/dotc1z/manager/local/local.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/manager.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/s3/s3.go (1)
  • WithWALCheckpoint (44-48)
pkg/sync/syncer.go (3)
pkg/sync/expand/graph.go (1)
  • EntitlementGraphAction (14-20)
pkg/sync/state.go (1)
  • Action (130-137)
pkg/dotc1z/manager/manager.go (4)
  • ManagerOption (26-26)
  • WithTmpDir (28-32)
  • WithWALCheckpoint (40-44)
  • New (50-83)
pkg/sync/parallel_syncer.go (5)
pkg/sync/syncer.go (2)
  • Syncer (53-56)
  • SequentialSyncer (314-341)
pkg/dotc1z/manager/manager.go (1)
  • New (50-83)
pkg/sync/state.go (7)
  • Action (130-137)
  • CollectEntitlementsAndGrantsTasksOp (126-126)
  • SyncEntitlementsOp (119-119)
  • SyncGrantsOp (121-121)
  • SyncResourcesOp (118-118)
  • SyncGrantExpansionOp (124-124)
  • SyncExternalResourcesOp (122-122)
pb/c1/connector/v2/resource.pb.go (6)
  • ResourceId (2652-2659)
  • ResourceId (2672-2672)
  • ResourceType (126-141)
  • ResourceType (154-154)
  • Resource (2737-2749)
  • Resource (2762-2762)
pkg/annotations/annotations.go (1)
  • Annotations (12-12)
🪛 GitHub Check: go-lint
pkg/dotc1z/sql_helpers.go

[failure] 396-396:
Error return value of tx.Rollback is not checked (errcheck)


[failure] 389-389:
Error return value of tx.Rollback is not checked (errcheck)


[failure] 381-381:
Error return value of tx.Rollback is not checked (errcheck)

pkg/dotc1z/c1file.go

[failure] 483-483:
Comment should end in a period (godot)


[failure] 466-466:
Comment should end in a period (godot)


[failure] 456-456:
Comment should end in a period (godot)


[failure] 514-514:
use of fmt.Printf forbidden by pattern ^(fmt\.Print(|f|ln)|print|println)$ (forbidigo)

pkg/sync/parallel_syncer.go

[failure] 1435-1435:
dupBranchBody: both branches in if statement have same body (gocritic)


[failure] 1289-1289:
dupBranchBody: both branches in if statement have same body (gocritic)


[failure] 1667-1667:
string next_page has 4 occurrences, make it a constant (goconst)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: go-test (1.25.2, ubuntu-latest)
  • GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (18)
pkg/dotc1z/sql_helpers.go (1)

228-231: Checkpoint lock correctly implements RLock for normal DB operations and exclusive Lock for checkpoint.

Verified: acquireCheckpointLock() uses RLock() at pkg/dotc1z/c1file.go:486, releaseCheckpointLock() uses RUnlock() at pkg/dotc1z/c1file.go:493, and the checkpoint operation uses exclusive Lock() at pkg/dotc1z/c1file.go:503. This is the correct pattern—no contention issues.
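
A minimal sketch of that reader/writer pattern (type and method names are illustrative, not the SDK's fields): ordinary DB operations hold the read side, while the checkpoint takes the write side and therefore only runs when no operation is in flight.

import "sync"

type checkpointLock struct {
	mu      sync.RWMutex
	enabled bool
}

// acquire/release wrap every normal DB operation; many may run concurrently.
func (c *checkpointLock) acquire() {
	if c.enabled {
		c.mu.RLock()
	}
}

func (c *checkpointLock) release() {
	if c.enabled {
		c.mu.RUnlock()
	}
}

// withCheckpoint runs fn (e.g. PRAGMA wal_checkpoint(TRUNCATE)) exclusively,
// waiting for all in-flight operations to release the read lock first.
func (c *checkpointLock) withCheckpoint(fn func() error) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return fn()
}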

pkg/field/defaults.go (1)

87-87: End-to-end wiring confirmed; changes approved.

The parallel-sync field is properly wired throughout the codebase:

  • CLI reads the flag and creates the appropriate option
  • Runner passes it to both local syncer and manager
  • Both paths use it to conditionally instantiate the parallel syncer
  • Field persistence and export settings are correct
pkg/dotc1z/manager/manager.go (1)

20-24: WAL checkpoint toggle properly propagated across all manager variants.

Verified that WithWALCheckpoint is correctly defined at line 40, applied to managerOptions struct (lines 20-24), and propagated to both S3Manager (lines 66-68) and LocalManager (lines 78-80) constructors in the New() function. Usage in syncer.go is consistent with the option pattern. No issues identified.

pb/c1/connector/v2/resource.pb.validate.go (1)

168-169: Generated file note.

Comment-only change; no issues. Ensure generator inputs (proto + protoc-gen-validate) are the source of truth.

pkg/sync/parallel_syncer.go (1)

1769-1774: No issues found with progress increment semantics.

Verification confirms the code correctly increments by 1 per resource processed (not per items). This is consistent throughout the codebase: both AddGrantsProgress and AddEntitlementsProgress calls at lines 1964 and 2049 use hardcoded count=1, matching the pattern in syncer.go. The logging functions track progress indexed by resourceType, confirming 1 unit = 1 resource processed, which is the intended sequential semantics.

pkg/cli/commands.go (1)

332-334: LGTM!

The parallel sync flag is properly integrated into the command options flow. The placement after the main switch and before resource validation checks is appropriate.

pkg/tasks/c1api/full_sync.go (1)

86-98: LGTM!

The conditional wrapping pattern correctly creates a base syncer and then wraps it with a parallel syncer when enabled. The worker count configuration (10 workers) is explicitly set, and error handling is properly maintained.
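
For orientation, the wrapping pattern looks roughly like the sketch below. Constructor arguments are elided and buildBaseSyncer is a hypothetical stand-in; Syncer, SequentialSyncer, DefaultParallelSyncConfig, and NewParallelSyncer are the names referenced elsewhere in this review, but their exact signatures should be taken from pkg/sync.

func runSync(ctx context.Context, parallel bool) error {
	base, err := buildBaseSyncer(ctx) // hypothetical: returns a *SequentialSyncer
	if err != nil {
		return err
	}

	var s Syncer = base // both syncers are used through the Syncer interface
	if parallel {
		cfg := DefaultParallelSyncConfig() // the handler also sets an explicit worker count (10)
		s = NewParallelSyncer(base, cfg)
	}

	if err := s.Sync(ctx); err != nil {
		_ = s.Close(ctx)
		return err
	}
	return s.Close(ctx)
}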

proto/c1/connector/v2/resource.proto (1)

47-52: LGTM!

The sync_bucket field is well-documented and appropriately optional, enabling gradual adoption of parallel sync bucketing. The field semantics are clear: same bucket = sequential, different buckets = parallel.

pkg/tasks/c1api/manager.go (2)

56-56: LGTM!

The parallelSync field is appropriately added to the manager struct and propagated through to the full sync task handler.


307-331: LGTM!

The constructor signature is correctly updated to accept the parallelSync parameter and properly initializes the field.

pkg/connectorrunner/runner.go (3)

558-563: LGTM!

The parallel sync option follows the established pattern and correctly propagates the flag to downstream components.


780-780: LGTM!

The parallel sync flag is correctly passed to the local syncer for on-demand sync paths.


793-802: LGTM!

The parallel sync flag is properly passed to the C1 task manager for daemon mode sync paths.

pkg/sync/syncer.go (5)

66-82: LGTM!

Thread-safety additions to ProgressCounts are well-designed. The sequentialMode flag (defaulting to true) maintains backward compatibility while enabling concurrent access when needed.


278-311: LGTM!

The thread-safe methods for ProgressCounts follow proper mutex locking patterns. These methods enable safe concurrent updates from parallel sync workers.


314-341: LGTM!

Renaming syncer to SequentialSyncer and making it public is appropriate for distinguishing it from the new parallel syncer. The enableWALCheckpoint field is properly added to support the parallel sync infrastructure.


2817-2849: LGTM!

The WAL checkpoint option is properly propagated to the manager when enableWALCheckpoint is true. This integration supports the parallel sync feature by preventing checkpoint failures under high concurrency.


2886-3029: LGTM!

All SyncOpt functions are consistently updated to operate on *SequentialSyncer, and the NewSyncer constructor correctly returns the new public type. The changes maintain backward compatibility in behavior while enabling the new parallel sync infrastructure.

Comment on lines +823 to +840
// Enable WAL checkpointing for parallel sync to prevent checkpoint failures under high concurrency
baseSyncer.enableWALCheckpoint = true


@coderabbitai coderabbitai bot Oct 20, 2025


⚠️ Potential issue | 🟡 Minor

ParallelSyncer unconditionally enables WAL checkpointing on base syncer.

This overrides caller intent. Consider honoring existing setting or making it explicit in ParallelSyncConfig.

-  // Enable WAL checkpointing for parallel sync to prevent checkpoint failures under high concurrency
-  baseSyncer.enableWALCheckpoint = true
+  // Enable WAL checkpointing if not explicitly set by caller.
+  if !baseSyncer.enableWALCheckpoint {
+    baseSyncer.enableWALCheckpoint = true
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Enable WAL checkpointing for parallel sync to prevent checkpoint failures under high concurrency
baseSyncer.enableWALCheckpoint = true
// Enable WAL checkpointing if not explicitly set by caller.
if !baseSyncer.enableWALCheckpoint {
baseSyncer.enableWALCheckpoint = true
}

@jirwin (Contributor, Author) replied:

For parallel sync, we always want WAL checkpointing enabled.

@coderabbitai (bot) replied:

@jirwin, understood! Thank you for clarifying. Since parallel sync always requires WAL checkpointing due to the high concurrency, the unconditional enablement makes sense.

The existing comment does explain why, but if you'd like to make the requirement even more explicit, you could consider enhancing it slightly to:

// Enable WAL checkpointing for parallel sync - this is required (not optional) to prevent 
// checkpoint failures under high concurrency
baseSyncer.enableWALCheckpoint = true

Though the current comment is clear enough. I've noted this design requirement for future reference.


✏️ Learnings added
Learnt from: jirwin
PR: ConductorOne/baton-sdk#524
File: pkg/sync/parallel_syncer.go:827-829
Timestamp: 2025-10-21T23:58:13.698Z
Learning: In the parallel syncer implementation (pkg/sync/parallel_syncer.go), WAL checkpointing must always be enabled for parallel sync operations. The `NewParallelSyncer` function unconditionally sets `baseSyncer.enableWALCheckpoint = true` because high concurrency in parallel sync requires WAL checkpointing to prevent checkpoint failures. This is a design requirement, not a configurable option.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment on lines 1098 to 1101
// Skip the default bucket (used for final tasks)
if bucketName == "resource-type-" {
    continue
}

⚠️ Potential issue | 🟡 Minor

Bucket name equality check is ineffective.

Comparing to "resource-type-" never matches actual default buckets ("resource-type-"). If you meant to skip default buckets, use HasPrefix.

-          if bucketName == "resource-type-" {
+          if strings.HasPrefix(bucketName, "resource-type-") {
             continue
           }

(Ensure strings is imported at top if not already.)

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In pkg/sync/parallel_syncer.go around lines 1098 to 1101, the code compares
bucketName to the literal "resource-type-" which never matches default buckets
like "resource-type-<id>"; change the check to use strings.HasPrefix(bucketName,
"resource-type-") to skip those default buckets, and add/import the "strings"
package at the file top if it’s not already imported.

Comment on lines +62 to 71
case CollectEntitlementsAndGrantsTasksOp:
    return "collect-entitlements-and-grants-tasks"
default:

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

New operation wired correctly; add JSON round‑trip tests.

Add unit tests to verify MarshalJSON/UnmarshalJSON round‑trip for CollectEntitlementsAndGrantsTasksOp and token serialization.

Also confirm whether ListResourcesForEntitlementsOp intentionally lacks a String()/newActionOp mapping.

Also applies to: 107-109, 126-127


🏁 Script executed:

#!/bin/bash
# Quick grep to validate string mappings for all ActionOp variants
rg -nP 'type ActionOp|const \(|String\(\)|newActionOp\(' -n

Length of output: 268504









🏁 Script executed:

#!/bin/bash
# Locate and examine ActionOp enum, String(), and newActionOp in pkg/sync/state.go
fd -t f 'state\.go' pkg/sync/ | head -5

Length of output: 17


🏁 Script executed:

#!/bin/bash
# Display full pkg/sync/state.go to understand ActionOp structure
cat pkg/sync/state.go | head -150

Length of output: 4272


🏁 Script executed:

#!/bin/bash
# Search for String() method, newActionOp function, and ListResourcesForEntitlementsOp
rg -n 'func.*String\(\)|newActionOp|ListResourcesForEntitlementsOp' pkg/sync/state.go

Length of output: 237


🏁 Script executed:

#!/bin/bash
# Look for test files related to state.go
fd -t f 'state.*test\.go|test.*state' pkg/sync/

Length of output: 22



---

🏁 Script executed:

#!/bin/bash
# Examine state_test.go for marshal/unmarshal test coverage
cat pkg/sync/state_test.go

Length of output: 3242


🏁 Script executed:

#!/bin/bash
# Search for specific test coverage of ActionOp marshalling
rg -n 'MarshalJSON|UnmarshalJSON|CollectEntitlementsAndGrantsTasksOp|ListResourcesForEntitlementsOp' pkg/sync/state_test.go

Length of output: 0


Add JSON round-trip tests for the new CollectEntitlementsAndGrantsTasksOp operation.

The code change correctly implements String() and newActionOp() mappings for CollectEntitlementsAndGrantsTasksOp, but TestSyncerTokenMarshalUnmarshal lacks coverage for this new operation. Extend the test to include this enum member to verify MarshalJSON/UnmarshalJSON work correctly.

Separately, ListResourcesForEntitlementsOp is confirmed to intentionally lack String()/newActionOp mappings (pre-existing behavior); if intentional, this is acceptable but may warrant a comment.
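
A sketch of the requested round-trip coverage, mirroring the shape of the existing TestSyncerTokenMarshalUnmarshal; the exact field set on Action and the use of testify's require here are assumptions:

func TestCollectEntitlementsAndGrantsTasksOpRoundTrip(t *testing.T) {
	in := Action{
		Op:             CollectEntitlementsAndGrantsTasksOp,
		ResourceTypeID: "group",
	}

	data, err := json.Marshal(in)
	require.NoError(t, err)

	var out Action
	require.NoError(t, json.Unmarshal(data, &out))
	require.Equal(t, in.Op, out.Op)
	require.Equal(t, in.ResourceTypeID, out.ResourceTypeID)
}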


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

♻️ Duplicate comments (7)
pkg/dotc1z/c1file.go (3)

271-348: Stats() method still lacks checkpoint read locks.

This is the same critical issue raised in the past review comment. The DB read operations at lines 319-322, 330-332, and 338-340 (CountContext calls) are not wrapped with acquireCheckpointLock() / releaseCheckpointLock(), making the checkpoint coordination mechanism ineffective. When performWALCheckpoint() acquires the write lock, these hot-path reads can still execute, potentially causing checkpoint starvation.

Add checkpoint read locks around each CountContext call, following the pattern used in sql_helpers.go.

Example for the first DB call:

+	c.acquireCheckpointLock()
 	resourceCount, err := c.db.From(resources.Name()).
 		Where(goqu.C("resource_type_id").Eq(rt.Id)).
 		Where(goqu.C("sync_id").Eq(syncId)).
 		CountContext(ctx)
+	c.releaseCheckpointLock()
 	if err != nil {
 		return nil, err
 	}

Apply similar wrapping for the CountContext calls at lines 330-332 and 338-340.


399-456: GrantStats() method still lacks checkpoint read locks.

This is the same critical issue raised in the past review comment. The DB read operation at lines 444-447 (CountContext call) is not wrapped with acquireCheckpointLock() / releaseCheckpointLock(), making the checkpoint coordination mechanism ineffective.

Add checkpoint read locks around the CountContext call:

+		c.acquireCheckpointLock()
 		grantsCount, err := c.db.From(grants.Name()).
 			Where(goqu.C("sync_id").Eq(syncId)).
 			Where(goqu.C("resource_type_id").Eq(resourceType.Id)).
 			CountContext(ctx)
+		c.releaseCheckpointLock()
 		if err != nil {
 			return nil, err
 		}

458-466: isWALMode() still uses case-sensitive comparison without DB query fallback.

This is the same issue raised in the past review comment. The current implementation could miss cases where journal_mode is set with different casing or configured outside the pragmas slice.

Consider implementing the suggested case-insensitive comparison with DB query fallback for robustness.

pkg/sync/parallel_syncer.go (4)

75-89: Panic on task enqueue failure can crash the sync.

Line 86 panics when task re-enqueuing fails, which will terminate the entire sync process. This is a critical failure mode already noted in past review.


379-463: Unsafe dynamic channel replacement can lose tasks and deadlock producers.

expandQueueAndRetry replaces a bucket's channel while producers may still hold references to the old channel. Producers blocked on the old channel will never complete, and the old channel is never drained by workers after replacement. This critical concurrency bug was already flagged in past review.


579-590: Closing bucket channels risks panic if producers still send.

Close() closes all per-bucket channels while producers may still attempt to send. This will cause a panic. This critical issue was already noted in past review—channels should not be closed; instead, mark queue as closed and signal workers via context.


822-835: Unconditionally overrides WAL checkpoint setting.

Line 829 unconditionally sets baseSyncer.enableWALCheckpoint = true, overriding any caller configuration. This issue was already noted in past review comments.

🧹 Nitpick comments (3)
pkg/dotc1z/sql_helpers.go (2)

314-316: Consider checking SQLite error codes instead of string matching.

String matching error messages is fragile and may produce false positives. If using mattn/go-sqlite3 or a similar driver, prefer checking the error code directly.

Example refactor:

 func isSQLiteBusy(err error) bool {
-	return strings.Contains(err.Error(), "database is locked") || strings.Contains(err.Error(), "SQLITE_BUSY")
+	var sqliteErr sqlite3.Error
+	if errors.As(err, &sqliteErr) {
+		return sqliteErr.Code == sqlite3.ErrBusy || sqliteErr.ExtendedCode == sqlite3.ErrBusySnapshot
+	}
+	return false
 }

349-437: LGTM: Retry logic and error handling are well-implemented.

The function correctly handles rollback errors (using errors.Join), properly manages the checkpoint lock lifecycle across all code paths, and addresses the previous review comment regarding errcheck warnings. The retry logic with context cancellation is sound.

Consider adding debug logging when retries occur to improve observability during production issues:

 		if isSQLiteBusy(err) && attempt < maxRetries-1 {
+			ctxzap.Extract(ctx).Debug("retrying chunk insert due to SQLITE_BUSY",
+				zap.Int("attempt", attempt+1),
+				zap.Int("maxRetries", maxRetries),
+			)
 			select {
pkg/sync/parallel_syncer.go (1)

2144-2152: Type assertion in NewParallelSyncerFromSyncer could be more helpful.

Lines 2147-2148 return a generic error when the type assertion fails. Consider providing more context about what types are supported and why the assertion failed.

 func NewParallelSyncerFromSyncer(s Syncer, config *ParallelSyncConfig) (*parallelSyncer, error) {
 	// Try to cast to the concrete syncer type
 	if baseSyncer, ok := s.(*SequentialSyncer); ok {
 		return NewParallelSyncer(baseSyncer, config), nil
 	}
 
-	return nil, fmt.Errorf("cannot create parallel syncer from syncer type: %T", s)
+	return nil, fmt.Errorf("cannot create parallel syncer from syncer type %T; only *SequentialSyncer is supported", s)
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 045b6f9 and e68e083.

📒 Files selected for processing (3)
  • pkg/dotc1z/c1file.go (10 hunks)
  • pkg/dotc1z/sql_helpers.go (4 hunks)
  • pkg/sync/parallel_syncer.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
pkg/dotc1z/sql_helpers.go (1)
pkg/dotc1z/c1file.go (1)
  • C1File (38-62)
pkg/dotc1z/c1file.go (3)
pkg/dotc1z/manager/local/local.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/manager.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/s3/s3.go (1)
  • WithWALCheckpoint (44-48)
pkg/sync/parallel_syncer.go (5)
pkg/sync/syncer.go (2)
  • Syncer (53-56)
  • SequentialSyncer (314-341)
pkg/dotc1z/manager/manager.go (1)
  • New (50-83)
pkg/sync/state.go (1)
  • Action (130-137)
pb/c1/connector/v2/resource.pb.go (4)
  • ResourceType (126-141)
  • ResourceType (154-154)
  • Resource (2737-2749)
  • Resource (2762-2762)
pkg/annotations/annotations.go (1)
  • Annotations (12-12)
🪛 GitHub Check: go-lint
pkg/dotc1z/c1file.go

[failure] 499-499:
Comment should end in a period (godot)

pkg/sync/parallel_syncer.go

[failure] 1241-1241:
func (*parallelSyncer).syncResources is unused (unused)


[failure] 1077-1077:
SA4010: this result of append is never used, except maybe in other appends (staticcheck)


[failure] 37-37:
function min has same name as predeclared identifier (predeclared)


[failure] 1199-1199:
Comment should end in a period (godot)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: go-test (1.25.2, ubuntu-latest)
  • GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (16)
pkg/dotc1z/c1file.go (10)

14-15: LGTM!

Correct imports added for structured logging (ctxzap and zap), addressing the past review comment about replacing fmt.Printf.


55-61: LGTM!

WAL checkpoint fields are well-structured with proper synchronization primitives (RWMutex, channels, sync.Once) and clear documentation.


80-84: LGTM!

Standard functional option pattern correctly implemented.


106-107: LGTM!

Checkpoint channels properly initialized. Unbuffered channels are appropriate for stop signaling and done notification.


127-132: LGTM!

C1Z-level WAL checkpoint option correctly implemented, following the established functional option pattern.

Also applies to: 153-157


178-180: LGTM!

Correctly propagates WAL checkpoint flag from C1Z-level to C1F-level option.


205-212: LGTM!

Checkpoint cleanup logic is correct and idempotent. The nil check, sync.Once protection for channel closure, and wait on checkpointDone ensure graceful shutdown even if Close() is called multiple times.


263-266: LGTM!

Checkpoint initialization correctly gated by both the enabled flag and WAL mode check.


468-483: LGTM!

Background checkpoint goroutine follows proper lifecycle patterns with correct cleanup (defer close on checkpointDone) and graceful shutdown via checkpointStop channel. 5-minute interval is reasonable for WAL checkpointing.


485-497: LGTM!

Lock helper methods provide clean conditional locking based on the checkpointEnabled flag. Implementation is correct and straightforward.

pkg/dotc1z/sql_helpers.go (2)

229-231: LGTM: Checkpoint lock properly coordinates WAL operations.

The checkpoint lock acquisition correctly prevents WAL checkpoints during query execution and result iteration, avoiding potential consistency issues and WAL file growth under concurrent load.


339-346: LGTM: Clean separation of chunking and retry logic.

The refactoring to delegate per-chunk execution to executeChunkWithRetry improves code organization and testability.

pkg/sync/parallel_syncer.go (4)

170-237: Clean state abstraction for parallel execution.

The StateInterface, LocalStateContext, and ActionDecision types provide a well-designed abstraction that allows parallel workers to maintain independent state while reusing core sync logic. No concurrency concerns since each worker maintains its own LocalStateContext.


617-732: Worker loop and task dispatching logic is sound.

The worker's main loop (lines 617-732) properly handles context cancellation, implements bucket affinity with work-stealing, tracks consecutive failures, and manages rate limiting with bucket-aware backoff. The atomic flags for isProcessing and rateLimited are correctly used.
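
For readers skimming the file, the loop's shape is roughly the following conceptual sketch, not the SDK's code; GetTaskForBucket, process, and backoffBucket are hypothetical stand-ins for the real helpers, while GetTask and isRateLimitError are the names used in this file.

// run is a conceptual sketch of a worker loop: prefer the worker's own bucket,
// steal from any other bucket when it is empty, back off on rate limits, and
// exit on context cancellation or too many consecutive failures.
func (w *worker) run(ctx context.Context) error {
	const maxConsecutiveFailures = 10
	failures := 0
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		t, err := w.queue.GetTaskForBucket(ctx, w.bucket) // bucket affinity first (hypothetical helper)
		if err != nil {
			t, err = w.queue.GetTask(ctx) // steal work from any other bucket
		}
		if err != nil {
			time.Sleep(50 * time.Millisecond) // nothing queued anywhere; idle briefly
			continue
		}

		if err := w.process(ctx, t); err != nil {
			failures++
			if w.isRateLimitError(err) {
				w.backoffBucket(t.Bucket) // bucket-aware rate-limit backoff (hypothetical helper)
			}
			if failures >= maxConsecutiveFailures {
				return fmt.Errorf("worker stopping after %d consecutive failures: %w", failures, err)
			}
			continue
		}
		failures = 0
	}
}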


838-889: Parallel sync orchestration is well-structured.

The Sync() method properly orchestrates parallel execution: initializes sync, creates task queue, starts workers, generates initial tasks, waits for completion, then sequentially handles grant expansion and external resources. The lifecycle management and cleanup are correct.


1855-2006: Grant sync logic correctly handles pagination, etags, and state.

syncGrantsForResourceLogic properly fetches previous sync data, handles etags, processes grants with expansion/external resource detection, stores results, and returns pagination decisions. The state abstraction allows both parallel and sequential callers.

Comment on lines +116 to +169
// processTaskImmediately processes a task directly without going through the queue.
func (w *worker) processTaskImmediately(task *task) error {
    l := ctxzap.Extract(w.ctx)
    l.Debug("processing task immediately",
        zap.String("operation", task.Action.Op.String()),
        zap.String("resource_type", task.Action.ResourceTypeID),
        zap.String("resource_id", task.Action.ResourceID))

    switch task.Action.Op {
    case CollectEntitlementsAndGrantsTasksOp:
        // For resource tasks, we need to collect any sub-tasks and process them immediately too
        tasks, err := w.syncer.collectEntitlementsAndGrantsTasks(w.ctx, task.Action)
        if err != nil {
            return err
        }
        // Process any collected tasks immediately
        for _, subTask := range tasks {
            if err := w.processTaskImmediately(subTask); err != nil {
                l.Error("failed to process sub-task immediately", zap.Error(err))
                return err
            }
        }
        return nil
    case SyncEntitlementsOp:
        if task.Action.ResourceID != "" {
            return w.syncer.syncEntitlementsForResource(w.ctx, task.Action)
        } else {
            return w.syncer.syncEntitlementsForResourceType(w.ctx, task.Action)
        }
    case SyncGrantsOp:
        if task.Action.ResourceID != "" {
            return w.syncer.syncGrantsForResource(w.ctx, task.Action)
        } else {
            return w.syncer.syncGrantsForResourceType(w.ctx, task.Action)
        }
    case SyncResourcesOp:
        // For resource tasks, we need to collect any sub-tasks and process them immediately too
        tasks, err := w.syncer.syncResourcesCollectTasks(w.ctx, task.Action)
        if err != nil {
            return err
        }
        // Process any collected tasks immediately
        for _, subTask := range tasks {
            if err := w.processTaskImmediately(subTask); err != nil {
                l.Error("failed to process sub-task immediately", zap.Error(err))
                return err
            }
        }
        return nil
    default:
        return fmt.Errorf("unsupported operation for immediate processing: %s", task.Action.Op.String())
    }
}

⚠️ Potential issue | 🟠 Major

Recursive immediate processing risks stack overflow.

processTaskImmediately recursively calls itself for sub-tasks (lines 133, 159). Deep task chains (e.g., deeply nested child resources) could exhaust the stack. Consider iterative processing with a local queue or enforce a depth limit.

 func (w *worker) processTaskImmediately(task *task) error {
 	l := ctxzap.Extract(w.ctx)
+	const maxDepth = 100
+	
+	// Use a local stack to avoid stack overflow
+	taskStack := []*task{task}
+	
+	for len(taskStack) > 0 {
+		if len(taskStack) > maxDepth {
+			return fmt.Errorf("task depth exceeded %d, possible infinite recursion", maxDepth)
+		}
+		
+		currentTask := taskStack[len(taskStack)-1]
+		taskStack = taskStack[:len(taskStack)-1]
+		
 	l.Debug("processing task immediately",
-		zap.String("operation", task.Action.Op.String()),
-		zap.String("resource_type", task.Action.ResourceTypeID),
-		zap.String("resource_id", task.Action.ResourceID))
+		zap.String("operation", currentTask.Action.Op.String()),
+		zap.String("resource_type", currentTask.Action.ResourceTypeID),
+		zap.String("resource_id", currentTask.Action.ResourceID))
 
-	switch task.Action.Op {
+	switch currentTask.Action.Op {
 	case CollectEntitlementsAndGrantsTasksOp:
-		// For resource tasks, we need to collect any sub-tasks and process them immediately too
-		tasks, err := w.syncer.collectEntitlementsAndGrantsTasks(w.ctx, task.Action)
+		tasks, err := w.syncer.collectEntitlementsAndGrantsTasks(w.ctx, currentTask.Action)
 		if err != nil {
 			return err
 		}
-		// Process any collected tasks immediately
-		for _, subTask := range tasks {
-			if err := w.processTaskImmediately(subTask); err != nil {
-				l.Error("failed to process sub-task immediately", zap.Error(err))
-				return err
-			}
-		}
-		return nil
+		// Add sub-tasks to stack for processing
+		taskStack = append(taskStack, tasks...)
 	// ... handle other cases similarly
+	}
+	}
+	return nil
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +482 to +537
func (q *taskQueue) GetTask(ctx context.Context) (*task, error) {
    q.mu.Lock() // Use write lock to make the operation atomic
    defer q.mu.Unlock()

    // Debug logging
    l := ctxzap.Extract(ctx)
    l.Debug("GetTask called",
        zap.Int("total_buckets", len(q.bucketQueues)),
        zap.Strings("bucket_names", getMapKeys(q.bucketQueues)))

    if len(q.bucketQueues) == 0 {
        l.Debug("no buckets available")
        return nil, errors.New("no buckets available")
    }

    // First, try to find a bucket with available tasks
    var availableBuckets []string
    for bucketName, queue := range q.bucketQueues {
        queueLen := len(queue)
        l.Debug("checking bucket", zap.String("bucket", bucketName), zap.Int("queue_length", queueLen))
        if queueLen > 0 {
            availableBuckets = append(availableBuckets, bucketName)
        }
    }

    l.Debug("available buckets", zap.Strings("buckets", availableBuckets))

    if len(availableBuckets) == 0 {
        l.Debug("no tasks available in any bucket")
        return nil, errors.New("no tasks available")
    }

    // Try to get a task from each available bucket in round-robin order
    // Use a more robust approach that handles the case where a queue becomes empty
    for _, bucketName := range availableBuckets {
        queue := q.bucketQueues[bucketName]

        // Double-check the queue still has items before trying to read
        if len(queue) == 0 {
            l.Debug("bucket queue became empty", zap.String("bucket", bucketName))
            continue
        }

        select {
        case t := <-queue:
            l.Debug("retrieved task from bucket", zap.String("bucket", bucketName))
            return t, nil
        default:
            l.Debug("bucket queue empty when trying to read", zap.String("bucket", bucketName))
            continue
        }
    }

    l.Debug("failed to get task from any available bucket")
    return nil, errors.New("no tasks available")
}

⚠️ Potential issue | 🟠 Major

GetTask holds write lock for entire operation, causing contention.

Lines 483-484 acquire a write lock that's held while iterating buckets and reading from channels. This blocks all concurrent AddTask operations. Consider using a read lock for bucket discovery, then acquiring write lock only when actually removing a task from a channel, or use a lock-free approach.

 func (q *taskQueue) GetTask(ctx context.Context) (*task, error) {
-	q.mu.Lock() // Use write lock to make the operation atomic
-	defer q.mu.Unlock()
+	q.mu.RLock()
 
 	// Debug logging
 	l := ctxzap.Extract(ctx)
@@ -494,6 +494,7 @@
 		return nil, errors.New("no buckets available")
 	}
 
+	q.mu.RUnlock()
 	// First, try to find a bucket with available tasks
 	var availableBuckets []string
+	q.mu.RLock()
 	for bucketName, queue := range q.bucketQueues {
@@ -506,6 +507,7 @@
 	}
 
 	l.Debug("available buckets", zap.Strings("buckets", availableBuckets))
+	q.mu.RUnlock()
 
 	if len(availableBuckets) == 0 {

Note: This is a simplified sketch. A production fix should carefully handle the race between checking len(queue) and reading from it.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In pkg/sync/parallel_syncer.go around lines 482-537, the GetTask function
currently holds the write lock for the whole discovery+read operation which
blocks AddTask; change it to use a read lock (RLock) to discover non-empty
buckets and capture references to their channels and names, then release the
read lock before attempting non-blocking reads from those channels; only acquire
the write lock briefly if you need to modify q.bucketQueues or remove an empty
bucket (do not hold it while performing channel receives). Also remove
len(queue) checks that rely on the lock by using non-blocking select receives to
avoid races, retain debug logging, and ensure any mutation of the map is done
under the write lock only for the minimal window.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (14)
pkg/dotc1z/c1file.go (3)

271-348: Stats method still lacks checkpoint read locks (duplicate concern).

The past review comment flagged that Stats performs database reads without acquiring checkpoint read locks, making the checkpoint coordination ineffective. The current code still shows no calls to acquireCheckpointLock() / releaseCheckpointLock() around the three CountContext() operations (lines 319-322, 330-332, 338-340).

Either the fix was reverted, or this code predates the addressed commit. Verify that checkpoint locks are properly acquired before these DB operations.


399-456: GrantStats method still lacks checkpoint read locks (duplicate concern).

Similar to Stats, this method performs database reads (lines 444-447) without acquiring checkpoint read locks. The past review comment flagged this issue, but the current code does not show the fix. Verify that acquireCheckpointLock() / releaseCheckpointLock() are properly used.


499-499: Fix comment punctuation to satisfy linter (duplicate concern).

The function comment on line 499 does not end with a period, which violates the godot linter rule. A past review comment flagged this same issue.

Apply this diff:

-// performWALCheckpoint performs a WAL checkpoint using SQLITE_CHECKPOINT_RESTART or SQLITE_CHECKPOINT_TRUNCATE
+// performWALCheckpoint performs a WAL checkpoint using SQLITE_CHECKPOINT_RESTART or SQLITE_CHECKPOINT_TRUNCATE.
pkg/sync/parallel_syncer.go (11)

85-87: Panic can crash the entire sync.

Panicking here (line 86) will terminate the worker and potentially the entire sync process. Consider logging the error with a critical severity and either: (1) retrying with exponential backoff, (2) storing failed tasks for manual intervention, or (3) returning an error to gracefully shut down the worker.


116-168: Recursive processing risks stack overflow on deep task chains.

Lines 132–136 and 158–162 recursively call processTaskImmediately for sub-tasks. Deeply nested child resources or long task chains could exhaust the call stack. Consider iterative processing with a local task queue or enforce a depth limit.


270-276: Priority field is unused.

The Priority field (line 274) is set throughout the code but GetTask() (lines 481–537) uses round-robin bucket selection and FIFO ordering, completely ignoring priority. Either remove this field or implement priority-based scheduling.


379-463: Unsafe dynamic channel resize can deadlock or lose tasks.

Lines 400–463 attempt to "expand" a bucket channel by creating a new one and copying tasks. Producers that obtained the old channel reference (line 382) before the expansion can be left blocked forever (no reader drains the old channel) or may panic. Avoid dynamic resizing of buffered channels. Instead, return errTaskQueueFull and rely on the caller's retry/backoff logic.


481-537: GetTask holds write lock for entire operation, blocking AddTask.

Lines 483–484 acquire a write lock held for the entire bucket discovery and channel-read operation, blocking all concurrent AddTask calls. Use a read lock (RLock) to discover non-empty buckets, release it, then use non-blocking select receives. Only acquire the write lock briefly if you need to modify the bucket map.


578-590: Closing bucket channels risks panic if producers still send.

Lines 584–587 close per-bucket channels. If any producer still holds a channel reference obtained earlier and sends to it, the program will panic. Instead, set q.closed = true under the lock and signal workers via context cancellation or a shared done channel. Do not close individual bucket channels.


778-789: Incorrect rate limit detection using sql.ErrConnDone.

Line 788 treats sql.ErrConnDone as a rate limit indicator, but this error means the database connection is closed, not that an API is rate-limited. Remove this check and rely solely on codes.ResourceExhausted for gRPC rate limiting.

 func (w *worker) isRateLimitError(err error) bool {
-	// Check for rate limit annotations in the error
 	if err == nil {
 		return false
 	}
 
-	// This is a simplified check - in practice, you'd want to check the actual
-	// error type returned by the connector for rate limiting
-	return status.Code(err) == codes.ResourceExhausted ||
-		errors.Is(err, sql.ErrConnDone) // Placeholder for rate limit detection
+	return status.Code(err) == codes.ResourceExhausted
 }

827-829: Unconditionally overrides WAL checkpoint setting.

Line 828 unconditionally sets baseSyncer.enableWALCheckpoint = true, overriding any caller intent. Consider honoring the existing setting or making this explicit in ParallelSyncConfig.

-	// Enable WAL checkpointing for parallel sync to prevent checkpoint failures under high concurrency
-	baseSyncer.enableWALCheckpoint = true
+	// Enable WAL checkpointing if not already set by caller
+	if !baseSyncer.enableWALCheckpoint {
+		baseSyncer.enableWALCheckpoint = true
+	}

1071-1080: Unused activeBuckets slice.

Lines 1073–1078 build an activeBuckets slice but never use it (SA4010). Either log the active buckets with l.Debug("active buckets", zap.Strings("active_buckets", activeBuckets)) or remove this dead code.


1199-1199: Missing period in comment.

Line 1199 comment should end with a period per godot linter: // finalizeSync performs final sync cleanup.

-// finalizeSync performs final sync cleanup
+// finalizeSync performs final sync cleanup.
 func (ps *parallelSyncer) finalizeSync(ctx context.Context) error {

71-71: Unreachable return statement.

Line 71 is unreachable because the loop at lines 46–69 always returns within its body (lines 49, 58, or inside the select at lines 64–67). Remove this line or replace it with a panic/error to indicate an unexpected state.

-	return nil // This should never be reached
+	// Unreachable: loop above always returns
🧹 Nitpick comments (4)
pkg/dotc1z/c1file.go (1)

458-466: Make WAL mode detection more robust with case-insensitive comparison and fallback query.

The current implementation uses exact string matching for pragma name and value, which will fail if the journal mode is specified as "wal" (lowercase) or set through other means (connection string, environment variable, prior session). A past review suggested adding case-insensitive comparison and a fallback query to PRAGMA journal_mode.

Consider implementing the suggested approach for more robust detection.
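
The suggested detection could look like the sketch below; the function takes the DB handle and the configured pragmas explicitly (shown as a map here) for illustration, whereas the real isWALMode is a method on C1File.

import (
	"context"
	"database/sql"
	"strings"
)

// isWALModeSketch checks the configured pragmas case-insensitively, then falls
// back to asking SQLite directly in case the journal mode was set elsewhere.
func isWALModeSketch(ctx context.Context, db *sql.DB, pragmas map[string]string) bool {
	for name, value := range pragmas {
		if strings.EqualFold(name, "journal_mode") && strings.EqualFold(value, "wal") {
			return true
		}
	}
	var mode string
	if err := db.QueryRowContext(ctx, "PRAGMA journal_mode;").Scan(&mode); err != nil {
		return false
	}
	return strings.EqualFold(mode, "wal")
}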

pkg/sync/parallel_syncer.go (1)

1248-1255: Panic recovery logs but doesn't return error.

Lines 1248–1255 use defer with recover() to catch panics, but only log the panic without returning an error to the caller. This could mask serious issues. Either return an error from the deferred function (requires named return) or remove the recovery to let panics propagate naturally for debugging.

pkg/sync/syncer.go (2)

72-72: Remove outdated TODO comment.

Line 72's TODO comment states "use a mutex or a syncmap for when this code becomes parallel" but the mutex has now been implemented (lines 66-67). This comment should be removed or updated.

-// TODO: use a mutex or a syncmap for when this code becomes parallel
 func NewProgressCounts() *ProgressCounts {

90-177: Consider helper methods to reduce conditional locking duplication.

Lines 92–98, 108–118, 128–134, 148–154, 170–176, etc. all duplicate the conditional locking pattern. While functionally correct, consider extracting helpers like getResourceCount(resourceType string) int and updateLastLogTime(map, resourceType, time) to reduce duplication and improve maintainability.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e68e083 and 3930828.

📒 Files selected for processing (3)
  • pkg/dotc1z/c1file.go (10 hunks)
  • pkg/sync/parallel_syncer.go (1 hunks)
  • pkg/sync/syncer.go (53 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
pkg/dotc1z/c1file.go (3)
pkg/dotc1z/manager/local/local.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/manager.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/s3/s3.go (1)
  • WithWALCheckpoint (44-48)
pkg/sync/parallel_syncer.go (4)
pkg/sync/syncer.go (2)
  • Syncer (53-56)
  • SequentialSyncer (314-341)
pkg/dotc1z/manager/manager.go (1)
  • New (50-83)
pkg/sync/state.go (1)
  • Action (130-137)
pb/c1/connector/v2/resource.pb.go (10)
  • ResourceId (2652-2659)
  • ResourceId (2672-2672)
  • ResourceType (126-141)
  • ResourceType (154-154)
  • ResourceTypesServiceListResourceTypesRequest (275-284)
  • ResourceTypesServiceListResourceTypesRequest (297-297)
  • ResourcesServiceListResourcesRequest (2925-2935)
  • ResourcesServiceListResourcesRequest (2948-2948)
  • Resource (2737-2749)
  • Resource (2762-2762)
pkg/sync/syncer.go (3)
pkg/sync/expand/graph.go (1)
  • EntitlementGraphAction (14-20)
pkg/sync/state.go (1)
  • Action (130-137)
pkg/dotc1z/manager/manager.go (4)
  • ManagerOption (26-26)
  • WithTmpDir (28-32)
  • WithWALCheckpoint (40-44)
  • New (50-83)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: go-test (1.25.2, ubuntu-latest)
  • GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (7)
pkg/dotc1z/c1file.go (1)

500-517: Checkpoint implementation is solid.

The WAL checkpoint logic correctly:

  • Acquires write lock to pause all database operations
  • Uses appropriate timeout (30s)
  • Implements sensible fallback strategy (TRUNCATE → RESTART)
  • Logs errors without failing the operation
  • Properly releases the lock via defer
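
Condensed into a sketch (field names such as checkpointMu and rawDb are assumptions), the flow in the bullets above looks roughly like:

```go
package sketch

import (
	"context"
	"database/sql"
	"log"
	"sync"
	"time"
)

type checkpointer struct {
	checkpointMu sync.RWMutex // writer = checkpoint, readers = normal DB ops
	rawDb        *sql.DB
}

// performWALCheckpoint pauses database activity and checkpoints the WAL,
// preferring TRUNCATE and falling back to RESTART; errors are logged only.
func (c *checkpointer) performWALCheckpoint() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	c.checkpointMu.Lock() // waits for in-flight readers to release their RLocks
	defer c.checkpointMu.Unlock()

	if _, err := c.rawDb.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
		log.Printf("wal_checkpoint(TRUNCATE) failed, trying RESTART: %v", err)
		if _, err := c.rawDb.ExecContext(ctx, "PRAGMA wal_checkpoint(RESTART)"); err != nil {
			log.Printf("wal_checkpoint(RESTART) failed: %v", err)
		}
	}
}
```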
pkg/sync/syncer.go (6)

278-311: Thread-safe mutation methods are well-designed.

The new mutation methods (lines 280–311) always acquire the mutex for safety, while the read/log methods conditionally lock based on sequentialMode. This is a sound design: mutations are always safe, and reads optimize for the sequential case.
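
The split in miniature, with assumed names:

```go
package sketch

import "sync"

type counts struct {
	mu             sync.Mutex
	sequentialMode bool
	entitlements   int
}

// Mutations always lock, so they are safe in either mode.
func (c *counts) AddEntitlements(n int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entitlements += n
}

// Reads lock only in parallel mode, skipping the overhead when the
// syncer is known to be single-threaded.
func (c *counts) Entitlements() int {
	if !c.sequentialMode {
		c.mu.Lock()
		defer c.mu.Unlock()
	}
	return c.entitlements
}
```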


313-341: SequentialSyncer export and WAL checkpoint field enable parallel wrapping.

Renaming to SequentialSyncer (line 314) and adding enableWALCheckpoint (line 340) are appropriate changes that enable the parallel syncer to wrap and configure the sequential implementation. The public API surface is well-defined.


1263-1269: Conditional progress update is correct.

Lines 1263–1269 correctly use conditional locking based on sequentialMode when updating EntitlementsProgress, maintaining consistency with the established pattern.


2817-2849: WAL checkpoint propagation to manager is correct.

Lines 2826–2830 correctly check enableWALCheckpoint and conditionally append the WithWALCheckpoint option to the manager, enabling the parallel syncer to control this behavior.


2886-3000: SyncOpt API update is consistent and correct.

All option functions (lines 2886–3000) have been correctly updated to target *SequentialSyncer instead of the internal *syncer type, maintaining API consistency after the rename.


3002-3029: NewSyncer correctly returns exported SequentialSyncer.

Lines 3003–3004 correctly update the constructor to return *SequentialSyncer instead of the internal type, completing the public API transition.

@jirwin jirwin force-pushed the jallers/jirwin/psync-deadlock-fix branch from 2446b88 to 1bccadf on October 22, 2025 00:21
@jirwin jirwin requested review from ggreer, kans and laurenleach October 22, 2025 00:29

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/dotc1z/c1file.go (1)

272-349: Add checkpoint read locks to Stats() and GrantStats() database operations.

Verification confirms the checkpoint locks are missing. The pattern is established in sql_helpers.go (lines 230-231 and 361-365), wrapping database operations with c.acquireCheckpointLock() and defer c.releaseCheckpointLock(). Both Stats() and GrantStats() execute hot-path CountContext queries without these locks:

  • Stats() (lines 320-323, 331-333, 339-341): Three CountContext calls need lock protection
  • GrantStats() (lines 445-448): One CountContext call needs lock protection

Add the checkpoint lock pattern around each database count operation to coordinate with WAL checkpointing, matching the established pattern in sql_helpers.go.

♻️ Duplicate comments (6)
pkg/dotc1z/sql_helpers.go (1)

348-437: Unreachable code after retry loop; the rollback handling requested in a past review has been properly addressed.

The rollback error handling from the past review has been correctly implemented using errors.Join (lines 382-385, 393-397, 403-406). However, lines 436-437 are unreachable: every iteration of the retry loop either returns (on success, non-retryable error, or final retry exhaustion) or continues (on retryable SQLITE_BUSY with remaining retries). The loop cannot complete naturally.

Remove the unreachable error return:

 		c.releaseCheckpointLock()
 		return nil
 	}
-
-	return fmt.Errorf("failed to execute chunk after %d retries", maxRetries)
 }
pkg/sync/state.go (1)

62-64: Enum/string mappings LGTM; add JSON round‑trip tests for the new op.

Mappings for CollectEntitlementsAndGrantsTasksOp are correct. Please extend tests to cover marshal/unmarshal round‑trip for this op. If ListResourcesForEntitlementsOp intentionally lacks a mapping (pre‑existing), add a brief comment to document intent.

Also applies to: 107-109, 126-127

pkg/sync/parallel_syncer.go (4)

71-71: Unreachable return in addTaskWithRetry.

The final return nil is unreachable. Return an explicit error for unexpected fallthrough.

-    }
-  }
-
-  return nil // This should never be reached
+    }
+  }
+  // Unreachable: defensive error to aid diagnostics
+  return fmt.Errorf("unexpected: addTaskWithRetry exited loop without returning")

379-397: Do not “resize” bucket channels; risks deadlock/lost tasks.

Replacing a chan while producers still hold the old reference can wedge or lose tasks. Return queue‑full and rely on caller backoff, or redesign with a mutex+cond queue.

 func (q *taskQueue) AddTaskWithTimeout(ctx context.Context, t *task, timeout time.Duration) error {
@@
-  case <-time.After(timeout):
-    // Queue is full, try to expand it
-    return q.expandQueueAndRetry(bucket, t, timeout)
+  case <-time.After(timeout):
+    return errTaskQueueFull
@@
 }
 
-// expandQueueAndRetry attempts to expand the queue and retry adding the task.
-func (q *taskQueue) expandQueueAndRetry(bucket string, t *task, timeout time.Duration) error {
-  q.mu.Lock()
-  defer q.mu.Unlock()
-  ...
-  return errTaskQueueFull
-}
+// Removed: dynamic queue expansion (unsafe for concurrent producers).

Also applies to: 399-463


483-537: GetTask holds write lock while probing/receiving; high contention and potential stalls.

Use RLock to snapshot buckets, release lock before non‑blocking receives; take write lock only when mutating the map. Avoid reading from chans under the write lock.
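
A sketch of that restructuring: snapshot the bucket channels under RLock, release the lock, then do the non-blocking receives with no lock held (type shapes are assumed):

```go
package sketch

import "sync"

type task struct{ ID string }

type taskQueue struct {
	mu           sync.RWMutex
	bucketQueues map[string]chan *task
}

// GetTask probes each bucket once, without blocking and without holding
// the queue lock while receiving.
func (q *taskQueue) GetTask() (*task, bool) {
	q.mu.RLock()
	chans := make([]chan *task, 0, len(q.bucketQueues))
	for _, ch := range q.bucketQueues {
		chans = append(chans, ch)
	}
	q.mu.RUnlock()

	for _, ch := range chans {
		select {
		case t := <-ch:
			return t, true
		default:
		}
	}
	return nil, false
}
```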


117-169: Recursive immediate processing risks deep recursion/stack overflow.

processTaskImmediately recursively processes sub‑tasks; deep trees can overflow. Switch to an explicit stack/queue with a max‑depth guard.

-func (w *worker) processTaskImmediately(task *task) error {
-  ...
-  switch task.Action.Op {
+func (w *worker) processTaskImmediately(root *task) error {
+  const maxDepth = 100
+  stack := []*task{root}
+  for len(stack) > 0 {
+    if len(stack) > maxDepth {
+      return fmt.Errorf("immediate processing depth exceeded %d", maxDepth)
+    }
+    task := stack[len(stack)-1]
+    stack = stack[:len(stack)-1]
+    switch task.Action.Op {
       case CollectEntitlementsAndGrantsTasksOp:
-        tasks, err := w.syncer.collectEntitlementsAndGrantsTasks(w.ctx, task.Action)
+        tasks, err := w.syncer.collectEntitlementsAndGrantsTasks(w.ctx, task.Action)
         if err != nil { return err }
-        for _, subTask := range tasks { if err := w.processTaskImmediately(subTask); err != nil { ... } }
-        return nil
+        stack = append(stack, tasks...)
+        continue
       case SyncResourcesOp:
-        tasks, err := w.syncer.syncResourcesCollectTasks(w.ctx, task.Action)
+        tasks, err := w.syncer.syncResourcesCollectTasks(w.ctx, task.Action)
         if err != nil { return err }
-        for _, subTask := range tasks { if err := w.processTaskImmediately(subTask); err != nil { ... } }
-        return nil
+        stack = append(stack, tasks...)
+        continue
       ...
-  }
+    }
+  }
+  return nil
}
🧹 Nitpick comments (4)
pkg/tasks/c1api/manager.go (1)

57-58: Plumbing looks correct; consider Options to avoid constructor bloat.

  • Field/threading LGTM; parallelSync passed only to FullSync handler as intended.
  • The ctor now has 10+ positional params. Prefer functional options for maintainability and binary‑compat upgrades.

Also applies to: 260-261, 310-320, 334-335

pkg/sync/parallel_syncer.go (1)

548-564: Avoid receiving from bucket under write lock in GetTaskFromBucket.

Same locking concern as GetTask. Snapshot the channel under RLock, unlock, then perform the non‑blocking receive.

pkg/tasks/local/syncer.go (1)

68-72: Local parallel wrapper wiring LGTM; avoid hard‑coding worker count.

The builder use is correct now. Consider exposing worker count as an option rather than a constant 10 to let callers tune concurrency.

Also applies to: 101-121, 116-121

pkg/connectorrunner/runner.go (1)

559-564: Option to enable parallel sync: LGTM.

Simple flag is fine. You may also consider a WithParallelWorkers(n) option to surface worker count tuning from CLI.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2446b88 and 1bccadf.

⛔ Files ignored due to path filters (2)
  • pb/c1/connector/v2/resource.pb.go is excluded by !**/*.pb.go
  • pb/c1/connector/v2/resource_protoopaque.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (16)
  • pb/c1/connector/v2/resource.pb.validate.go (1 hunks)
  • pkg/cli/commands.go (1 hunks)
  • pkg/connectorrunner/runner.go (5 hunks)
  • pkg/dotc1z/c1file.go (10 hunks)
  • pkg/dotc1z/manager/local/local.go (3 hunks)
  • pkg/dotc1z/manager/manager.go (4 hunks)
  • pkg/dotc1z/manager/s3/s3.go (3 hunks)
  • pkg/dotc1z/sql_helpers.go (4 hunks)
  • pkg/field/defaults.go (2 hunks)
  • pkg/sync/parallel_syncer.go (1 hunks)
  • pkg/sync/state.go (3 hunks)
  • pkg/sync/syncer.go (53 hunks)
  • pkg/tasks/c1api/full_sync.go (4 hunks)
  • pkg/tasks/c1api/manager.go (4 hunks)
  • pkg/tasks/local/syncer.go (4 hunks)
  • proto/c1/connector/v2/resource.proto (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (8)
  • pkg/tasks/c1api/full_sync.go
  • pkg/cli/commands.go
  • pb/c1/connector/v2/resource.pb.validate.go
  • pkg/dotc1z/manager/s3/s3.go
  • pkg/dotc1z/manager/local/local.go
  • pkg/field/defaults.go
  • proto/c1/connector/v2/resource.proto
  • pkg/dotc1z/manager/manager.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-21T23:58:13.698Z
Learnt from: jirwin
PR: ConductorOne/baton-sdk#524
File: pkg/sync/parallel_syncer.go:827-829
Timestamp: 2025-10-21T23:58:13.698Z
Learning: In the parallel syncer implementation (pkg/sync/parallel_syncer.go), WAL checkpointing must always be enabled for parallel sync operations. The `NewParallelSyncer` function unconditionally sets `baseSyncer.enableWALCheckpoint = true` because high concurrency in parallel sync requires WAL checkpointing to prevent checkpoint failures. This is a design requirement, not a configurable option.

Applied to files:

  • pkg/sync/parallel_syncer.go
  • pkg/sync/syncer.go
🧬 Code graph analysis (6)
pkg/tasks/local/syncer.go (3)
pkg/connectorrunner/runner.go (2)
  • WithParallelSyncEnabled (559-564)
  • Option (254-254)
pkg/sync/syncer.go (2)
  • Syncer (53-56)
  • NewSyncer (3063-3089)
pkg/sync/parallel_syncer.go (2)
  • DefaultParallelSyncConfig (250-255)
  • NewParallelSyncer (833-845)
pkg/connectorrunner/runner.go (2)
pkg/tasks/local/syncer.go (2)
  • WithParallelSyncEnabled (68-72)
  • Option (30-30)
pkg/tasks/c1api/manager.go (1)
  • NewC1TaskManager (309-336)
pkg/dotc1z/sql_helpers.go (1)
pkg/dotc1z/c1file.go (1)
  • C1File (39-63)
pkg/dotc1z/c1file.go (3)
pkg/dotc1z/manager/local/local.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/manager.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/s3/s3.go (1)
  • WithWALCheckpoint (44-48)
pkg/sync/parallel_syncer.go (5)
pkg/sync/syncer.go (2)
  • Syncer (53-56)
  • SequentialSyncer (314-342)
pkg/dotc1z/manager/manager.go (1)
  • New (50-83)
pkg/sync/state.go (7)
  • Action (130-137)
  • CollectEntitlementsAndGrantsTasksOp (126-126)
  • SyncEntitlementsOp (119-119)
  • SyncGrantsOp (121-121)
  • SyncResourcesOp (118-118)
  • SyncGrantExpansionOp (124-124)
  • SyncExternalResourcesOp (122-122)
pb/c1/connector/v2/resource.pb.go (4)
  • ResourceType (126-141)
  • ResourceType (154-154)
  • Resource (2737-2749)
  • Resource (2762-2762)
pkg/annotations/annotations.go (1)
  • Annotations (12-12)
pkg/sync/syncer.go (1)
pkg/dotc1z/manager/manager.go (4)
  • ManagerOption (26-26)
  • WithTmpDir (28-32)
  • WithWALCheckpoint (40-44)
  • New (50-83)
🪛 GitHub Check: go-lint
pkg/dotc1z/c1file.go

[failure] 468-468:
(*database/sql.DB).QueryRow must not be called. use (*database/sql.DB).QueryRowContext (noctx)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (14)
pkg/dotc1z/sql_helpers.go (2)

314-316: LGTM: Simple but effective SQLite busy detection.

The string-matching approach is appropriate for SQLite error detection in Go. While slightly brittle, it covers the common error messages for database lock contention.
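
For reference, the kind of check being described; the helper name is made up:

```go
package sketch

import "strings"

// isSQLiteBusyErr reports whether err looks like SQLite lock contention.
// String matching is brittle but covers the common driver messages.
func isSQLiteBusyErr(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "SQLITE_BUSY") || strings.Contains(msg, "database is locked")
}
```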


229-231: LGTM: Checkpoint lock properly coordinates read operations.

The checkpoint lock is correctly acquired before query execution and released via defer, ensuring the entire query and row iteration are protected from concurrent WAL checkpoints.

pkg/dotc1z/c1file.go (8)

81-85: LGTM: Clean option function for WAL checkpoint control.


154-158: LGTM: Option correctly propagates through C1Z wrapper.


179-181: LGTM: Proper propagation of WAL checkpoint option to C1File.


206-213: LGTM: Checkpoint lifecycle properly coordinated with sync.Once and channel signaling.

The shutdown sequence correctly stops the ticker, closes the stop channel exactly once, and waits for goroutine termination.
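
The shutdown sequence in outline, with assumed field names:

```go
package sketch

import (
	"sync"
	"time"
)

type checkpointLifecycle struct {
	ticker   *time.Ticker
	stop     chan struct{}
	done     chan struct{}
	stopOnce sync.Once
}

// shutdown stops the ticker, signals the goroutine exactly once, and waits
// for it to exit before the database handle is closed.
func (c *checkpointLifecycle) shutdown() {
	if c.ticker != nil {
		c.ticker.Stop()
	}
	c.stopOnce.Do(func() { close(c.stop) })
	<-c.done // the checkpoint goroutine closes done when it returns
}
```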


264-267: LGTM: Checkpoint initialization properly gated.

Starting checkpointing only when both enabled and in WAL mode is the correct guard.


475-490: LGTM: Background checkpoint loop is cleanly structured.

The goroutine properly closes checkpointDone on exit, and the select handles both periodic ticks and shutdown signaling.


492-504: LGTM: Read lock helpers correctly guard DB operations.

These helpers allow normal DB operations (readers) to proceed concurrently but block when performWALCheckpoint acquires the write lock.


506-524: LGTM: Checkpoint logic is sound with proper fallback strategy.

The write lock correctly pauses all DB activity during checkpointing. The TRUNCATE → RESTART fallback and structured error logging are appropriate.

pkg/sync/parallel_syncer.go (1)

838-841: WAL checkpointing always-on for parallel: acknowledged.

Unconditional enableWALCheckpoint = true matches the stated design requirement for parallel sync. Consider a clarifying comment that this is mandatory, not configurable.

pkg/sync/syncer.go (2)

66-68: Thread‑safe ProgressCounts: good separation; ensure mode set before concurrency.

Mutex‑guarded helpers and sequentialMode gate look solid. Confirm SetSequentialMode(false) is called before starting workers (it is in parallelSyncer.initializeSync). No further action.

Also applies to: 81-82, 280-312


2879-2886: WAL checkpoint plumbing LGTM.

loadStore honors enableWALCheckpoint via manager options. Matches the parallel wrapper behavior.

Also applies to: 2897-2903

pkg/connectorrunner/runner.go (1)

806-818: End‑to‑end flag propagation verified.

parallelSync is threaded to both local and C1 task managers. Looks consistent.

Also applies to: 793-795

Comment on lines +171 to +181
// StateInterface defines the minimal interface needed by helper methods.
// This allows helper methods to work with either the sequential syncer's state machine
// or the parallel syncer's local state context.
type StateInterface interface {
PageToken(ctx context.Context) string
NextPage(ctx context.Context, pageToken string) error
SetNeedsExpansion()
SetHasExternalResourcesGrants()
ShouldFetchRelatedResources() bool
}



⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

LocalStateContext cannot ever set ShouldFetchRelatedResources; parity gap vs sequential.

StateInterface lacks SetShouldFetchRelatedResources(), and LocalStateContext’s shouldFetchRelated is never set. Parallel paths will never fetch related resources even when sequential would (e.g., targeted syncs).

Suggested fix: add a setter to StateInterface and LocalStateContext, and thread the flag from the initiating context (e.g., when targeted sync is enabled).

 type StateInterface interface {
   PageToken(ctx context.Context) string
   NextPage(ctx context.Context, pageToken string) error
   SetNeedsExpansion()
   SetHasExternalResourcesGrants()
   ShouldFetchRelatedResources() bool
+  SetShouldFetchRelatedResources()
 }
@@
 type LocalStateContext struct {
   resourceID           *v2.ResourceId
   pageToken            string
   needsExpansion       bool
   hasExternalResources bool
   shouldFetchRelated   bool
 }
@@
 func (lsc *LocalStateContext) ShouldFetchRelatedResources() bool {
   return lsc.shouldFetchRelated
 }
+
+func (lsc *LocalStateContext) SetShouldFetchRelatedResources() {
+  lsc.shouldFetchRelated = true
+}

Please confirm intended behavior for partial/targeted parallel syncs.

Also applies to: 193-201


I'll verify this review comment by examining the codebase to confirm whether the claimed parity gap actually exists.


---

🏁 Script executed:

```shell
#!/bin/bash
# Find StateInterface definition and examine its methods
rg -n "type StateInterface interface" -A 10 pkg/sync/parallel_syncer.go

Length of output: 420


🏁 Script executed:

#!/bin/bash
# Find LocalStateContext definition
rg -n "type LocalStateContext struct" -A 10 pkg/sync/parallel_syncer.go

Length of output: 401


🏁 Script executed:

#!/bin/bash
# Search for all references to shouldFetchRelated in the parallel syncer
rg -n "shouldFetchRelated" pkg/sync/parallel_syncer.go -B 2 -A 2

Length of output: 1025


🏁 Script executed:

#!/bin/bash
# Search for SetShouldFetchRelatedResources method calls or definitions
rg -n "SetShouldFetchRelatedResources" pkg/sync/

Length of output: 196


🏁 Script executed:

#!/bin/bash
# Find sequential syncer and check how it handles ShouldFetchRelatedResources
rg -n "ShouldFetchRelatedResources" pkg/sync/ -B 3 -A 3

Length of output: 4752


I need to verify whether the parallel syncer has targeted sync support and how LocalStateContext is instantiated:


---

🏁 Script executed:

```shell
#!/bin/bash
# Search for targeted sync references in parallel syncer
rg -n "targeted\|partial" pkg/sync/parallel_syncer.go -i

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Find where LocalStateContext is instantiated/created in parallel syncer
rg -n "NewLocalStateContext\|&LocalStateContext" pkg/sync/parallel_syncer.go -B 3 -A 3

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check the main parallel sync entry point for any state initialization
rg -n "func.*Sync.*Parallel\|func.*SyncResource" pkg/sync/parallel_syncer.go | head -20

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Look for where LocalStateContext might receive shouldFetchRelated value
rg -n "func.*LocalState" pkg/sync/parallel_syncer.go -A 5

Length of output: 1245


I need to determine if the parallel syncer is even invoked during targeted/partial syncs:


---

🏁 Script executed:

```shell
#!/bin/bash
# Search for where parallel syncer helper functions are called
rg -n "ProcessGrantsHelper\|processGrantsHelper" pkg/sync/ -B 5 -A 2

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check sequential syncer to see if it conditionally uses parallel syncer based on sync type
rg -n "parallel\|Parallel" pkg/sync/syncer.go -i -B 2 -A 2 | head -40

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Look for sync decision logic: when is parallel syncer used vs sequential
rg -n "func.*Sync\|shouldUseParallel\|useParallel" pkg/sync/syncer.go -i | head -20

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check if there are any comments or docs about parallel syncer limitations
rg -n "LocalStateContext\|parallel.*target\|partial.*sync" pkg/sync/parallel_syncer.go -B 2 -A 2 -i

Length of output: 0



---

🏁 Script executed:

```shell
#!/bin/bash
# Find all functions/methods that accept StateInterface parameter
rg -n "StateInterface" pkg/sync/parallel_syncer.go

Length of output: 509


🏁 Script executed:

#!/bin/bash
# List all exported functions in parallel_syncer (start with capital letter)
rg -n "^func \([A-Z]|^func \w+\(" pkg/sync/parallel_syncer.go | head -30

Length of output: 673


🏁 Script executed:

#!/bin/bash
# Look for helper method that uses StateInterface
ast-grep --pattern 'func $FNAME($$$state StateInterface$$$) {
  $$$
}'

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Search what functions call ShouldFetchRelatedResources in parallel_syncer
rg -n "ShouldFetchRelatedResources\|shouldFetchRelated" pkg/sync/parallel_syncer.go -B 5 -A 5

Length of output: 0


SetShouldFetchRelatedResources() missing in parallel syncer—LocalStateContext cannot set the flag.

The review comment is correct. I've verified:

  1. StateInterface (lines 174–180) lacks SetShouldFetchRelatedResources() method while the sequential syncer's State interface (in state.go) includes it.

  2. LocalStateContext (lines 195–210) has the shouldFetchRelated field but only a getter—no setter method exists. The field is always initialized to false.

  3. Parity gap confirmed: The sequential syncer calls SetShouldFetchRelatedResources() at line 599 (syncer.go), enabling related resource fetching in targeted sync flows. The parallel syncer helper methods syncGrantsForResourceLogic and syncEntitlementsForResourceLogic read this flag (line 1771) but have no mechanism to set it, so it remains permanently false.

  4. The flag is only read and propagated through the result; it is never written in the parallel syncer code path.

This prevents the parallel syncer from handling targeted/partial syncs that should fetch related resources. The suggested fix (adding SetShouldFetchRelatedResources() to both StateInterface and LocalStateContext) is appropriate and aligns with the existing pattern used for SetNeedsExpansion() and SetHasExternalResourcesGrants().

🤖 Prompt for AI Agents
In pkg/sync/parallel_syncer.go around lines 171–181 and LocalStateContext
definition around lines 195–210, the StateInterface is missing
SetShouldFetchRelatedResources() and LocalStateContext lacks the setter for
shouldFetchRelated; add SetShouldFetchRelatedResources() to the StateInterface
and implement a corresponding method on LocalStateContext that sets the
shouldFetchRelated boolean (mirror the pattern used by SetNeedsExpansion and
SetHasExternalResourcesGrants), ensure the method has the same signature as the
sequential syncer's State (no args, no return), and run go vet/build to fix any
missing interface implementations elsewhere.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
pkg/dotc1z/c1file.go (1)

274-348: Stats() and GrantStats() still lack checkpoint read locks despite past review.

As comprehensively identified in the previous review on outdated commits, both Stats() and GrantStats() perform database CountContext() operations without acquiring checkpoint read locks, defeating the WAL checkpoint coordination mechanism.

Required fixes:

  • Stats(): Wrap the three CountContext() calls (lines 320-323, 331-333, 339-341) with acquireCheckpointLock() / releaseCheckpointLock()
  • GrantStats(): Wrap the CountContext() call (lines 445-448) with acquireCheckpointLock() / releaseCheckpointLock()

This is marked as a duplicate because the issue was thoroughly documented in a previous review comment (marked as addressed in commit e68e083), but the checkpoint locks are not present in the current code.

Also applies to: 402-456

🧹 Nitpick comments (1)
pkg/dotc1z/c1file.go (1)

459-473: Consider case-insensitive pragma name comparison for consistency.

The pragma value comparison uses strings.EqualFold() (case-insensitive), but the pragma name check on line 462 is case-sensitive. While this works correctly (the fallback query catches any case variations), making the pragma name check case-insensitive would be more consistent and avoid unnecessary DB queries when users provide different casing.

 func (c *C1File) isWALMode(ctx context.Context) bool {
 	for _, pragma := range c.pragmas {
-		if pragma.name == "journal_mode" && strings.EqualFold(pragma.value, "wal") {
+		if strings.EqualFold(pragma.name, "journal_mode") && strings.EqualFold(pragma.value, "wal") {
 			return true
 		}
 	}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1bccadf and bc22584.

📒 Files selected for processing (1)
  • pkg/dotc1z/c1file.go (10 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
pkg/dotc1z/c1file.go (3)
pkg/dotc1z/manager/manager.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/local/local.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/s3/s3.go (1)
  • WithWALCheckpoint (44-48)
🔇 Additional comments (7)
pkg/dotc1z/c1file.go (7)

10-10: LGTM: Import additions support new WAL checkpointing functionality.

The strings import is used for case-insensitive comparison in isWALMode(), and the ctxzap/zap imports enable structured logging in performWALCheckpoint().

Also applies to: 15-16


56-62: LGTM: WAL checkpointing fields are well-structured.

The field declarations support the checkpoint lifecycle (ticker, channels, sync.Once) and coordination (RWMutex). The comment on line 61 clearly explains the purpose of the mutex.


81-85: LGTM: WAL checkpoint options follow established patterns.

The option functions correctly propagate the WAL checkpoint setting from C1ZFile through to C1File, maintaining consistency with the existing option pattern in the codebase.

Also applies to: 154-158, 179-181


107-108: LGTM: WAL checkpoint lifecycle management is sound.

The lifecycle is properly managed:

  • Channels initialized early in NewC1File()
  • Close() safely stops the ticker, uses sync.Once to prevent double-closing the stop channel, and waits for goroutine completion
  • init() defensively checks both the enabled flag and actual WAL mode before starting

Also applies to: 206-213, 264-267


475-490: LGTM: Background checkpoint goroutine is correctly implemented.

The goroutine properly:

  • Signals completion via checkpointDone channel
  • Responds to both ticker events and stop signals
  • Uses a reasonable 5-minute checkpoint interval

492-504: LGTM: Checkpoint lock helpers are correctly implemented.

The acquireCheckpointLock() and releaseCheckpointLock() helpers correctly acquire/release the RLock when checkpointing is enabled, allowing DB operations to coordinate with the checkpoint write lock.


506-524: LGTM: WAL checkpoint implementation is robust.

The checkpoint logic is sound:

  • Acquires exclusive write lock to ensure coordination with DB operations
  • Uses appropriate 30-second timeout
  • Implements proper fallback strategy (TRUNCATE → RESTART)
  • Uses structured logging for errors
  • Correctly doesn't fail the operation on checkpoint errors

Contributor

@ggreer ggreer left a comment


Mostly small potatoes, but I'm worried about the change in transaction behavior in executeChunkedInsert().

)
}
p.LastGrantLog[resourceType] = time.Now()
if !p.sequentialMode {
Contributor


Does it significantly hurt performance to always lock/unlock this mutex? Heck, since it's just logging, can we avoid the mutex entirely? Worst case we'll log an extra line here or there, right?

Also a nitpick: the conditionals could be reversed so the check reads if p.sequentialMode { ... }.


func WithC1FWALCheckpoint(enable bool) C1FOption {
return func(o *C1File) {
o.checkpointEnabled = enable
Contributor


I think this should turn on WAL mode. Or NewC1File should error if checkpointEnabled is true but WAL isn't. If checkpoint is enabled but WAL isn't, that's almost certainly a mistake, and probably the source of a future bug. ("Why isn't this checkpointing? I turned it on.")
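
One way to express that guard, sketched with assumed option fields:

```go
package sketch

import "errors"

type fileOptions struct {
	checkpointEnabled bool
	journalModeWAL    bool
}

// validate rejects the checkpoint-without-WAL combination up front instead
// of silently never checkpointing.
func (o fileOptions) validate() error {
	if o.checkpointEnabled && !o.journalModeWAL {
		return errors.New("WAL checkpointing enabled but journal_mode is not WAL; " +
			"set journal_mode=WAL or disable checkpointing")
	}
	return nil
}
```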

c1fopts = append(c1fopts, WithC1FPragma(pragma.name, pragma.value))
}
if options.enableWALCheckpoint {
c1fopts = append(c1fopts, WithC1FWALCheckpoint(true))
Contributor


I think we can remove the if statement and just have:

Suggested change
c1fopts = append(c1fopts, WithC1FWALCheckpoint(true))
c1fopts = append(c1fopts, WithC1FWALCheckpoint(options.enableWALCheckpoint))

That way if the default behavior ever changes, it's possible to disable WAL checkpointing.


// startWALCheckpointing starts a background goroutine to perform WAL checkpoints every 5 minutes.
func (c *C1File) startWALCheckpointing() {
c.checkpointTicker = time.NewTicker(5 * time.Minute)
Contributor


Should WithWALCheckpointDuration(time.Duration) be the option instead of (or maybe in addition to) WithC1FWALCheckpoint(bool)? (If it's instead of, a duration of 0 would disable.) I'm guessing we'll want to tweak this duration based on what we learn from running it in prod.
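
A sketch of what that option could look like, where a non-positive duration disables background checkpointing (type and field names are assumed):

```go
package sketch

import "time"

type fileConfig struct {
	checkpointInterval time.Duration
}

type option func(*fileConfig)

// withWALCheckpointInterval sets how often WAL checkpoints run.
// A zero or negative duration disables background checkpointing.
func withWALCheckpointInterval(d time.Duration) option {
	return func(c *fileConfig) {
		c.checkpointInterval = d
	}
}
```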


// performWALCheckpoint performs a WAL checkpoint using SQLITE_CHECKPOINT_RESTART or SQLITE_CHECKPOINT_TRUNCATE.
func (c *C1File) performWALCheckpoint() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Contributor


I hate to add more configuration options, but this timeout also seems like a value we'd want to tweak based on what we learn from running parallel syncs. Maybe it's ok to add an environment variable to override the default value?
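
If an environment-variable override is the route taken, it might look like this; the variable name is invented for illustration:

```go
package sketch

import (
	"os"
	"time"
)

const defaultCheckpointTimeout = 30 * time.Second

// checkpointTimeout returns the WAL checkpoint timeout, allowing operators
// to override the default without a new flag.
func checkpointTimeout() time.Duration {
	// Hypothetical variable name, shown only to illustrate the shape.
	if v := os.Getenv("BATON_WAL_CHECKPOINT_TIMEOUT"); v != "" {
		if d, err := time.ParseDuration(v); err == nil && d > 0 {
			return d
		}
	}
	return defaultCheckpointTimeout
}
```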

opts = append(opts, dotc1z.WithDecoderOptions(l.decoderOptions...))
}
if l.enableWALCheckpoint {
opts = append(opts, dotc1z.WithWALCheckpoint(true))
Contributor


This can also have the if statement removed and just set it to true or false:

Suggested change
opts = append(opts, dotc1z.WithWALCheckpoint(true))
opts = append(opts, dotc1z.WithWALCheckpoint(l.enableWALCheckpoint))

chunkedRows := rows[start:end]

// Create the base insert dataset
err := executeChunkWithRetry(ctx, c, tableName, chunkedRows, buildQueryFn)
Contributor


Previously this code inserted all the chunks in one big transaction. This change inserts each chunk as a transaction. I'm pretty sure we don't want that.

}

// executeChunkWithRetry executes a single chunk with retry logic for SQLITE_BUSY errors.
func executeChunkWithRetry(
Contributor


Do we need equivalent logic for the c1z backed session store?

)

// min returns the smaller of two integers.
func minInt(a, b int) int {
Contributor


I think there is a builtin for this now.
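
For reference, Go 1.21 added built-in generic min and max, so the local helper can usually be dropped:

```go
package sketch

// Before: a hand-rolled helper.
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// After (Go 1.21+): the built-in works directly on ordered types.
func chunkEnd(start, chunkSize, total int) int {
	return min(start+chunkSize, total)
}
```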

Fix deadlock

Play around with concurrent resource tasks

A little log cleanup

More log cleanup

fix build

Add task with retry and backoff

default shouldFetchRelated to false

Check context when waiting to add task

Fix syntax error

fix logging

Fix deadlocks

Don't exit the sync early if we aren't making enough progress.

Call expand grants until it is complete

Wire up parallel sync config option to service mode sync tasks.

fix issue where only 10,000 entitlements are synced

add CollectEntitlementsAndGrantsTasksOp

set default worker count to 2

apply lock when updating entitlements progress during parallel sync

use exponential backoff in addTaskWithRetry

refactor so that AddTask* has minimal locking

fix entitlement and grant progress counts

we are tracking the number of resources processed, not the total number of entitlements and grants

lock when expanding queues

Use a transaction per chunk during bulk object put, also retry sqlite busy/dblocked errors.

WAL checkpoint on 5 minute interval

pause sqlite activity during checkpoint

manual WAL checkpoints only apply to parallel sync

update comment
update type for WithSessionStore

Don't check the bucket name when looking to see if tasks are complete. Check context when sleeping.

Check context when sleeping.

Lint fixes

More lint fixes.

Various review feedback fixes.
@ggreer ggreer force-pushed the jallers/jirwin/psync-deadlock-fix branch from bc22584 to 3399982 on December 4, 2025 19:18

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/tasks/c1api/full_sync.go (1)

41-90: skipFullSync is ignored when parallelSync is enabled.

skipFullSync is only applied as a WithSkipFullSync option to the underlying SequentialSyncer, whose Sync method short‑circuits to SkipSync. When parallelSync is true, you build a SequentialSyncer with skipFullSync set, then wrap it in a parallelSyncer and call parallelSyncer.Sync, which does not consult skipFullSync at all—so a “skip full sync” task will still run a full parallel sync.

If this combination is valid, you likely want to either:

  • Short‑circuit in this handler when skipFullSync && parallelSync (run baseSyncer.SkipSync(ctx) directly), or
  • Teach parallelSyncer.Sync to honor the underlying skipFullSync flag.

As written, behavior diverges from the sequential path for the same task flags.

Also applies to: 91-104
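
For the first option above, the handler could short-circuit before wrapping. A rough sketch under assumed interface shapes; SkipSync itself is the method the comment references:

```go
package sketch

import "context"

// Minimal stand-ins for the real syncer types.
type syncer interface {
	Sync(ctx context.Context) error
}

type sequentialSyncer interface {
	syncer
	SkipSync(ctx context.Context) error
}

// runSync honors a skip-full-sync task even when parallel sync is enabled,
// instead of silently ignoring the flag on the parallel path.
func runSync(ctx context.Context, base sequentialSyncer, wrapParallel func(sequentialSyncer) syncer, skipFullSync, parallelSync bool) error {
	if skipFullSync {
		return base.SkipSync(ctx)
	}
	var s syncer = base
	if parallelSync {
		s = wrapParallel(base)
	}
	return s.Sync(ctx)
}
```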

♻️ Duplicate comments (4)
pkg/sync/parallel_syncer.go (4)

36-43: Unreachable return in addTaskWithRetry; tighten control flow.

The for loop in addTaskWithRetry always returns from inside the loop, so the final return nil // This should never be reached is indeed unreachable and misleading. Consider replacing it with an explicit error return if the loop ever exits unexpectedly.

 func (ps *parallelSyncer) addTaskWithRetry(ctx context.Context, task *task, maxRetries int) error {
@@
-	for attempt := 0; attempt <= maxRetries; attempt++ {
+	for attempt := 0; attempt <= maxRetries; attempt++ {
 		err := ps.taskQueue.AddTask(ctx, task)
@@
-		}
-	}
-
-	return nil // This should never be reached
+		}
+	}
+
+	// Defensive: the loop should always return above.
+	return fmt.Errorf("addTaskWithRetry exited retry loop without enqueuing task")
 }

Also applies to: 44-72


117-169: Recursive immediate processing risks stack overflow on deep task trees.

processTaskImmediately recursively calls itself for both CollectEntitlementsAndGrantsTasksOp and SyncResourcesOp sub-tasks. On deeply nested resource hierarchies or long chains of follow‑on work, this fallback path can exhaust the goroutine stack—precisely when the queue is already under pressure.

Refactor this to use an explicit task stack/queue (iterative loop) and/or enforce a maximum depth, so immediate processing cannot recurse unboundedly.


379-397: Dynamic channel “resizing” in expandQueueAndRetry is unsafe and can strand tasks.

expandQueueAndRetry replaces the bucket channel in bucketQueues while other goroutines may still hold references to the old channel. Producers that already captured currentQueue (from prior getOrCreateBucketChannel calls) can continue sending to the old channel, but no consumer ever reads from it after the map is updated—leading to stuck sends and lost tasks. The len‑based drain loop also has a race and can exit while producers are still writing into the old queue.

Safer pattern here is to avoid resizing buffered channels entirely and simply treat a full queue as errTaskQueueFull, letting addTaskWithGuarantee fall back to longer timeouts or immediate processing.

 func (q *taskQueue) AddTaskWithTimeout(ctx context.Context, t *task, timeout time.Duration) error {
@@
 	// Try to add the task
 	select {
 	case queue <- t:
 		return nil
 	case <-time.After(timeout):
-		// Queue is full, try to expand it
-		return q.expandQueueAndRetry(bucket, t, timeout)
+		// Queue is full; let caller decide how to handle backpressure.
+		return errTaskQueueFull
 	case <-ctx.Done():
 		return ctx.Err()
 	}
 }
-
-// expandQueueAndRetry attempts to expand the queue and retry adding the task.
-func (q *taskQueue) expandQueueAndRetry(bucket string, t *task, timeout time.Duration) error {
-	// ... remove entire function ...
-}

Then delete expandQueueAndRetry entirely and adjust any remaining references accordingly.

Also applies to: 399-463


171-181: LocalStateContext can never enable ShouldFetchRelatedResources.

StateInterface exposes ShouldFetchRelatedResources() but no setter, and LocalStateContext only has a shouldFetchRelated field plus a getter. In the parallel path, all call sites (e.g., syncGrantsForResourceLogic) read this flag from a freshly created LocalStateContext that is never mutated, so shouldFetchRelated is always false and the “fetch related resources” logic is effectively disabled.

If parallel sync is expected to support targeted/partial syncs that should fetch related resources (as the sequential syncer already does), you’ll need to mirror the sequential state API:

  • Add SetShouldFetchRelatedResources() to StateInterface.
  • Implement it on LocalStateContext (set shouldFetchRelated = true).
  • Ensure the initiating code sets this flag when running targeted/partial parallel syncs.

Also applies to: 193-201, 235-237, 1719-1870

🧹 Nitpick comments (3)
pkg/field/defaults.go (1)

91-98: Parallel sync field wiring looks correct; consider minor doc/order tweaks

The new parallelSync BoolField and its inclusion in DefaultFields look consistent with the existing pattern (persistent, ExportTargetNone, defaulting to false). No functional issues here.

Two small, optional nits you may want to consider:

  • The description "This must be set to enable parallel sync" could be slightly more explicit, e.g. “Enable parallel synchronization of resources, entitlements, and grants,” to better match the feature semantics.
  • In DefaultFields, parallelSync is listed far from the other sync knobs (skipFullSync, sync-resources, sync-resource-types, etc.). If the order impacts CLI help readability, you might want to group it with those fields instead of near ServerSessionStoreMaximumSizeField.

The current behavior is fine as-is; these are just UX/clarity improvements. Based on learnings, WAL checkpointing is still enforced internally by the parallel syncer, so it’s good that this flag does not try to expose or control that separately.

Also applies to: 257-307

pkg/sync/parallel_syncer.go (1)

822-830: Guard workers slice mutations if GetWorkerStatus is called concurrently.

parallelSyncer has an mu RWMutex and uses RLock in areWorkersIdle and GetWorkerStatus, but startWorkers and stopWorkers mutate ps.workers without taking the write lock. If GetWorkerStatus (or areWorkersIdle) is ever called from another goroutine while workers are being started or stopped, this will be a data race.

Consider wrapping ps.workers assignments and iteration in startWorkers/stopWorkers with ps.mu.Lock/Unlock to match the read‑side locking.

Also applies to: 961-972, 1960-1973
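
The fix in outline, with an assumed struct shape:

```go
package sketch

import "sync"

type worker struct{ id int }

type parallelSyncer struct {
	mu      sync.RWMutex
	workers []*worker
}

// startWorkers mutates the slice under the write lock so that concurrent
// readers using RLock (e.g. a status call) stay race-free.
func (ps *parallelSyncer) startWorkers(n int) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.workers = make([]*worker, 0, n)
	for i := 0; i < n; i++ {
		ps.workers = append(ps.workers, &worker{id: i})
	}
}

func (ps *parallelSyncer) workerCount() int {
	ps.mu.RLock()
	defer ps.mu.RUnlock()
	return len(ps.workers)
}
```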

pkg/tasks/local/syncer.go (1)

108-131: Parallel syncer wrapping and worker config look correct; worker count is a tunable policy

The refactor to introduce baseSyncer plus a syncer interface and then conditionally wrap with:

config := sdkSync.DefaultParallelSyncConfig().WithWorkerCount(10)
syncer = sdkSync.NewParallelSyncer(baseSyncer, config)

is functionally sound:

  • Error handling and Close semantics are preserved around the final syncer.
  • The builder call now correctly uses the returned config value.
  • Sequential behavior remains unchanged when m.parallelSync is false.

If you expect different connectors or environments to want different concurrency levels, consider threading the worker count from configuration/flags into this layer rather than hard‑coding 10; otherwise this is fine as a default. Based on learnings, WAL checkpointing is handled internally by NewParallelSyncer, so no extra knob is needed here.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bc22584 and 3399982.

⛔ Files ignored due to path filters (2)
  • pb/c1/connector/v2/resource.pb.go is excluded by !**/*.pb.go
  • pb/c1/connector/v2/resource_protoopaque.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (16)
  • pb/c1/connector/v2/resource.pb.validate.go (1 hunks)
  • pkg/cli/commands.go (1 hunks)
  • pkg/connectorrunner/runner.go (5 hunks)
  • pkg/dotc1z/c1file.go (10 hunks)
  • pkg/dotc1z/manager/local/local.go (3 hunks)
  • pkg/dotc1z/manager/manager.go (4 hunks)
  • pkg/dotc1z/manager/s3/s3.go (3 hunks)
  • pkg/dotc1z/sql_helpers.go (4 hunks)
  • pkg/field/defaults.go (2 hunks)
  • pkg/sync/parallel_syncer.go (1 hunks)
  • pkg/sync/state.go (3 hunks)
  • pkg/sync/syncer.go (56 hunks)
  • pkg/tasks/c1api/full_sync.go (4 hunks)
  • pkg/tasks/c1api/manager.go (4 hunks)
  • pkg/tasks/local/syncer.go (4 hunks)
  • proto/c1/connector/v2/resource.proto (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • pb/c1/connector/v2/resource.pb.validate.go
  • pkg/tasks/c1api/manager.go
  • pkg/cli/commands.go
  • proto/c1/connector/v2/resource.proto
  • pkg/sync/state.go
  • pkg/dotc1z/manager/s3/s3.go
🧰 Additional context used
🧠 Learnings (7)
📓 Common learnings
Learnt from: jirwin
Repo: ConductorOne/baton-sdk PR: 524
File: pkg/sync/parallel_syncer.go:827-829
Timestamp: 2025-10-21T23:58:13.768Z
Learning: In the parallel syncer implementation (pkg/sync/parallel_syncer.go), WAL checkpointing must always be enabled for parallel sync operations. The `NewParallelSyncer` function unconditionally sets `baseSyncer.enableWALCheckpoint = true` because high concurrency in parallel sync requires WAL checkpointing to prevent checkpoint failures. This is a design requirement, not a configurable option.
📚 Learning: 2025-10-21T23:58:13.768Z
Learnt from: jirwin
Repo: ConductorOne/baton-sdk PR: 524
File: pkg/sync/parallel_syncer.go:827-829
Timestamp: 2025-10-21T23:58:13.768Z
Learning: In the parallel syncer implementation (pkg/sync/parallel_syncer.go), WAL checkpointing must always be enabled for parallel sync operations. The `NewParallelSyncer` function unconditionally sets `baseSyncer.enableWALCheckpoint = true` because high concurrency in parallel sync requires WAL checkpointing to prevent checkpoint failures. This is a design requirement, not a configurable option.

Applied to files:

  • pkg/tasks/c1api/full_sync.go
  • pkg/tasks/local/syncer.go
  • pkg/dotc1z/manager/local/local.go
  • pkg/dotc1z/c1file.go
  • pkg/field/defaults.go
  • pkg/sync/parallel_syncer.go
  • pkg/dotc1z/manager/manager.go
  • pkg/connectorrunner/runner.go
  • pkg/sync/syncer.go
📚 Learning: 2024-09-03T15:49:24.881Z
Learnt from: mchavez
Repo: ConductorOne/baton-sdk PR: 211
File: pkg/uhttp/dbcache.go:0-0
Timestamp: 2024-09-03T15:49:24.881Z
Learning: The user mchavez has fixed the issue related to improving error messages and adding comments in the `cleanup` method of the `DBCache` struct in the `pkg/uhttp/dbcache.go` file.

Applied to files:

  • pkg/dotc1z/sql_helpers.go
📚 Learning: 2024-10-08T21:29:30.695Z
Learnt from: mchavez
Repo: ConductorOne/baton-sdk PR: 211
File: pkg/uhttp/dbcache.go:0-0
Timestamp: 2024-10-08T21:29:30.695Z
Learning: The user mchavez has fixed the issue related to improving error messages and adding comments in the `DBCache` methods in the `pkg/uhttp/dbcache.go` file.

Applied to files:

  • pkg/dotc1z/sql_helpers.go
📚 Learning: 2024-09-03T15:53:47.572Z
Learnt from: mchavez
Repo: ConductorOne/baton-sdk PR: 211
File: pkg/uhttp/dbcache.go:0-0
Timestamp: 2024-09-03T15:53:47.572Z
Learning: The user mchavez has fixed the issue related to improving error messages and adding comments in the `NewDBCache` function of the `DBCache` struct in the `pkg/uhttp/dbcache.go` file.

Applied to files:

  • pkg/dotc1z/sql_helpers.go
📚 Learning: 2024-09-03T15:51:48.712Z
Learnt from: mchavez
Repo: ConductorOne/baton-sdk PR: 211
File: pkg/uhttp/dbcache.go:0-0
Timestamp: 2024-09-03T15:51:48.712Z
Learning: The user mchavez has fixed the issue related to improving error messages and adding comments in the `Get` method of the `DBCache` struct in the `pkg/uhttp/dbcache.go` file.

Applied to files:

  • pkg/dotc1z/sql_helpers.go
📚 Learning: 2024-10-08T21:29:30.695Z
Learnt from: mchavez
Repo: ConductorOne/baton-sdk PR: 211
File: pkg/uhttp/dbcache.go:0-0
Timestamp: 2024-10-08T21:29:30.695Z
Learning: The user mchavez has fixed the issue related to improving error messages and adding comments in the `Has` method of the `DBCache` struct in the `pkg/uhttp/dbcache.go` file.

Applied to files:

  • pkg/dotc1z/sql_helpers.go
🧬 Code graph analysis (8)
pkg/tasks/c1api/full_sync.go (3)
pkg/sync/syncer.go (2)
  • Syncer (54-57)
  • NewSyncer (3297-3323)
pkg/types/types.go (1)
  • ConnectorClient (30-46)
pkg/sync/parallel_syncer.go (2)
  • DefaultParallelSyncConfig (250-255)
  • NewParallelSyncer (833-845)
pkg/dotc1z/sql_helpers.go (2)
pkg/dotc1z/c1file.go (1)
  • C1File (39-63)
vendor/github.com/doug-martin/goqu/v9/insert_dataset.go (2)
  • InsertDataset (12-18)
  • Insert (33-35)
pkg/dotc1z/manager/local/local.go (4)
pkg/dotc1z/decoder.go (1)
  • DecoderOption (57-57)
pkg/dotc1z/c1file.go (1)
  • WithWALCheckpoint (154-158)
pkg/dotc1z/manager/manager.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/s3/s3.go (2)
  • WithWALCheckpoint (44-48)
  • Option (30-30)
pkg/dotc1z/c1file.go (4)
pkg/dotc1z/decoder.go (1)
  • DecoderOption (57-57)
pkg/dotc1z/manager/local/local.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/manager.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/s3/s3.go (1)
  • WithWALCheckpoint (44-48)
pkg/field/defaults.go (3)
pb/c1/config/v1/config.pb.go (2)
  • BoolField (1048-1054)
  • BoolField (1067-1067)
pkg/field/fields.go (1)
  • BoolField (179-199)
pkg/field/field_options.go (4)
  • WithDescription (47-53)
  • WithPersistent (129-135)
  • WithExportTarget (103-111)
  • ExportTargetNone (97-97)
pkg/sync/parallel_syncer.go (3)
pkg/sync/state.go (1)
  • Action (144-151)
pb/c1/connector/v2/resource_protoopaque.pb.go (10)
  • ResourceId (2626-2633)
  • ResourceId (2646-2646)
  • ResourceType (126-137)
  • ResourceType (150-150)
  • ResourceTypesServiceListResourceTypesRequest (273-282)
  • ResourceTypesServiceListResourceTypesRequest (295-295)
  • ResourcesServiceListResourcesRequest (2901-2911)
  • ResourcesServiceListResourcesRequest (2924-2924)
  • Resource (2711-2723)
  • Resource (2736-2736)
pb/c1/connector/v2/resource.pb.go (10)
  • ResourceId (2652-2659)
  • ResourceId (2672-2672)
  • ResourceType (126-141)
  • ResourceType (154-154)
  • ResourceTypesServiceListResourceTypesRequest (275-284)
  • ResourceTypesServiceListResourceTypesRequest (297-297)
  • ResourcesServiceListResourcesRequest (2925-2935)
  • ResourcesServiceListResourcesRequest (2948-2948)
  • Resource (2737-2749)
  • Resource (2762-2762)
pkg/dotc1z/manager/manager.go (4)
pkg/dotc1z/decoder.go (1)
  • DecoderOption (57-57)
pkg/dotc1z/c1file.go (1)
  • WithWALCheckpoint (154-158)
pkg/dotc1z/manager/local/local.go (1)
  • WithWALCheckpoint (40-44)
pkg/dotc1z/manager/s3/s3.go (1)
  • WithWALCheckpoint (44-48)
pkg/connectorrunner/runner.go (2)
pkg/tasks/local/syncer.go (2)
  • WithParallelSyncEnabled (75-79)
  • Option (31-31)
pkg/tasks/c1api/manager.go (1)
  • NewC1TaskManager (303-330)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (11)
pkg/dotc1z/manager/local/local.go (1)

18-24: Local WAL checkpoint option wiring looks correct.

enableWALCheckpoint is plumbed via WithWALCheckpoint and conditionally passed as dotc1z.WithWALCheckpoint(true) alongside journal_mode=WAL, which is consistent with the new C1Z/C1File options and doesn’t introduce correctness issues.

Also applies to: 40-44, 115-125

pkg/dotc1z/manager/manager.go (1)

20-24: Manager-level WAL checkpoint flag is plumbed cleanly.

WithWALCheckpoint on managerOptions and the conditional propagation to S3 and local options are straightforward and backward compatible; they align with the new C1Z/C1File WAL options.

Also applies to: 40-44, 57-82

pkg/dotc1z/sql_helpers.go (1)

229-232: Checkpoint locking and SQLite BUSY retry logic look correct.

listConnectorObjects and executeChunkWithRetry now acquire the checkpoint read lock around DB activity, which correctly coordinates with the WAL checkpoint writer lock. The per‑chunk transaction flow rolls back and releases the lock on all error paths, retries only on SQLITE_BUSY/“database is locked”, and uses upsert-style inserts so replays on retry are safe.

Also applies to: 314-317, 318-346, 348-437

pkg/dotc1z/c1file.go (1)

39-63: WAL checkpointing design and lifecycle are sound.

The new checkpointEnabled flag, options (WithC1FWALCheckpoint, WithWALCheckpoint), and init/Close wiring form a coherent WAL checkpointing story: checkpoints only start when enabled and the DB is truly in WAL mode, DB operations use acquireCheckpointLock/releaseCheckpointLock as readers, and performWALCheckpoint takes the write lock with bounded timeout and proper logging. Close cleanly stops the ticker, signals the goroutine, waits for checkpointDone, and only then closes the DB, which avoids background work on a closed handle.

Also applies to: 81-86, 99-109, 128-133, 154-159, 160-191, 201-213, 235-270, 459-524

pkg/sync/parallel_syncer.go (1)

832-845: WAL checkpoint unconditionally enabled for parallel syncer (matches design requirement).

NewParallelSyncer forces baseSyncer.enableWALCheckpoint = true before constructing the parallel syncer, ensuring WAL checkpointing is always on under high concurrency. This matches the documented design requirement for parallel sync and keeps the configuration non‑optional in this mode.

Based on learnings, parallel sync is required to enable WAL checkpointing to avoid checkpoint failures under heavy concurrency.

pkg/sync/syncer.go (2)

59-83: ProgressCounts locking and sequential/parallel split look correct.

Introducing mu and sequentialMode, plus the Add* helpers, cleanly separates the single‑threaded sequential case (no locking) from the parallel case (mutex‑protected). Logging methods correctly branch on sequentialMode and only take locks when needed; the direct map increments in the sequential paths (SyncResourceTypes, SyncResources, SyncEntitlementsForResource, SyncGrantsForResource) are guarded by sequentialMode checks or only used when sequentialMode is true, so they’re safe.

Also applies to: 86-178, 181-255, 257-277, 279-312, 1399-1410, 2145-2147


314-344: SequentialSyncer’s WAL checkpoint flag is wired into the manager as intended.

Adding enableWALCheckpoint to SequentialSyncer and using it in loadStore to append manager.WithWALCheckpoint(true) ensures that when parallel sync sets this flag, the C1Z manager (and ultimately C1File) is created with WAL checkpointing enabled. Sequential callers that never touch the flag retain the previous behavior.

Based on learnings, this is the right place to plumb the “always enable WAL checkpointing for parallel sync” requirement down into the storage layer.

Also applies to: 3099-3131

pkg/tasks/c1api/full_sync.go (1)

30-39: Parallel sync wiring from task handler config into Syncer is clear.

Propagating parallelSync into fullSyncTaskHandler, constructing a base SequentialSyncer with the existing options, and then conditionally wrapping it in a parallelSyncer (with a reasonable default worker count) is a clean way to switch between sequential and parallel paths without changing the task surface.

Also applies to: 198-218

pkg/tasks/local/syncer.go (1)

18-29: Parallel sync flag and local option are wired coherently

Adding parallelSync to localSyncer and exposing it via WithParallelSyncEnabled(parallel bool) matches the existing option pattern (other booleans on the struct) and keeps the default behavior sequential when the option isn’t used. I don’t see correctness risks here.

Also applies to: 75-79

pkg/connectorrunner/runner.go (2)

318-352: runnerConfig.parallelSync and WithParallelSyncEnabled integrate cleanly

The new parallelSync field on runnerConfig plus the parameterless WithParallelSyncEnabled() option fit well with the existing option style (boolean feature toggles). Defaults stay sequential, and enabling parallel sync is explicit and opt‑in. No issues here.

Also applies to: 556-561


806-815: End-to-end parallelSync plumbing into local and C1 task managers is consistent

The additions:

  • Passing local.WithParallelSyncEnabled(cfg.parallelSync) into local.NewSyncer for on-demand sync, and
  • Extending c1api.NewC1TaskManager(...) to accept and receive cfg.parallelSync

ensure that the same flag controls parallel behavior in both local and C1-managed sync paths without changing behavior when the flag is unset. This matches the PR’s intent and aligns with the local syncer’s new option. Based on learnings, this also correctly routes calls through NewParallelSyncer where WAL checkpointing is enforced when parallel mode is enabled.

Also applies to: 827-838

@ggreer ggreer mentioned this pull request Dec 16, 2025
