# perf(dotc1z): pool zstd encoders and decoders to reduce allocations #622
## Conversation
Add `sync.Pool` for `zstd.Encoder` and `zstd.Decoder` instances to reduce allocation overhead during c1z file operations. Profiling showed `zstd.ensureHist` allocating 215 MB/min in temporal_sync due to creating new encoders per `saveC1z` call.

Changes:
- Add pool.go with encoder/decoder pool management
- Modify `saveC1z` to use pooled encoders when concurrency matches default
- Modify decoder to use pooled decoders when options match defaults
- Add comprehensive tests and benchmarks

Safety measures:
- Pool only used when options match pool defaults
- Encoders always closed on error (not returned to pool in a bad state)
- `Reset(nil)` called before returning to pool to release references
- Decoder `Reset` errors handled gracefully

Benchmark results show a 20,785x reduction in bytes allocated per encode:
- Pooled: 112 B/op, 2 allocs
- New each time: 2,328,009 B/op, 30 allocs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
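Since the review excerpts below reference `getEncoder`, `putEncoder`, `getDecoder`, and `putDecoder`, here is a minimal sketch of how such a `sync.Pool`-based helper might look. This is an illustration of the pattern only, assuming two-value `(instance, ok)` signatures and the decoder defaults mentioned in the review; the PR's actual pool.go may differ.

```go
package dotc1z

import (
	"sync"

	"github.com/klauspost/compress/zstd"
)

// encoderPool holds idle zstd encoders. New encoders are created with a nil
// writer; callers must Reset() them onto a real writer before use.
var encoderPool = sync.Pool{}

// getEncoder returns a pooled encoder, or creates one if the pool is empty.
// The boolean reports whether a usable encoder was obtained.
func getEncoder() (*zstd.Encoder, bool) {
	if enc, ok := encoderPool.Get().(*zstd.Encoder); ok && enc != nil {
		return enc, true
	}
	enc, err := zstd.NewWriter(nil) // writer supplied later via Reset
	if err != nil {
		return nil, false
	}
	return enc, true
}

// putEncoder releases the encoder's reference to its writer and returns it
// to the pool for reuse. Encoder.Reset has no error to check.
func putEncoder(enc *zstd.Encoder) {
	if enc == nil {
		return
	}
	enc.Reset(nil)
	encoderPool.Put(enc)
}

// decoderPool holds idle zstd decoders created with pool-default options.
var decoderPool = sync.Pool{}

func getDecoder() (*zstd.Decoder, bool) {
	if dec, ok := decoderPool.Get().(*zstd.Decoder); ok && dec != nil {
		return dec, true
	}
	// Pool defaults per the review: concurrency=1, low memory mode.
	// (The maxMemory option is omitted here for brevity.)
	dec, err := zstd.NewReader(nil,
		zstd.WithDecoderConcurrency(1),
		zstd.WithDecoderLowmem(true),
	)
	if err != nil {
		return nil, false
	}
	return dec, true
}

// putDecoder returns a decoder to the pool, dropping it if Reset fails.
func putDecoder(dec *zstd.Decoder) {
	if dec == nil {
		return
	}
	if err := dec.Reset(nil); err != nil {
		dec.Close() // don't pool a decoder in a bad state
		return
	}
	decoderPool.Put(dec)
}
```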
**Walkthrough**

Adds `sync.Pool`-based pooling for zstd encoders and decoders, integrates pooled usage into decoder creation and C1Z file writing paths with fallbacks on Reset/create failures, and adds tests/benchmarks exercising pooled behavior and concurrency.
**Sequence Diagram(s)**

```mermaid
sequenceDiagram
    participant Writer as C1Z Writer
    participant Pool as encoder/decoder pool
    participant ZSTD as zstd Encoder/Decoder
    participant File as Output/Input File
    Note over Writer,Pool: Encoder path (write)
    Writer->>Pool: getEncoder()
    alt pooled encoder available
        Pool-->>Writer: pooled Encoder
        Writer->>ZSTD: Reset(writable File)
        alt Reset OK
            ZSTD-->>Writer: ready
            Writer->>File: write compressed data
            Writer->>ZSTD: Close/Flush
            ZSTD-->>Writer: closed
            Writer->>Pool: putEncoder(encoder)
        else Reset fails
            ZSTD-->>Pool: putEncoder? (closed)
            Pool-->>Writer: none -> fallback create
        end
    else no pooled encoder
        Pool-->>Writer: none -> create new Encoder
    end
```
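In Go terms, the write path above corresponds roughly to the following sketch. It reuses the hypothetical `getEncoder`/`putEncoder` helpers from the earlier pool sketch (same package and imports, plus `io`); the real `saveC1z` in `pkg/dotc1z/file.go` additionally writes the c1z header and checks that the requested encoder concurrency matches the pool default.

```go
// writeCompressed copies src into dst through a pooled zstd encoder,
// mirroring the write path in the diagram above. Sketch only.
func writeCompressed(dst io.Writer, src io.Reader) error {
	enc, ok := getEncoder()
	if !ok {
		// Pool could not supply an encoder; create one directly as a fallback.
		var err error
		if enc, err = zstd.NewWriter(dst); err != nil {
			return err
		}
	} else {
		enc.Reset(dst) // attach the pooled encoder to the destination
	}

	if _, err := io.Copy(enc, src); err != nil {
		_ = enc.Close() // release resources, but never pool a failed encoder
		return err
	}
	if err := enc.Close(); err != nil {
		return err
	}
	putEncoder(enc) // safe after Close: putEncoder calls Reset(nil) before pooling
	return nil
}
```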
```mermaid
sequenceDiagram
    participant Reader as C1Z Reader
    participant Pool as encoder/decoder pool
    participant ZSTD as zstd Decoder
    participant File as Input File
    Note over Reader,Pool: Decoder path (read)
    Reader->>Pool: getDecoder()
    alt pooled decoder available
        Pool-->>Reader: pooled Decoder
        Reader->>ZSTD: Reset(readable File)
        alt Reset OK
            ZSTD-->>Reader: ready
            Reader->>File: read compressed data
            Reader->>ZSTD: Close
            ZSTD-->>Reader: closed
            Reader->>Pool: putDecoder(decoder)
        else Reset fails
            ZSTD-->>Pool: drop
            Pool-->>Reader: none -> fallback create
        end
    else no pooled decoder
        Pool-->>Reader: none -> create new Decoder
    end
```
**Estimated code review effort:** 🎯 3 (Moderate) | ⏱️ ~25 minutes
**Pre-merge checks and finishing touches**
- ❌ Failed checks (1 warning)
- ✅ Passed checks (2 passed)
Actionable comments posted: 0
🧹 Nitpick comments (4)
pkg/dotc1z/pool_test.go (4)
**67-88: Using `require` in goroutines can cause test hangs on failure.**

When `require` fails inside a goroutine, it calls `t.FailNow()`, which only terminates the current goroutine, not the test. The main goroutine's `wg.Wait()` may hang indefinitely if other goroutines don't complete.

Consider using `assert` instead and checking for failures after `wg.Wait()`, or use a channel/atomic to signal failures.

🔎 Proposed fix using assert
t.Run("concurrent pool access", func(t *testing.T) { const numGoroutines = 10 const iterations = 100 var wg sync.WaitGroup + var failed atomic.Bool wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go func(id int) { defer wg.Done() for j := 0; j < iterations; j++ { enc, _ := getEncoder() - require.NotNil(t, enc) + if enc == nil { + failed.Store(true) + return + } var buf bytes.Buffer enc.Reset(&buf) data := []byte("concurrent test data") _, err := enc.Write(data) - require.NoError(t, err) - require.NoError(t, enc.Close()) + if err != nil { + failed.Store(true) + return + } + if err := enc.Close(); err != nil { + failed.Store(true) + return + } putEncoder(enc) } }(i) } wg.Wait() + require.False(t, failed.Load(), "concurrent access failed") })
**141-161: Same issue: `require` in goroutines for decoder concurrent test.**

Same concern as the encoder concurrent test: using `require` inside goroutines can cause the test to hang if a failure occurs.
**223-224: Add period at end of comment to satisfy godot linter.**

🔎 Proposed fix

```diff
 // BenchmarkEncoderPoolAllocs measures allocations with and without pooling.
-// Run with: go test -bench=BenchmarkEncoderPoolAllocs -benchmem
+// Run with: go test -bench=BenchmarkEncoderPoolAllocs -benchmem.
 func BenchmarkEncoderPoolAllocs(b *testing.B) {
```
**93-99: Helper function ignores errors silently.**

The `createCompressedData` helper ignores all errors. While acceptable for test helpers with known-good inputs, consider adding a `t` parameter to fail fast on unexpected errors.

🔎 Proposed fix

```diff
-createCompressedData := func(data []byte) []byte {
+createCompressedData := func(t *testing.T, data []byte) []byte {
+	t.Helper()
 	var buf bytes.Buffer
-	enc, _ := zstd.NewWriter(&buf)
-	_, _ = enc.Write(data)
-	_ = enc.Close()
+	enc, err := zstd.NewWriter(&buf)
+	require.NoError(t, err)
+	_, err = enc.Write(data)
+	require.NoError(t, err)
+	require.NoError(t, enc.Close())
 	return buf.Bytes()
 }
```
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- `pkg/dotc1z/decoder.go`
- `pkg/dotc1z/file.go`
- `pkg/dotc1z/pool.go`
- `pkg/dotc1z/pool_test.go`
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-02T17:21:01.723Z
Learnt from: ggreer
Repo: ConductorOne/baton-sdk PR: 616
File: pkg/synccompactor/compactor_test.go:44-52
Timestamp: 2026-01-02T17:21:01.723Z
Learning: In tests that verify cleanup behavior (e.g., ensuring temporary artifacts are removed or directories are empty after an operation), treat cleanup failures as test failures by asserting on the cleanup call (e.g., require.NoError(t, err) or assert.NoError(t, err)). This ensures that the cleanup path is explicitly tested and any cleanup error fails the test, confirming correct behavior of the cleanup code.
Applied to files:
pkg/dotc1z/pool_test.go
🧬 Code graph analysis (3)

`pkg/dotc1z/file.go` (2)
- pkg/dotc1z/c1file.go (1): WithEncoderConcurrency (173-177)
- vendor/google.golang.org/grpc/rpc_util.go (1): Errorf (973-975)

`pkg/dotc1z/pool_test.go` (2)
- pkg/dotc1z/decoder.go (3): NewDecoder (234-267), WithDecoderConcurrency (111-116), WithDecoderMaxMemory (78-89)
- pkg/dotc1z/c1file.go (1): WithEncoderConcurrency (173-177)

`pkg/dotc1z/pool.go` (3)
- pkg/dotc1z/c1file.go (1): WithEncoderConcurrency (173-177)
- vendor/github.com/klauspost/compress/zstd/decoder.go (2): Decoder (22-47), NewReader (87-123)
- pkg/dotc1z/decoder.go (2): WithDecoderConcurrency (111-116), WithDecoderMaxMemory (78-89)
🪛 GitHub Check: go-lint
pkg/dotc1z/pool_test.go
[failure] 224-224:
Comment should end in a period (godot)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: go-test (1.25.2, windows-latest)
- GitHub Check: go-test (1.25.2, ubuntu-latest)
🔇 Additional comments (12)
pkg/dotc1z/decoder.go (2)
**145-160: Pool integration logic looks correct.**

The pool usage condition properly checks that options match pool defaults before attempting reuse. The fallback path on `Reset` failure correctly returns the decoder to the pool and creates a new one.

One minor observation: if `getDecoder()` returns `nil` (when `ok` is false because pool creation failed), the code correctly falls through to create a new decoder via the existing path.
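A rough sketch of what that integration could look like follows, reusing the hypothetical `getDecoder`/`putDecoder` helpers from the earlier pool sketch. `decoderOpts`, `matchesPoolDefaults`, and `acquireDecoder` are assumed names for illustration, not the actual code in `decoder.go`.

```go
// decoderOpts is a stand-in for the real option set in decoder.go.
type decoderOpts struct {
	concurrency int
	lowMem      bool
	maxMemory   uint64
}

func (o decoderOpts) matchesPoolDefaults() bool {
	return o.concurrency == 1 && o.lowMem // maxMemory check omitted for brevity
}

// acquireDecoder returns a zstd decoder reading from r and a flag saying
// whether it came from the pool (and should go back via putDecoder later).
func acquireDecoder(r io.Reader, opts decoderOpts) (*zstd.Decoder, bool, error) {
	if opts.matchesPoolDefaults() {
		if zd, ok := getDecoder(); ok {
			if err := zd.Reset(r); err == nil {
				return zd, true, nil
			}
			putDecoder(zd) // Reset failed; the pool decides whether to keep it
		}
	}
	// Fallback: build a fresh decoder with the caller's options.
	zd, err := zstd.NewReader(r,
		zstd.WithDecoderConcurrency(opts.concurrency),
		zstd.WithDecoderLowmem(opts.lowMem),
		zstd.WithDecoderMaxMemory(opts.maxMemory),
	)
	return zd, false, err
}
```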
**220-231: Close() correctly distinguishes pooled vs non-pooled decoders.**

The logic properly returns pooled decoders for reuse and closes non-pooled ones. Setting `d.zd = nil` after handling prevents double-free issues.
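That Close logic might be sketched like this, with `pooledReadCloser`, `zd`, and `pooled` as assumed names for the wrapper type and its fields:

```go
// pooledReadCloser is a hypothetical stand-in for the decoder wrapper
// type in decoder.go.
type pooledReadCloser struct {
	zd     *zstd.Decoder
	pooled bool
}

func (d *pooledReadCloser) Close() error {
	if d.zd == nil {
		return nil // already closed; nothing to double-free
	}
	if d.pooled {
		putDecoder(d.zd) // hand the decoder back for reuse
	} else {
		d.zd.Close() // zstd.Decoder.Close has no return value
	}
	d.zd = nil // prevent returning the same decoder twice
	return nil
}
```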
pkg/dotc1z/file.go (3)

**179-182: Returning a closed encoder to the pool is correct behavior.**

After reviewing the zstd library, calling `Reset(nil)` on a closed encoder is safe and documented as the proper way to prepare it for pooling. The encoder can be reused after `Reset()` is called with a new writer.
**148-149: Ignoring the `Reset` return value is intentional for encoders.**

Unlike `zstd.Decoder.Reset()`, which returns an error, `zstd.Encoder.Reset()` has no return value in the klauspost/compress library, so this is correct.
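The asymmetry comes straight from the klauspost/compress API: `Encoder.Reset(w io.Writer)` returns nothing, while `Decoder.Reset(r io.Reader) error` can fail. A tiny standalone illustration:

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/compress/zstd"
)

func main() {
	enc, _ := zstd.NewWriter(io.Discard)
	enc.Reset(io.Discard) // Encoder.Reset(w io.Writer): no error to check

	dec, _ := zstd.NewReader(nil)
	if err := dec.Reset(bytes.NewReader(nil)); err != nil { // Decoder.Reset returns an error
		fmt.Println("decoder reset failed:", err)
	}
}
```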
**163-166: Error path correctly closes encoder without returning to pool.**

On copy error, the encoder is closed to release resources but not returned to the pool, since it may be in an inconsistent state. The underscore assignment for the `Close()` error is acceptable here since we're already returning the original error.

pkg/dotc1z/pool_test.go (1)
**164-221: Good round-trip tests validating end-to-end correctness.**

The tests properly exercise both single and multiple round-trips, verifying data integrity. The use of `t.TempDir()` ensures cleanup. Based on learnings, the cleanup is automatically handled by the testing framework here.

pkg/dotc1z/pool.go (6)
**21-36: Encoder pool get/create logic is well-structured.**

The type assertion with nil check handles both an empty pool and a type mismatch. Creating with a `nil` writer is correct since `Reset()` will be called before use.
**41-49: `putEncoder` correctly resets before pooling.**

Calling `Reset(nil)` releases the reference to the previous writer, preventing memory leaks. The nil check prevents panics.
**58-74: Decoder pool creation matches `decoder.go` defaults.**

The options (`concurrency=1`, `lowmem=true`, `maxMemory=defaultDecoderMaxMemory`) align with the pool eligibility check at `decoder.go` line 147.
**78-88: `putDecoder` correctly handles Reset failure.**

Unlike `putEncoder`, decoders can fail on `Reset(nil)` if in a bad state. Not returning failed decoders to the pool prevents reuse of corrupted instances.
**1-88: Overall pool implementation is solid.**

The design correctly:
- Uses `sync.Pool` for thread-safe pooling
- Matches pool eligibility to specific option configurations
- Handles creation failures gracefully
- Cleans up references before returning to pool
- Prevents corrupted instances from being pooled

The ~20,785× reduction in bytes allocated per encode operation (per PR benchmarks) demonstrates significant improvement.
**14-16: `pooledEncoderConcurrency` is evaluated once at package init.**

This captures `GOMAXPROCS` at program startup. If `GOMAXPROCS` is changed at runtime (e.g., via `runtime.GOMAXPROCS(n)`), the pool won't match and encoders will be created fresh instead of being pulled from the pool. This is the intended behavior: the code compares the requested encoder concurrency against the pooled default and only reuses pooled encoders when they match.
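For illustration, such a package-level default might look like the following (assumed shape; `poolEligible` is a hypothetical helper, not necessarily a function in `pool.go`):

```go
import "runtime"

// Captured once at package init; later runtime.GOMAXPROCS(n) calls do not
// update it, so pooled encoders are reused only while the requested
// concurrency still equals this startup value.
var pooledEncoderConcurrency = runtime.GOMAXPROCS(0)

// poolEligible is a hypothetical helper illustrating the comparison.
func poolEligible(requestedConcurrency int) bool {
	return requestedConcurrency == pooledEncoderConcurrency
}
```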
Actionable comments posted: 1
🤖 Fix all issues with AI Agents

In @pkg/dotc1z/pool_test.go:
- Around lines 193-220: The test currently calls decoder.Close() and f.Close() without checking errors; update the cleanup to capture and assert their return values (e.g., err := decoder.Close(); require.NoError(t, err) and err = f.Close(); require.NoError(t, err)) after using NewDecoder/decoder and os.Open/f so cleanup failures fail the test and surface pool/resource issues.
🧹 Nitpick comments (2)
pkg/dotc1z/pool_test.go (2)
**93-99: Consider handling errors in the test helper for clarity.**

The helper silently ignores errors from `enc.Write()` and `enc.Close()`. While failures would likely cause downstream test failures during decompression, explicit error handling (e.g., using `require.NoError`) would make test failures clearer and fail fast if encoder setup goes wrong.

🔎 Proposed fix for explicit error handling

```diff
 createCompressedData := func(data []byte) []byte {
 	var buf bytes.Buffer
-	enc, _ := zstd.NewWriter(&buf)
-	_, _ = enc.Write(data)
-	_ = enc.Close()
+	enc, err := zstd.NewWriter(&buf)
+	require.NoError(t, err)
+	_, err = enc.Write(data)
+	require.NoError(t, err)
+	err = enc.Close()
+	require.NoError(t, err)
 	return buf.Bytes()
 }
```
**244-264: Consider basic error validation in benchmarks for robustness.**

The "new_encoder_each_time" sub-benchmark ignores all error returns (file operations, writes, Close calls). While this is common in benchmarks to minimize overhead, at minimum checking file operation errors (lines 249-250) with `if err != nil { b.Fatal(err) }` would ensure the benchmark is measuring valid operations.
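A minimal version of that pattern, as a test-file fragment with a hypothetical fixture path:

```go
// BenchmarkWithErrorChecks shows the low-overhead error pattern the review
// suggests: check file operations, skip finer-grained checks in the hot loop.
func BenchmarkWithErrorChecks(b *testing.B) {
	for i := 0; i < b.N; i++ {
		f, err := os.Open("testdata/input.bin") // hypothetical fixture
		if err != nil {
			b.Fatal(err) // fail fast so the benchmark measures valid work
		}
		// ... exercise the encoder against f ...
		if err := f.Close(); err != nil {
			b.Fatal(err)
		}
	}
}
```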
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
pkg/dotc1z/pool_test.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-02T17:21:01.723Z
Learnt from: ggreer
Repo: ConductorOne/baton-sdk PR: 616
File: pkg/synccompactor/compactor_test.go:44-52
Timestamp: 2026-01-02T17:21:01.723Z
Learning: In tests that verify cleanup behavior (e.g., ensuring temporary artifacts are removed or directories are empty after an operation), treat cleanup failures as test failures by asserting on the cleanup call (e.g., require.NoError(t, err) or assert.NoError(t, err)). This ensures that the cleanup path is explicitly tested and any cleanup error fails the test, confirming correct behavior of the cleanup code.
Applied to files:
pkg/dotc1z/pool_test.go
🧬 Code graph analysis (1)
`pkg/dotc1z/pool_test.go` (2)
- pkg/dotc1z/decoder.go (4): NewDecoder (234-267), C1ZFileHeader (23-23), WithDecoderConcurrency (111-116), WithDecoderMaxMemory (78-89)
- pkg/dotc1z/c1file.go (1): WithEncoderConcurrency (173-177)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (4)
pkg/dotc1z/pool_test.go (4)
**16-89: LGTM! Comprehensive encoder pool testing.**

The test coverage is thorough:
- Pool behavior correctly verified (cache miss on first call, hit on subsequent calls)
- Compression correctness validated via round-trip decompression
- Concurrent access properly tested with multiple goroutines
**101-161: LGTM! Thorough decoder pool testing.**

The decoder pool tests mirror the encoder tests well:
- Pool behavior validated (first get from empty pool, second from populated pool)
- Decompression correctness verified
- Concurrent access properly exercised
**267-299: LGTM! Well-designed allocation benchmark.**

The benchmark correctly:
- Warms up the pool before measurement
- Resets the timer to exclude setup overhead
- Isolates pure allocation costs by using `bytes.Buffer` instead of file I/O
- Provides a clear comparison for the pooling benefit
301-346: LGTM! Decoder benchmark properly configured.The benchmark provides a fair comparison by ensuring the "new_decoder_each_time" path uses the same decoder options (concurrency=1, low memory, max memory) that the pool would use. This ensures accurate measurement of pooling benefits.
```go
t.Run("multiple round trips reuse pool", func(t *testing.T) {
	tmpDir := t.TempDir()

	for i := 0; i < 10; i++ {
		testData := bytes.Repeat([]byte("iteration data "), 100*(i+1))

		dbFile := filepath.Join(tmpDir, "test.db")
		err := os.WriteFile(dbFile, testData, 0600)
		require.NoError(t, err)

		c1zFile := filepath.Join(tmpDir, "test.c1z")
		err = saveC1z(dbFile, c1zFile, 0)
		require.NoError(t, err)

		f, err := os.Open(c1zFile)
		require.NoError(t, err)

		decoder, err := NewDecoder(f)
		require.NoError(t, err)

		decoded, err := io.ReadAll(decoder)
		require.NoError(t, err)
		require.Equal(t, testData, decoded)

		decoder.Close()
		f.Close()
	}
})
```
**Assert cleanup errors to verify proper resource cleanup.**

Lines 217-218 call `decoder.Close()` and `f.Close()` without checking their return values. Per testing best practices, cleanup operations should be asserted to ensure they succeed, especially when testing resource-pooling behavior where improper cleanup could mask pool state issues.

Based on learnings, cleanup failures should be treated as test failures.
🔎 Proposed fix to assert cleanup calls

```diff
 decoded, err := io.ReadAll(decoder)
 require.NoError(t, err)
 require.Equal(t, testData, decoded)

-decoder.Close()
-f.Close()
+require.NoError(t, decoder.Close())
+require.NoError(t, f.Close())
 	}
 })
```
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```go
t.Run("multiple round trips reuse pool", func(t *testing.T) {
	tmpDir := t.TempDir()
	for i := 0; i < 10; i++ {
		testData := bytes.Repeat([]byte("iteration data "), 100*(i+1))
		dbFile := filepath.Join(tmpDir, "test.db")
		err := os.WriteFile(dbFile, testData, 0600)
		require.NoError(t, err)
		c1zFile := filepath.Join(tmpDir, "test.c1z")
		err = saveC1z(dbFile, c1zFile, 0)
		require.NoError(t, err)
		f, err := os.Open(c1zFile)
		require.NoError(t, err)
		decoder, err := NewDecoder(f)
		require.NoError(t, err)
		decoded, err := io.ReadAll(decoder)
		require.NoError(t, err)
		require.Equal(t, testData, decoded)
		require.NoError(t, decoder.Close())
		require.NoError(t, f.Close())
	}
})
```
🤖 Prompt for AI Agents

```
In @pkg/dotc1z/pool_test.go around lines 193-220: The test currently calls
decoder.Close() and f.Close() without checking errors; update the cleanup to
capture and assert their return values (e.g., err := decoder.Close();
require.NoError(t, err) and err = f.Close(); require.NoError(t, err)) after
using NewDecoder/decoder and os.Open/f so cleanup failures fail the test and
surface pool/resource issues.
```
**Summary**

- `sync.Pool` for `zstd.Encoder` and `zstd.Decoder` instances to reduce allocation overhead
- Profiling showed `zstd.ensureHist` allocating 215 MB/min in temporal_sync due to creating new encoders per `saveC1z` call

**Changes**

- `pkg/dotc1z/pool.go`
- `pkg/dotc1z/pool_test.go`
- `pkg/dotc1z/file.go`: `saveC1z` to use pooled encoders
- `pkg/dotc1z/decoder.go`

**Safety Measures**

- `Reset(nil)` called before returning to pool to release writer/reader references
- `Reset()` errors handled gracefully (don't return bad decoders to pool)

**Benchmark Results**

20,785x reduction in bytes allocated per encode operation.

**Test plan**

- `dotc1z` tests pass

🤖 Generated with Claude Code
**Summary by CodeRabbit**

- Chores
- Tests