Conversation


@arreyder arreyder commented Jan 6, 2026

Summary

  • Add sync.Pool for zstd.Encoder and zstd.Decoder instances to reduce allocation overhead
  • Profiling showed zstd.ensureHist allocating 215 MB/min in temporal_sync due to creating new encoders per saveC1z call
  • Pool only used when encoder/decoder options match defaults (safe fallback to new instances otherwise)

Changes

  • pkg/dotc1z/pool.go (new): encoder/decoder pool with get/put functions
  • pkg/dotc1z/pool_test.go (new): unit tests and benchmarks
  • pkg/dotc1z/file.go: saveC1z modified to use pooled encoders
  • pkg/dotc1z/decoder.go: decoder modified to use pooled decoders

Safety Measures

  • Pool only used when options match pool defaults (concurrency, maxMemory)
  • Encoders always closed on error paths (not returned to pool in bad state)
  • Reset(nil) called before returning to pool to release writer/reader references
  • Decoder Reset() errors handled gracefully (don't return bad decoders to pool); see the sketch after this list
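
For reference, a minimal sketch of what pool helpers along these lines might look like. The getEncoder/putEncoder names follow the PR description, but the defaults and structure here are assumptions for illustration, not the PR's actual pool.go.

package dotc1z

import (
	"runtime"
	"sync"

	"github.com/klauspost/compress/zstd"
)

// pooledEncoderConcurrency is fixed at package init; requests with a
// different concurrency skip the pool and build a fresh encoder instead.
var pooledEncoderConcurrency = runtime.GOMAXPROCS(0)

var encoderPool sync.Pool

// getEncoder returns an idle pooled encoder, or creates one against a nil
// writer (callers must Reset it onto a real writer before use).
func getEncoder() (*zstd.Encoder, error) {
	if enc, ok := encoderPool.Get().(*zstd.Encoder); ok && enc != nil {
		return enc, nil
	}
	return zstd.NewWriter(nil, zstd.WithEncoderConcurrency(pooledEncoderConcurrency))
}

// putEncoder calls Reset(nil) to drop the reference to the previous writer,
// then parks the encoder for reuse.
func putEncoder(enc *zstd.Encoder) {
	if enc == nil {
		return
	}
	enc.Reset(nil)
	encoderPool.Put(enc)
}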

Benchmark Results

BenchmarkEncoderAllocationOnly/pooled-16         	 1766419	       682 ns/op	     112 B/op	       2 allocs/op
BenchmarkEncoderAllocationOnly/new_each_time-16  	    2296	    604193 ns/op	 2328009 B/op	      30 allocs/op

20,785x reduction in bytes allocated per encode operation (2,328,009 B/op vs. 112 B/op).

Test plan

  • All existing dotc1z tests pass
  • New pool unit tests (concurrent access, round-trip correctness)
  • Benchmark demonstrates allocation reduction
  • Deploy to staging and profile to verify improvement

🤖 Generated with Claude Code

Summary by CodeRabbit

  • Chores

    • Improved compression/decompression performance and reduced memory allocations via internal pooling, leading to faster C1Z read/write and lower resource use.
    • Enhanced reliability of encoder/decoder lifecycle to avoid resource leaks on error paths.
  • Tests

    • Added comprehensive tests and benchmarks covering pooled operations, concurrent access, and end-to-end round-trip integrity.


Add sync.Pool for zstd.Encoder and zstd.Decoder instances to reduce
allocation overhead during c1z file operations. Profiling showed
zstd.ensureHist allocating 215 MB/min in temporal_sync due to creating
new encoders per saveC1z call.

Changes:
- Add pool.go with encoder/decoder pool management
- Modify saveC1z to use pooled encoders when concurrency matches default
- Modify decoder to use pooled decoders when options match defaults
- Add comprehensive tests and benchmarks

Safety measures:
- Pool only used when options match pool defaults
- Encoders always closed on error (not returned to pool in bad state)
- Reset(nil) called before returning to pool to release references
- Decoder Reset errors handled gracefully

Benchmark results show 20,785x reduction in bytes allocated per encode:
- Pooled:        112 B/op,   2 allocs
- New each time: 2,328,009 B/op, 30 allocs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

coderabbitai bot commented Jan 6, 2026

Walkthrough

Adds sync.Pool-based pooling for zstd encoders and decoders, integrates pooled usage into decoder creation and C1Z file writing paths with fallbacks on Reset/create failures, and adds tests/benchmarks exercising pooled behavior and concurrency.

Changes

Cohort / File(s) Summary
Pool implementation
pkg/dotc1z/pool.go
New file adding unexported encoder/decoder pools (sync.Pool), helpers getEncoder/putEncoder and getDecoder/putDecoder, and default concurrency/memory settings for pooled objects.
Decoder integration
pkg/dotc1z/decoder.go
Adds a fromPool flag to the decoder struct; attempts to obtain and Reset a pooled decoder when using default options, falls back to creating a new decoder if Reset fails, and returns pooled decoders to the pool in Close (a sketch of this lifecycle follows this changes summary).
Encoder / file writer integration
pkg/dotc1z/file.go
Attempts to obtain/reset a pooled encoder when encoder concurrency matches pool default; ensures pooled encoders are returned to the pool after successful use and closed on error paths; adjusts sequencing around file sync/close and pool return.
Tests & benchmarks
pkg/dotc1z/pool_test.go
New comprehensive tests covering encoder/decoder pooling, concurrency stress tests, round-trip encode/decode validation, and allocation/behavior benchmarks.
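
To make the decoder-side rows above concrete, here is a hedged sketch of the lifecycle being described: a pool-aware constructor, the fromPool flag, and a Close that either pools or fully closes the underlying zstd decoder. Type, field, and helper names are illustrative assumptions, not the PR's exact code.

package dotc1z

import (
	"io"
	"sync"

	"github.com/klauspost/compress/zstd"
)

var decoderPool sync.Pool

// getDecoder returns a pooled decoder, or creates one with the pool defaults
// (single-threaded, low-memory); ok is false only if creation fails.
func getDecoder() (*zstd.Decoder, bool) {
	if dec, ok := decoderPool.Get().(*zstd.Decoder); ok && dec != nil {
		return dec, true
	}
	dec, err := zstd.NewReader(nil,
		zstd.WithDecoderConcurrency(1),
		zstd.WithDecoderLowmem(true),
	)
	if err != nil {
		return nil, false
	}
	return dec, true
}

// putDecoder drops decoders whose Reset(nil) fails so a bad instance is
// never handed back out of the pool.
func putDecoder(dec *zstd.Decoder) {
	if dec == nil {
		return
	}
	if err := dec.Reset(nil); err != nil {
		return
	}
	decoderPool.Put(dec)
}

// pooledDecoder stands in for the PR's decoder struct and its fromPool flag.
type pooledDecoder struct {
	zd       *zstd.Decoder
	fromPool bool
}

// newPooledDecoder prefers a pooled instance and falls back to a fresh
// decoder when the pool is unavailable or Reset onto the reader fails.
func newPooledDecoder(r io.Reader) (*pooledDecoder, error) {
	if zd, ok := getDecoder(); ok {
		if err := zd.Reset(r); err == nil {
			return &pooledDecoder{zd: zd, fromPool: true}, nil
		}
		// Reset failed: close the bad instance and fall back to a fresh decoder.
		zd.Close()
	}
	zd, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(1), zstd.WithDecoderLowmem(true))
	if err != nil {
		return nil, err
	}
	return &pooledDecoder{zd: zd}, nil
}

// Close returns pooled decoders for reuse and fully closes one-off decoders;
// nilling zd guards against a double put or double close.
func (d *pooledDecoder) Close() error {
	if d.zd == nil {
		return nil
	}
	if d.fromPool {
		putDecoder(d.zd)
	} else {
		d.zd.Close()
	}
	d.zd = nil
	return nil
}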

Sequence Diagram(s)

sequenceDiagram
  participant Writer as C1Z Writer
  participant Pool as encoder/decoder pool
  participant ZSTD as zstd Encoder/Decoder
  participant File as Output/Input File

  Note over Writer,Pool: Encoder path (write)
  Writer->>Pool: getEncoder()
  alt pooled encoder available
    Pool-->>Writer: pooled Encoder
    Writer->>ZSTD: Reset(writable File)
    alt Reset OK
      ZSTD-->>Writer: ready
      Writer->>File: write compressed data
      Writer->>ZSTD: Close/Flush
      ZSTD-->>Writer: closed
      Writer->>Pool: putEncoder(encoder)
    else Reset fails
      ZSTD-->>Pool: putEncoder? (closed)
      Pool-->>Writer: none -> fallback create
    end
  else no pooled encoder
    Pool-->>Writer: none -> create new Encoder
  end
sequenceDiagram
  participant Reader as C1Z Reader
  participant Pool as encoder/decoder pool
  participant ZSTD as zstd Decoder
  participant File as Input File

  Note over Reader,Pool: Decoder path (read)
  Reader->>Pool: getDecoder()
  alt pooled decoder available
    Pool-->>Reader: pooled Decoder
    Reader->>ZSTD: Reset(readable File)
    alt Reset OK
      ZSTD-->>Reader: ready
      Reader->>File: read compressed data
      Reader->>ZSTD: Close
      ZSTD-->>Reader: closed
      Reader->>Pool: putDecoder(decoder)
    else Reset fails
      ZSTD-->>Pool: drop
      Pool-->>Reader: none -> fallback create
    end
  else no pooled decoder
    Pool-->>Reader: none -> create new Decoder
  end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 I pooled a hat of zstd and hopped,
Encoders zipped and decoders popped,
Reused with care, returned with cheer,
A nimble rabbit engineers near! ✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): docstring coverage is 63.64%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): check skipped because CodeRabbit’s high-level summary is enabled.
  • Title check (✅ Passed): the pull request title accurately describes the main change (object pooling for zstd encoders and decoders to reduce allocation overhead) and aligns with all modifications.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
pkg/dotc1z/pool_test.go (4)

67-88: Using require in goroutines can cause test hangs on failure.

When require fails inside a goroutine, it calls t.FailNow() which only terminates the current goroutine, not the test. The main goroutine's wg.Wait() may hang indefinitely if other goroutines don't complete.

Consider using assert instead and checking for failures after wg.Wait(), or use a channel/atomic to signal failures.

🔎 Proposed fix using assert
 	t.Run("concurrent pool access", func(t *testing.T) {
 		const numGoroutines = 10
 		const iterations = 100

 		var wg sync.WaitGroup
+		var failed atomic.Bool
 		wg.Add(numGoroutines)

 		for i := 0; i < numGoroutines; i++ {
 			go func(id int) {
 				defer wg.Done()
 				for j := 0; j < iterations; j++ {
 					enc, _ := getEncoder()
-					require.NotNil(t, enc)
+					if enc == nil {
+						failed.Store(true)
+						return
+					}

 					var buf bytes.Buffer
 					enc.Reset(&buf)

 					data := []byte("concurrent test data")
 					_, err := enc.Write(data)
-					require.NoError(t, err)
-					require.NoError(t, enc.Close())
+					if err != nil {
+						failed.Store(true)
+						return
+					}
+					if err := enc.Close(); err != nil {
+						failed.Store(true)
+						return
+					}

 					putEncoder(enc)
 				}
 			}(i)
 		}

 		wg.Wait()
+		require.False(t, failed.Load(), "concurrent access failed")
 	})

141-161: Same issue: require in goroutines for decoder concurrent test.

Same concern as the encoder concurrent test - using require inside goroutines can cause the test to hang if a failure occurs.


223-224: Add period at end of comment to satisfy godot linter.

🔎 Proposed fix
 // BenchmarkEncoderPoolAllocs measures allocations with and without pooling.
-// Run with: go test -bench=BenchmarkEncoderPoolAllocs -benchmem
+// Run with: go test -bench=BenchmarkEncoderPoolAllocs -benchmem.
 func BenchmarkEncoderPoolAllocs(b *testing.B) {

93-99: Helper function ignores errors silently.

The createCompressedData helper ignores all errors. While acceptable for test helpers with known-good inputs, consider adding a t parameter to fail fast on unexpected errors.

🔎 Proposed fix
-	createCompressedData := func(data []byte) []byte {
+	createCompressedData := func(t *testing.T, data []byte) []byte {
+		t.Helper()
 		var buf bytes.Buffer
-		enc, _ := zstd.NewWriter(&buf)
-		_, _ = enc.Write(data)
-		_ = enc.Close()
+		enc, err := zstd.NewWriter(&buf)
+		require.NoError(t, err)
+		_, err = enc.Write(data)
+		require.NoError(t, err)
+		require.NoError(t, enc.Close())
 		return buf.Bytes()
 	}
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cd227b2 and 7ab440b.

📒 Files selected for processing (4)
  • pkg/dotc1z/decoder.go
  • pkg/dotc1z/file.go
  • pkg/dotc1z/pool.go
  • pkg/dotc1z/pool_test.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-02T17:21:01.723Z
Learnt from: ggreer
Repo: ConductorOne/baton-sdk PR: 616
File: pkg/synccompactor/compactor_test.go:44-52
Timestamp: 2026-01-02T17:21:01.723Z
Learning: In tests that verify cleanup behavior (e.g., ensuring temporary artifacts are removed or directories are empty after an operation), treat cleanup failures as test failures by asserting on the cleanup call (e.g., require.NoError(t, err) or assert.NoError(t, err)). This ensures that the cleanup path is explicitly tested and any cleanup error fails the test, confirming correct behavior of the cleanup code.

Applied to files:

  • pkg/dotc1z/pool_test.go
🧬 Code graph analysis (3)
pkg/dotc1z/file.go (2)
pkg/dotc1z/c1file.go (1)
  • WithEncoderConcurrency (173-177)
vendor/google.golang.org/grpc/rpc_util.go (1)
  • Errorf (973-975)
pkg/dotc1z/pool_test.go (2)
pkg/dotc1z/decoder.go (3)
  • NewDecoder (234-267)
  • WithDecoderConcurrency (111-116)
  • WithDecoderMaxMemory (78-89)
pkg/dotc1z/c1file.go (1)
  • WithEncoderConcurrency (173-177)
pkg/dotc1z/pool.go (3)
pkg/dotc1z/c1file.go (1)
  • WithEncoderConcurrency (173-177)
vendor/github.com/klauspost/compress/zstd/decoder.go (2)
  • Decoder (22-47)
  • NewReader (87-123)
pkg/dotc1z/decoder.go (2)
  • WithDecoderConcurrency (111-116)
  • WithDecoderMaxMemory (78-89)
🪛 GitHub Check: go-lint
pkg/dotc1z/pool_test.go

[failure] 224-224:
Comment should end in a period (godot)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: go-test (1.25.2, windows-latest)
  • GitHub Check: go-test (1.25.2, ubuntu-latest)
🔇 Additional comments (12)
pkg/dotc1z/decoder.go (2)

145-160: Pool integration logic looks correct.

The pool usage condition properly checks that options match pool defaults before attempting reuse. The fallback path on Reset failure correctly returns the decoder to the pool and creates a new one.

One minor observation: if getDecoder() returns nil (when ok is false because pool creation failed), the code correctly falls through to create a new decoder via the existing path.


220-231: Close() correctly distinguishes pooled vs non-pooled decoders.

The logic properly returns pooled decoders for reuse and closes non-pooled ones. Setting d.zd = nil after handling prevents double-free issues.

pkg/dotc1z/file.go (3)

179-182: Returning a closed encoder to the pool is correct behavior.

After reviewing the zstd library, calling Reset(nil) on a closed encoder is safe and documented as the proper way to prepare it for pooling. The encoder can be reused after Reset() is called with a new writer.


148-149: Ignoring Reset return value is intentional for encoders.

Unlike zstd.Decoder.Reset() which returns an error, zstd.Encoder.Reset() has no return value in the klauspost/compress library, so this is correct.


163-166: Error path correctly closes encoder without returning to pool.

On copy error, the encoder is closed to release resources but not returned to the pool since it may be in an inconsistent state. The underscore assignment for Close() error is acceptable here since we're already returning the original error.
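
Taken together, the write path these comments describe probably looks roughly like the following sketch. It assumes the getEncoder/putEncoder helpers and pooledEncoderConcurrency variable from the pool sketch earlier on this page, plus the usual io and zstd imports; the function name and shape are illustrative, not the PR's actual saveC1z.

// compressTo sketches the pooled encoder lifecycle on the write path: reuse a
// pooled encoder only when the requested concurrency matches the pool default,
// close (but never pool) on error, and pool only after a clean Close.
func compressTo(dst io.Writer, src io.Reader, concurrency int) error {
	var enc *zstd.Encoder
	var err error
	pooled := concurrency == pooledEncoderConcurrency
	if pooled {
		enc, err = getEncoder()
	} else {
		enc, err = zstd.NewWriter(nil, zstd.WithEncoderConcurrency(concurrency))
	}
	if err != nil {
		return err
	}
	enc.Reset(dst) // Encoder.Reset has no error return, so there is nothing to check

	if _, err := io.Copy(enc, src); err != nil {
		_ = enc.Close() // release resources; a failed write never goes back to the pool
		return err
	}
	if err := enc.Close(); err != nil { // flush the final frame
		return err
	}
	if pooled {
		putEncoder(enc) // a cleanly closed encoder is safe to pool; Reset(nil) happens inside
	}
	return nil
}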

pkg/dotc1z/pool_test.go (1)

164-221: Good round-trip tests validating end-to-end correctness.

The tests properly exercise both single and multiple round-trips, verifying data integrity. The use of t.TempDir() ensures cleanup. Based on learnings, the cleanup is automatically handled by the testing framework here.

pkg/dotc1z/pool.go (6)

21-36: Encoder pool get/create logic is well-structured.

The type assertion with nil check handles both empty pool and type mismatch. Creating with nil writer is correct since Reset() will be called before use.


41-49: putEncoder correctly resets before pooling.

Calling Reset(nil) releases the reference to the previous writer, preventing memory leaks. The nil check prevents panics.


58-74: Decoder pool creation matches decoder.go defaults.

The options (concurrency=1, lowmem=true, maxMemory=defaultDecoderMaxMemory) align with the pool eligibility check in decoder.go line 147.


78-88: putDecoder correctly handles Reset failure.

Unlike putEncoder, decoders can fail on Reset(nil) if in a bad state. Not returning failed decoders to the pool prevents reuse of corrupted instances.


1-88: Overall pool implementation is solid.

The design correctly:

  • Uses sync.Pool for thread-safe pooling
  • Matches pool eligibility to specific option configurations
  • Handles creation failures gracefully
  • Cleans up references before returning to pool
  • Prevents corrupted instances from being pooled

The ~20,785× reduction in bytes allocated per encode operation (per PR benchmarks) demonstrates significant improvement.


14-16: pooledEncoderConcurrency is evaluated once at package init.

This captures GOMAXPROCS at program startup. If GOMAXPROCS is changed at runtime (e.g., via runtime.GOMAXPROCS(n)), the pool won't match and encoders will be created fresh instead of being pulled from the pool. This is the intended behavior—the code compares the requested encoder concurrency against the pooled default and only reuses pooled encoders when they match.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI Agents
In @pkg/dotc1z/pool_test.go:
- Around line 193-220: The test currently calls decoder.Close() and f.Close()
without checking errors; update the cleanup to capture and assert their return
values (e.g., err := decoder.Close(); require.NoError(t, err) and err =
f.Close(); require.NoError(t, err)) after using NewDecoder/decoder and os.Open/f
so cleanup failures fail the test and surface pool/resource issues.
🧹 Nitpick comments (2)
pkg/dotc1z/pool_test.go (2)

93-99: Consider handling errors in the test helper for clarity.

The helper silently ignores errors from enc.Write() and enc.Close(). While failures would likely cause downstream test failures during decompression, explicit error handling (e.g., using require.NoError) would make test failures clearer and fail fast if encoder setup goes wrong.

🔎 Proposed fix for explicit error handling
 createCompressedData := func(data []byte) []byte {
   var buf bytes.Buffer
-  enc, _ := zstd.NewWriter(&buf)
-  _, _ = enc.Write(data)
-  _ = enc.Close()
+  enc, err := zstd.NewWriter(&buf)
+  require.NoError(t, err)
+  _, err = enc.Write(data)
+  require.NoError(t, err)
+  err = enc.Close()
+  require.NoError(t, err)
   return buf.Bytes()
 }

244-264: Consider basic error validation in benchmarks for robustness.

The "new_encoder_each_time" sub-benchmark ignores all error returns (file operations, writes, Close calls). While this is common in benchmarks to minimize overhead, at minimum checking file operation errors (lines 249-250) with if err != nil { b.Fatal(err) } would ensure the benchmark is measuring valid operations.

📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7ab440b and 164e413.

📒 Files selected for processing (1)
  • pkg/dotc1z/pool_test.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-02T17:21:01.723Z
Learnt from: ggreer
Repo: ConductorOne/baton-sdk PR: 616
File: pkg/synccompactor/compactor_test.go:44-52
Timestamp: 2026-01-02T17:21:01.723Z
Learning: In tests that verify cleanup behavior (e.g., ensuring temporary artifacts are removed or directories are empty after an operation), treat cleanup failures as test failures by asserting on the cleanup call (e.g., require.NoError(t, err) or assert.NoError(t, err)). This ensures that the cleanup path is explicitly tested and any cleanup error fails the test, confirming correct behavior of the cleanup code.

Applied to files:

  • pkg/dotc1z/pool_test.go
🧬 Code graph analysis (1)
pkg/dotc1z/pool_test.go (2)
pkg/dotc1z/decoder.go (4)
  • NewDecoder (234-267)
  • C1ZFileHeader (23-23)
  • WithDecoderConcurrency (111-116)
  • WithDecoderMaxMemory (78-89)
pkg/dotc1z/c1file.go (1)
  • WithEncoderConcurrency (173-177)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (4)
pkg/dotc1z/pool_test.go (4)

16-89: LGTM! Comprehensive encoder pool testing.

The test coverage is thorough:

  • Pool behavior correctly verified (cache miss on first call, hit on subsequent calls)
  • Compression correctness validated via round-trip decompression
  • Concurrent access properly tested with multiple goroutines

101-161: LGTM! Thorough decoder pool testing.

The decoder pool tests mirror the encoder tests well:

  • Pool behavior validated (first get from empty pool, second from populated pool)
  • Decompression correctness verified
  • Concurrent access properly exercised

267-299: LGTM! Well-designed allocation benchmark.

The benchmark correctly (see the sketch after this list):

  • Warms up the pool before measurement
  • Resets the timer to exclude setup overhead
  • Isolates pure allocation costs by using bytes.Buffer instead of file I/O
  • Provides a clear comparison for the pooling benefit
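
For reference, a sketch of how an allocation-focused benchmark with those properties could be written; this is an illustration assuming the getEncoder/putEncoder helpers sketched earlier and the usual bytes, testing, and zstd imports, not necessarily the exact code in pool_test.go.

func BenchmarkEncoderAllocationOnly(b *testing.B) {
	b.Run("pooled", func(b *testing.B) {
		// Warm the pool so the first timed iteration is not a cold miss.
		if enc, err := getEncoder(); err == nil {
			putEncoder(enc)
		}
		var buf bytes.Buffer
		b.ReportAllocs()
		b.ResetTimer() // exclude warm-up and setup from the measurement
		for i := 0; i < b.N; i++ {
			enc, err := getEncoder()
			if err != nil {
				b.Fatal(err)
			}
			enc.Reset(&buf) // bytes.Buffer keeps file I/O out of the numbers
			_ = enc.Close()
			putEncoder(enc)
			buf.Reset()
		}
	})

	b.Run("new_each_time", func(b *testing.B) {
		var buf bytes.Buffer
		b.ReportAllocs()
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			enc, err := zstd.NewWriter(&buf)
			if err != nil {
				b.Fatal(err)
			}
			_ = enc.Close()
			buf.Reset()
		}
	})
}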

301-346: LGTM! Decoder benchmark properly configured.

The benchmark provides a fair comparison by ensuring the "new_decoder_each_time" path uses the same decoder options (concurrency=1, low memory, max memory) that the pool would use. This ensures accurate measurement of pooling benefits.

Comment on lines +193 to +220
	t.Run("multiple round trips reuse pool", func(t *testing.T) {
		tmpDir := t.TempDir()

		for i := 0; i < 10; i++ {
			testData := bytes.Repeat([]byte("iteration data "), 100*(i+1))

			dbFile := filepath.Join(tmpDir, "test.db")
			err := os.WriteFile(dbFile, testData, 0600)
			require.NoError(t, err)

			c1zFile := filepath.Join(tmpDir, "test.c1z")
			err = saveC1z(dbFile, c1zFile, 0)
			require.NoError(t, err)

			f, err := os.Open(c1zFile)
			require.NoError(t, err)

			decoder, err := NewDecoder(f)
			require.NoError(t, err)

			decoded, err := io.ReadAll(decoder)
			require.NoError(t, err)
			require.Equal(t, testData, decoded)

			decoder.Close()
			f.Close()
		}
	})

⚠️ Potential issue | 🟡 Minor

Assert cleanup errors to verify proper resource cleanup.

Lines 217-218 call decoder.Close() and f.Close() without checking their return values. Per testing best practices, cleanup operations should be asserted to ensure they succeed, especially when testing resource pooling behavior where improper cleanup could mask pool state issues.

Based on learnings, cleanup failures should be treated as test failures.

🔎 Proposed fix to assert cleanup calls
       decoded, err := io.ReadAll(decoder)
       require.NoError(t, err)
       require.Equal(t, testData, decoded)
 
-      decoder.Close()
-      f.Close()
+      require.NoError(t, decoder.Close())
+      require.NoError(t, f.Close())
     }
   })
 }

