diff --git a/cmd/plz4/internal/ops/bakeoff.go b/cmd/plz4/internal/ops/bakeoff.go index fff21cf..dbbfff1 100644 --- a/cmd/plz4/internal/ops/bakeoff.go +++ b/cmd/plz4/internal/ops/bakeoff.go @@ -6,10 +6,9 @@ import ( "fmt" "io" "os" - "runtime" + "slices" "time" - "github.com/gammazero/workerpool" "github.com/jedib0t/go-pretty/v6/progress" "github.com/jedib0t/go-pretty/v6/table" "github.com/pierrec/lz4/v4" @@ -188,13 +187,11 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err return nil, err } - var ( - i = 0 - ) + const nLevels = 10 tr := &progress.Tracker{ Message: "Processing lz4", - Total: srcSz * 10, + Total: srcSz * nLevels, Units: progress.UnitsBytes, } @@ -236,7 +233,7 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err var results []resultT - for ; i < 10; i++ { + for i := range nLevels { start := time.Now() var ( @@ -245,7 +242,8 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err err error ) - lvl, err := lz4Level(i) + // Run backwards so that the cache penalty is less for faster levels. + lvl, err := lz4Level(nLevels - i - 1) // [0,nLevels-1] if err != nil { return nil, err } @@ -260,10 +258,9 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err return nil, err } - // Last one wins; so append is ok. - opts = append(opts, lz4.CompressionLevelOption(lvl)) + nopts := append(opts, lz4.CompressionLevelOption(lvl)) - split, cnt, err = lz4BakeOne(rd, opts...) + split, cnt, err = lz4BakeOne(rd, nopts...) 
} if err != nil { @@ -282,6 +279,7 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err }) } + slices.Reverse(results) return results, nil } @@ -347,9 +345,11 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er return nil, err } + const nLevels = 12 + tr := &progress.Tracker{ Message: "Processing plz4", - Total: srcSz * 12, + Total: srcSz * nLevels, Units: progress.UnitsBytes, } @@ -364,12 +364,9 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er tr.SetValue(srcOff + (int64(i) * srcSz)) } - wp := workerpool.New(runtime.NumCPU()) - opts = append(opts, plz4.WithProgress(cbHandler), plz4.WithPendingSize(-1), - plz4.WithWorkerPool(wp), ) var srcBlock []byte @@ -385,7 +382,7 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er var results []resultT - for ; i < 12; i++ { + for ; i < nLevels; i++ { start := time.Now() var ( @@ -394,21 +391,22 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er err error ) + lvl := nLevels - i // [1,nLevels] + if srcBlock != nil { // Block mode - split, cnt, err = plz4BakeOneBlock(srcBlock, plz4.LevelT(i+1)) + split, cnt, err = plz4BakeOneBlock(srcBlock, plz4.LevelT(lvl)) } else { - // Last one wins; so append is ok. - if _, err := rd.Seek(0, io.SeekStart); err != nil { return nil, err } - opts = append(opts, - plz4.WithLevel(plz4.LevelT(i+1)), + // Run backwards so that the cache penalty is less for faster levels. + nopts := append(opts, + plz4.WithLevel(plz4.LevelT(lvl)), ) - split, cnt, err = plz4BakeOne(rd, opts...) + split, cnt, err = plz4BakeOne(rd, nopts...) 
} if err != nil { return nil, err @@ -426,6 +424,7 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er }) } + slices.Reverse(results) return results, nil } diff --git a/go.mod b/go.mod index 8643935..c75b1bb 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.24.0 require ( github.com/alecthomas/kong v1.13.0 - github.com/gammazero/workerpool v1.1.3 github.com/jedib0t/go-pretty/v6 v6.7.8 github.com/pierrec/lz4/v4 v4.1.25 ) @@ -12,7 +11,6 @@ require ( require ( github.com/clipperhouse/stringish v0.1.1 // indirect github.com/clipperhouse/uax29/v2 v2.3.0 // indirect - github.com/gammazero/deque v1.2.0 // indirect github.com/mattn/go-runewidth v0.0.19 // indirect golang.org/x/sys v0.39.0 // indirect golang.org/x/term v0.38.0 // indirect diff --git a/go.sum b/go.sum index b080e99..5400b68 100644 --- a/go.sum +++ b/go.sum @@ -10,10 +10,6 @@ github.com/clipperhouse/uax29/v2 v2.3.0 h1:SNdx9DVUqMoBuBoW3iLOj4FQv3dN5mDtuqwuh github.com/clipperhouse/uax29/v2 v2.3.0/go.mod h1:Wn1g7MK6OoeDT0vL+Q0SQLDz/KpfsVRgg6W7ihQeh4g= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gammazero/deque v1.2.0 h1:scEFO8Uidhw6KDU5qg1HA5fYwM0+us2qdeJqm43bitU= -github.com/gammazero/deque v1.2.0/go.mod h1:JVrR+Bj1NMQbPnYclvDlvSX0nVGReLrQZ0aUMuWLctg= -github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q= -github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/jedib0t/go-pretty/v6 v6.7.8 h1:BVYrDy5DPBA3Qn9ICT+PokP9cvCv1KaHv2i+Hc8sr5o= @@ -26,8 +22,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= -go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.38.0 h1:PQ5pkm/rLO6HnxFR7N2lJHOZX6Kez5Y1gDSJla6jo7Q= diff --git a/internal/pkg/async/reader.go b/internal/pkg/async/reader.go index c41c68f..2d3bb06 100644 --- a/internal/pkg/async/reader.go +++ b/internal/pkg/async/reader.go @@ -87,15 +87,15 @@ func NewAsyncReader(rdr io.Reader, hdr header.HeaderT, opts *opts.OptsT) *asyncR // Otherwise could dead lock on too many simultaneous request go r.dispatch() - // Each closure escapes and causes an allocate. 
- // No reason to do that NParallel times - task := func() { - r.decompress() + nTasks := opts.NParallel + if hdr.Flags.ContentSize() && hdr.ContentSz > 0 { + // If content size known, can limit number of tasks + nTasks = min(opts.NParallel, int(hdr.ContentSz)/bsz+1) } - r.wg.Add(opts.NParallel) - for i := 0; i < opts.NParallel; i++ { - opts.WorkerPool.Submit(task) + r.wg.Add(nTasks) + for range nTasks { + opts.WorkerPool.Submit(r.decompress) } return r @@ -223,8 +223,8 @@ LOOP: func (r *asyncRdrT) NextBlock(prevBlk *blk.BlkT) (*blk.BlkT, int, error) { if prevBlk != nil { - switch { - case r.hasher == nil: + switch r.hasher { + case nil: blk.ReturnBlk(prevBlk) default: r.hasher.Queue(prevBlk) diff --git a/internal/pkg/async/writer.go b/internal/pkg/async/writer.go index c96a5bb..aefc411 100644 --- a/internal/pkg/async/writer.go +++ b/internal/pkg/async/writer.go @@ -10,6 +10,7 @@ import ( "github.com/prequel-dev/plz4/internal/pkg/header" "github.com/prequel-dev/plz4/internal/pkg/opts" "github.com/prequel-dev/plz4/internal/pkg/trailer" + "github.com/prequel-dev/plz4/internal/pkg/xxh32" "github.com/prequel-dev/plz4/internal/pkg/zerr" ) @@ -29,6 +30,7 @@ func (f *stateFlagT) Set(flag stateFlagT) { } type asyncWriterT struct { + wr io.Writer bsz int srcIdx int srcOff int @@ -41,9 +43,7 @@ type asyncWriterT struct { wg sync.WaitGroup opts *opts.OptsT hasher *AsyncHashIdx - taskF func() cmpF compress.CompressorFactory - nTasks int state atomic.Pointer[error] flags stateFlagT } @@ -51,59 +51,30 @@ type asyncWriterT struct { func NewAsyncWriter(wr io.Writer, opts *opts.OptsT) *asyncWriterT { var ( - bsz = opts.BlockSizeIdx.Size() - cmpF = opts.NewCompressorFactory() - nPending = opts.CalcPending() + bsz = opts.BlockSizeIdx.Size() + cmpF = opts.NewCompressorFactory() ) w := &asyncWriterT{ - bsz: bsz, - inChan: make(chan inBlkT), - outChan: make(chan outBlkT), - synChan: make(chan int), - semChan: make(chan struct{}, nPending), - cmpF: cmpF, - opts: opts, - nTasks: 1, - } - 
- // Spin up writer before compress tasks in - // case worker pool is defined that does not - // have enough slots. Need at least - // 2 slots available (3 if opts.SrcChecksum) - // Note: control routine must be outside of workerpool. - // Otherwise could deadlock on too many simultaneous request - go w.writeLoop(wr) - - if opts.ContentChecksum { - w.hasher = NewAsyncHashIdx(opts.NParallel) - go w.hasher.Run() + wr: wr, + bsz: bsz, + cmpF: cmpF, + opts: opts, } - // Bind task function - // Each closure escapes and causes an allocate. - // No reason to do that NParallel times - w.taskF = func() { - w.compressLoop() - } + // Defer spinning up async goroutines tasks until first buffer is flushed. + // This allows us to avoid unnecessary resource consumption + // in the case of very small payloads that fit within + // a single block. - // Spin up at least one producer task; we will need at least one - // assuming there is a write at some point. - // Will spin up additional on demand; this conserves - // resources in auto parallel mode; particularly with small payloads. - // If we do happen to have content size, intialize based on size: + // However, if user set content size, we know how large the source + // data is and can launch the appropriate number of tasks up front. if opts.ContentSz != nil { - w.nTasks = int(*opts.ContentSz)/bsz + 1 - if w.nTasks > opts.NParallel { - w.nTasks = opts.NParallel + if nBuffers := int(*opts.ContentSz)/bsz + 1; nBuffers > 1 { + w.kickoffAsync() } } - w.wg.Add(w.nTasks) - for i := 0; i < w.nTasks; i++ { - opts.WorkerPool.Submit(w.taskF) - } - return w } @@ -125,7 +96,7 @@ func (w *asyncWriterT) Write(src []byte) (int, error) { // Flush block if completely filled if w.srcOff == w.bsz { - w._flushBlk() + w._flushBlk(false) } // Slide the src buffer over by N for next spin @@ -143,7 +114,7 @@ func (w *asyncWriterT) Flush() error { } // Flush out pending data if any - w.flushBlk() + w.flushBlk(false) // If no data has been queue, return. 
if w.srcIdx == 0 { @@ -167,26 +138,28 @@ func (w *asyncWriterT) Close() error { } // Flush any outstanding data - w.flushBlk() + w.flushBlk(true) - // Close down the semaphore. - // No long necessary after last flush. - close(w.semChan) + if w.semChan != nil { + // Close down the semaphore. + // No longer necessary after last flush. + close(w.semChan) - // Close down the inChan. - // This will cause the producer goroutines to exit. - close(w.inChan) + // Close down the inChan. + // This will cause the producer goroutines to exit. + close(w.inChan) - // Wait for the producer go routines to cycle down; - // Not safe to close the w.outChan until all have exited. - w.wg.Wait() + // Wait for the producer go routines to cycle down; + // Not safe to close the w.outChan until all have exited. + w.wg.Wait() - // Close down the outChan. This is safe because - // all the producers have closed down via wg.Wait() - close(w.outChan) + // Close down the outChan. This is safe because + // all the producers have closed down via wg.Wait() + close(w.outChan) - // Wait for the writeLoop goroutine to exit - <-w.synChan + // Wait for the writeLoop goroutine to exit + <-w.synChan + } // Return the srcBlk to the pool blk.ReturnBlk(w.srcBlk) @@ -236,7 +209,7 @@ LOOP: switch rerr { case nil: // srcBlk was filled; flush the block to the out channel - w._flushBlk() + w._flushBlk(false) case io.ErrUnexpectedEOF: // Some bytes were read and add to w.srcBlk. // Defer flush and spin loop again. 
@@ -308,9 +281,10 @@ LOOP: } } -func (w *asyncWriterT) writeLoop(wr io.Writer) { +func (w *asyncWriterT) writeLoop() { var ( + wr = w.wr nextIdx = 0 flushIdx = -1 srcMark = int64(0) @@ -425,14 +399,14 @@ func (w *asyncWriterT) writeTrailer(wr io.Writer) (int, error) { return trailer.WriteTrailerWithHash(wr, xxh) } -func (w *asyncWriterT) flushBlk() { +func (w *asyncWriterT) flushBlk(close bool) { if w.srcOff == 0 { return } // Clip w.srcBlk to whatever we are currently cached to w.srcBlk.Trim(w.srcOff) - w._flushBlk() + w._flushBlk(close) } func (w *asyncWriterT) _genDict() (outDict *blk.BlkT) { @@ -462,12 +436,54 @@ func (w *asyncWriterT) _genDict() (outDict *blk.BlkT) { return } -func (w *asyncWriterT) _flushBlk() { +func (w *asyncWriterT) kickoffAsync() { + + var ( + nPending = w.opts.CalcPending() + ) + + w.inChan = make(chan inBlkT) + w.outChan = make(chan outBlkT) + w.synChan = make(chan int) + w.semChan = make(chan struct{}, nPending) + + go w.writeLoop() + + if w.opts.ContentChecksum { + w.hasher = NewAsyncHashIdx(w.opts.NParallel) + go w.hasher.Run() + } + + // Fire the compressor tasks; limit if we know content size. + nTasks := w.opts.NParallel + if w.opts.ContentSz != nil { + nTasks = min(w.opts.NParallel, int(*w.opts.ContentSz)/w.bsz+1) + } + + w.wg.Add(nTasks) + for range nTasks { + w.opts.WorkerPool.Submit(w.compressLoop) + } +} + +func (w *asyncWriterT) _flushBlk(close bool) { if w.srcBlk == nil || w.srcOff == 0 { // Nothing to flush; return return } + switch { + case w.semChan != nil: + // Async already kicked; proceed + case close: + // Close was called before the first block was flushed. + // Run synchronous write to avoid spinning up goroutines unnecessarily. 
+ w.writeSync() + return + default: + w.kickoffAsync() + } + // Defer content hash to minimize pipeline blockage if w.hasher != nil { w.hasher.Queue(w.srcBlk) @@ -483,19 +499,56 @@ func (w *asyncWriterT) _flushBlk() { dict: w._genDict(), } - // Try to get ahead of next write by - // spawning go routine if we have capacity - if w.nTasks < w.opts.NParallel { - w.nTasks += 1 - w.wg.Add(1) - w.opts.WorkerPool.Submit(w.taskF) - } - w.srcBlk = nil w.srcOff = 0 w.srcIdx += 1 } +func (w *asyncWriterT) writeSync() { + if err := w._writeSync(); err != nil { + w.setError(err) + } +} + +func (w *asyncWriterT) _writeSync() error { + + hdrSz, err := header.WriteHeader(w.wr, w.opts) + if err != nil { + return err + } + + var ( + cmp = w.cmpF.NewCompressor() + ) + + // Compress and write out the final block. + // The dictionary, if included, is applied by the compressor. + dstBlk, err := blk.CompressToBlk(w.srcBlk.Data(), cmp, w.bsz, w.opts.BlockChecksum, nil) + if err != nil { + return err + } + defer blk.ReturnBlk(dstBlk) + + _, err = w.wr.Write(dstBlk.Data()) + + if err != nil { + return err + } + + w.opts.Handler(0, int64(hdrSz)) + + switch w.opts.ContentChecksum { + case true: + var hasher xxh32.XXHZero + hasher.Write(w.srcBlk.Data()) + _, err = trailer.WriteTrailerWithHash(w.wr, hasher.Sum32()) + default: + _, err = trailer.WriteTrailer(w.wr) + } + + return err +} + // First error wins. Nil error will panic. 
func (w *asyncWriterT) setError(err error) bool { return w.state.CompareAndSwap(nil, &err) diff --git a/internal/pkg/clz4/clz4.go b/internal/pkg/clz4/clz4.go index 18b6f10..4f214d3 100644 --- a/internal/pkg/clz4/clz4.go +++ b/internal/pkg/clz4/clz4.go @@ -2,7 +2,7 @@ package clz4 -// #cgo CFLAGS: -O3 +// #cgo CFLAGS: -O3 -flto // #include "lz4.h" // #include "lz4hc.h" import "C" diff --git a/internal/pkg/compress/compress.go b/internal/pkg/compress/compress.go index aba8f92..b6c5750 100644 --- a/internal/pkg/compress/compress.go +++ b/internal/pkg/compress/compress.go @@ -5,6 +5,10 @@ package compress import "github.com/prequel-dev/plz4/internal/pkg/clz4" type Compressor interface { + // 'src' is the source data to compress + // 'dst' is the destination buffer to write compressed data into + // 'dict' is an optional dictionary (may be nil) + // Returns number of bytes written to 'dst' and error if any Compress(src, dst, dict []byte) (int, error) } diff --git a/internal/pkg/wpool/wpool.go b/internal/pkg/wpool/wpool.go new file mode 100644 index 0000000..ee165e9 --- /dev/null +++ b/internal/pkg/wpool/wpool.go @@ -0,0 +1,294 @@ +package wpool + +// Lightweight worker pool implementation. +// Supports dispatch of tasks to workers in order of Submit. +// Supports dynamic scaling of workers between min and max limits, +// with idle workers being drained after a configurable timeout. +// +// Note: In modern golang versions, the built-in goroutine scheduler +// is quite efficient at managing large numbers of goroutines, so +// this worker pool is primarily useful for limiting concurrency +// rather than for performance optimization. 
+ +import ( + "runtime" + "slices" + "sync" + "time" +) + +const ( + drainTick = time.Second * 10 + drainMax = time.Second * 30 +) + +type Opts func(optsT) optsT + +type optsT struct { + maxWorkers int + minWorkers int + preWorkers int + tskPrealloc int + drainTick time.Duration + drainMax time.Duration +} + +func WithMaxWorkers(n int) Opts { + return func(o optsT) optsT { + o.maxWorkers = n + return o + } +} + +func WithMinWorkers(n int) Opts { + return func(o optsT) optsT { + o.minWorkers = n + return o + } +} + +func WithPreallocWorkers(n int) Opts { + return func(o optsT) optsT { + o.preWorkers = n + return o + } +} + +func WithTaskPrealloc(n int) Opts { + return func(o optsT) optsT { + o.tskPrealloc = n + return o + } +} + +func WithDrainTick(d time.Duration) Opts { + return func(o optsT) optsT { + o.drainTick = d + return o + } +} + +func WithDrainMax(d time.Duration) Opts { + return func(o optsT) optsT { + o.drainMax = d + return o + } +} + +func parseOpts(optList ...Opts) optsT { + o := optsT{ + maxWorkers: runtime.NumCPU(), + drainTick: drainTick, + drainMax: drainMax, + } + + for _, opt := range optList { + o = opt(o) + } + + return o +} + +type WorkerPool struct { + mux sync.Mutex + qTasks []func() + qIdle []*workerT + taskIdx int + numWorkers int + maxWorkers int + minWorkers int +} + +type workerT struct { + stamp int64 + ch chan func() +} + +func NewWorkerPool(opts ...Opts) *WorkerPool { + o := parseOpts(opts...) 
+ + nTask := o.tskPrealloc + if nTask <= 0 { + nTask = o.maxWorkers + } + + wp := &WorkerPool{ + qTasks: make([]func(), 0, nTask), + qIdle: make([]*workerT, 0, o.maxWorkers), + maxWorkers: o.maxWorkers, + minWorkers: o.minWorkers, + } + + if o.minWorkers < 0 { + o.minWorkers = 0 + } + + nAlloc := min(o.maxWorkers, max(o.preWorkers, o.minWorkers)) + for i := 0; i < nAlloc; i++ { + w := newWorker() + wp.numWorkers++ + wp.qIdle = append(wp.qIdle, w) + go w.run(wp) + } + + if wp.maxWorkers > 0 { + go wp.drainIdleWorkers(o.drainTick, o.drainMax) + } + + return wp +} + +func (wp *WorkerPool) drainIdleWorkers(drainTick, drainMax time.Duration) { + ticker := time.NewTicker(drainTick) + defer ticker.Stop() + + for range ticker.C { + if !wp.runGC(drainMax) { + return + } + } +} + +func (wp *WorkerPool) runGC(drainMax time.Duration) bool { + wp.mux.Lock() + defer wp.mux.Unlock() + + if wp.maxWorkers <= 0 { + return false + } + + now := time.Now().UnixNano() + + i := 0 + + for ; i < len(wp.qIdle); i++ { + + // Break out of work if we reach minWorkers + if len(wp.qIdle)-i <= wp.minWorkers { + break + } + + w := wp.qIdle[i] + + age := now - w.stamp + + if age < int64(drainMax) { + break + } + + // Force the worker closed + close(w.ch) + } + + // Fix up the idleQ + if i > 0 { + wp.qIdle = wp.qIdle[i:] + wp.numWorkers -= i + } + + // Slide tasks down if necessary; + // TODO consider trim down allocation if too large + if wp.taskIdx > 0 { + wp.qTasks = slices.Delete(wp.qTasks, 0, wp.taskIdx) + wp.taskIdx = 0 + } + + return true +} + +func (wp *WorkerPool) Close() { + wp.mux.Lock() + defer wp.mux.Unlock() + + wp.maxWorkers = 0 + + for _, w := range wp.qIdle { + close(w.ch) + } + wp.qIdle = nil + wp.qTasks = nil + wp.taskIdx = 0 +} + +func (wp *WorkerPool) Submit(task func()) { + wp.mux.Lock() + + // If there is an idle worker, give it the task + if nq := len(wp.qIdle); nq > 0 { + w := wp.qIdle[nq-1] + wp.qIdle = wp.qIdle[:nq-1] + wp.mux.Unlock() + + w.submit(task) + return + } + + 
if wp.numWorkers < wp.maxWorkers { + // Create a new worker + wp.numWorkers++ + wp.mux.Unlock() + + w := newWorker() + go w.run(wp) + w.submit(task) + return + } + + if wp.maxWorkers > 0 { + // We are at max workers; queue the task + wp.qTasks = append(wp.qTasks, task) + } + + wp.mux.Unlock() +} + +func (wp *WorkerPool) maybeQueueIdle(w *workerT) (func(), bool) { + wp.mux.Lock() + defer wp.mux.Unlock() + + if wp.maxWorkers <= 0 { + wp.numWorkers-- + return nil, false + } + + nt := len(wp.qTasks) + + if nt-wp.taskIdx == 0 { + // No tasks; queue as idle + w.stamp = time.Now().UnixNano() + wp.qIdle = append(wp.qIdle, w) + return nil, true + } + + // Grab new tasks from front of task queue + task := wp.qTasks[wp.taskIdx] + wp.taskIdx++ + return task, true +} + +func newWorker() *workerT { + return &workerT{ + ch: make(chan func(), 1), + } +} + +func (w *workerT) submit(task func()) { + w.ch <- task +} + +func (w *workerT) run(wp *WorkerPool) { + for task := range w.ch { + task() + + for { + task, ok := wp.maybeQueueIdle(w) + if !ok { + return + } + if task == nil { + break + } + task() + } + } +} diff --git a/internal/pkg/wpool/wpool_test.go b/internal/pkg/wpool/wpool_test.go new file mode 100644 index 0000000..74ffdb4 --- /dev/null +++ b/internal/pkg/wpool/wpool_test.go @@ -0,0 +1,749 @@ +package wpool + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +// Test that Submit executes all queued tasks up to maxWorkers +// and that no tasks are dropped while the pool is open. 
+func TestWorkerPoolSubmitExecutesTasks(t *testing.T) { + const ( + nTasks = 1024 + ) + + wp := NewWorkerPool() + defer wp.Close() + + var ( + wg sync.WaitGroup + counter int32 + ) + + wg.Add(nTasks) + for range nTasks { + wp.Submit(func() { + atomic.AddInt32(&counter, 1) + wg.Done() + }) + } + + wg.Wait() + + if got := atomic.LoadInt32(&counter); got != nTasks { + t.Fatalf("expected %d tasks to run, got %d", nTasks, got) + } +} + +// Test that when maxWorkers is reached, additional submitted tasks +// are queued and eventually executed by existing workers. +func TestWorkerPoolQueuedTasksExecuted(t *testing.T) { + const nTasks = 5 + + wp := NewWorkerPool(WithMaxWorkers(1)) + defer wp.Close() + + var ( + wg sync.WaitGroup + counter int32 + ) + + wg.Add(nTasks) + for i := 0; i < nTasks; i++ { + wp.Submit(func() { + defer wg.Done() + // Give GC/idling a chance, but keep it short to avoid flakiness. + time.Sleep(1 * time.Millisecond) + atomic.AddInt32(&counter, 1) + }) + } + + wg.Wait() + + if got := atomic.LoadInt32(&counter); got != nTasks { + t.Fatalf("expected %d queued tasks to run, got %d", nTasks, got) + } +} + +// Test that Close stops accepting new tasks and closes idle workers. +func TestWorkerPoolCloseStopsNewTasks(t *testing.T) { + wp := NewWorkerPool(WithMaxWorkers(1)) + + var ranFirst, ranAfterClose int32 + var wg sync.WaitGroup + + wg.Add(1) + wp.Submit(func() { + atomic.StoreInt32(&ranFirst, 1) + wg.Done() + }) + + wg.Wait() + + // Close the pool; subsequent Submit calls should be ignored. + wp.Close() + + wp.Submit(func() { + // This should never run if Close worked correctly. + atomic.StoreInt32(&ranAfterClose, 1) + }) + + // Give a brief window for any incorrectly accepted task to run. 
+ time.Sleep(50 * time.Millisecond) + + if atomic.LoadInt32(&ranFirst) != 1 { + t.Fatalf("expected first task to run before Close") + } + + if got := atomic.LoadInt32(&ranAfterClose); got != 0 { + t.Fatalf("expected no tasks to run after Close, got %d", got) + } +} + +// Test runGC behavior by constructing idle workers with timestamps +// far in the past and ensuring they are closed and removed. +func TestWorkerPoolRunGCClosesOldWorkers(t *testing.T) { + wp := &WorkerPool{ + qTasks: nil, + qIdle: nil, + numWorkers: 0, + maxWorkers: 2, + } + + // Create two idle workers with old timestamps. + w1 := newWorker() + w2 := newWorker() + + old := time.Now().Add(-drainMax * 2).UnixNano() + w1.stamp = old + w2.stamp = old + + wp.qIdle = []*workerT{w1, w2} + wp.numWorkers = 2 + + if cont := wp.runGC(drainMax); !cont { + t.Fatalf("runGC unexpectedly requested stop") + } + + // Both workers should have been removed from qIdle and numWorkers decremented. + if len(wp.qIdle) != 0 { + t.Fatalf("expected qIdle to be empty after GC, got %d", len(wp.qIdle)) + } + if wp.numWorkers != 0 { + t.Fatalf("expected numWorkers to be 0 after GC, got %d", wp.numWorkers) + } +} + +// Test runGC returns false when pool is effectively closed (maxWorkers <= 0). 
+func TestWorkerPoolRunGCStopsWhenClosed(t *testing.T) { + wp := &WorkerPool{maxWorkers: 0} + if cont := wp.runGC(drainMax); cont { + t.Fatalf("expected runGC to return false when maxWorkers <= 0") + } +} + +// Test all option setters to reach 100% coverage +func TestAllOptionSetters(t *testing.T) { + wp := NewWorkerPool( + WithMaxWorkers(10), + WithMinWorkers(2), + WithPreallocWorkers(3), + WithTaskPrealloc(100), + WithDrainTick(time.Millisecond*100), + WithDrainMax(time.Millisecond*200), + ) + defer wp.Close() + + if wp.maxWorkers != 10 { + t.Fatalf("expected maxWorkers 10, got %d", wp.maxWorkers) + } + if wp.minWorkers != 2 { + t.Fatalf("expected minWorkers 2, got %d", wp.minWorkers) + } + + // Test that preallocated workers exist + if wp.numWorkers < 2 { + t.Fatalf("expected at least 2 preallocated workers, got %d", wp.numWorkers) + } +} + +// Test negative minWorkers is clamped to 0 +func TestNegativeMinWorkers(t *testing.T) { + wp := NewWorkerPool(WithMinWorkers(-5)) + defer wp.Close() + + // Test that pool still works normally + var wg sync.WaitGroup + wg.Add(1) + wp.Submit(func() { + wg.Done() + }) + wg.Wait() +} + +// Test drainIdleWorkers ticker loop terminates when pool is closed +func TestDrainIdleWorkersStopsOnClose(t *testing.T) { + wp := NewWorkerPool( + WithMaxWorkers(5), + WithMinWorkers(0), + WithDrainTick(time.Millisecond*10), + WithDrainMax(time.Millisecond*20), + ) + + // Give workers time to be created + time.Sleep(time.Millisecond * 5) + + // Close should stop the drainIdleWorkers goroutine + wp.Close() + + // Give time for goroutine to exit + time.Sleep(time.Millisecond * 50) + + // If we didn't hang, the test passed +} + +// Test runGC respects minWorkers +func TestRunGCRespectsMinWorkers(t *testing.T) { + wp := &WorkerPool{ + qTasks: nil, + qIdle: nil, + numWorkers: 0, + maxWorkers: 5, + minWorkers: 2, + } + + // Create 3 idle workers, all with old timestamps + old := time.Now().Add(-drainMax * 2).UnixNano() + for i := 0; i < 3; i++ { + w 
:= newWorker() + w.stamp = old + wp.qIdle = append(wp.qIdle, w) + wp.numWorkers++ + } + + if cont := wp.runGC(drainMax); !cont { + t.Fatalf("runGC unexpectedly requested stop") + } + + // Should keep minWorkers (2) and only remove 1 + if len(wp.qIdle) != 2 { + t.Fatalf("expected 2 idle workers after GC (minWorkers), got %d", len(wp.qIdle)) + } + if wp.numWorkers != 2 { + t.Fatalf("expected numWorkers to be 2 after GC, got %d", wp.numWorkers) + } +} + +// Test runGC doesn't drain young workers +func TestRunGCKeepsYoungWorkers(t *testing.T) { + wp := &WorkerPool{ + qTasks: nil, + qIdle: nil, + numWorkers: 0, + maxWorkers: 5, + minWorkers: 0, + } + + // Create workers with recent timestamps + now := time.Now().UnixNano() + for i := 0; i < 3; i++ { + w := newWorker() + w.stamp = now + wp.qIdle = append(wp.qIdle, w) + wp.numWorkers++ + } + + if cont := wp.runGC(drainMax); !cont { + t.Fatalf("runGC unexpectedly requested stop") + } + + // All workers should remain since they're not old enough + if len(wp.qIdle) != 3 { + t.Fatalf("expected 3 idle workers after GC (all young), got %d", len(wp.qIdle)) + } + if wp.numWorkers != 3 { + t.Fatalf("expected numWorkers to be 3 after GC, got %d", wp.numWorkers) + } +} + +// Test runGC cleans up task queue sliding +func TestRunGCCleansUpTaskQueue(t *testing.T) { + wp := &WorkerPool{ + qTasks: make([]func(), 10), + qIdle: nil, + taskIdx: 8, + numWorkers: 0, + maxWorkers: 5, + minWorkers: 0, + } + + // Add 2 actual tasks at the end + counter := 0 + wp.qTasks[8] = func() { counter++ } + wp.qTasks[9] = func() { counter++ } + + if cont := wp.runGC(drainMax); !cont { + t.Fatalf("runGC unexpectedly requested stop") + } + + // taskIdx should be reset to 0 + if wp.taskIdx != 0 { + t.Fatalf("expected taskIdx to be 0 after GC, got %d", wp.taskIdx) + } + + // qTasks should have slid down + if len(wp.qTasks) != 2 { + t.Fatalf("expected qTasks length to be 2 after GC, got %d", len(wp.qTasks)) + } + + // Verify tasks are still executable + for _, 
task := range wp.qTasks { + if task != nil { + task() + } + } + if counter != 2 { + t.Fatalf("expected 2 tasks to execute, got %d", counter) + } +} + +// Test multiple concurrent Close calls don't panic +func TestConcurrentClose(t *testing.T) { + wp := NewWorkerPool(WithMaxWorkers(10)) + + var wg sync.WaitGroup + wg.Add(3) + + for i := 0; i < 3; i++ { + go func() { + defer wg.Done() + wp.Close() + }() + } + + wg.Wait() +} + +// Test Submit after Close is safe +func TestSubmitAfterClose(t *testing.T) { + wp := NewWorkerPool(WithMaxWorkers(2)) + wp.Close() + + // Should not panic or hang + executed := false + wp.Submit(func() { + executed = true + }) + + time.Sleep(time.Millisecond * 50) + + if executed { + t.Fatalf("expected task not to execute after Close") + } +} + +// Test worker can process multiple tasks from queue +func TestWorkerProcessesMultipleQueuedTasks(t *testing.T) { + wp := NewWorkerPool(WithMaxWorkers(1)) + defer wp.Close() + + var wg sync.WaitGroup + counter := int32(0) + + // Submit more tasks than workers + n := 10 + wg.Add(n) + for i := 0; i < n; i++ { + wp.Submit(func() { + atomic.AddInt32(&counter, 1) + time.Sleep(time.Millisecond) + wg.Done() + }) + } + + wg.Wait() + + if got := atomic.LoadInt32(&counter); got != int32(n) { + t.Fatalf("expected %d tasks to run, got %d", n, got) + } +} + +// Test zero maxWorkers means pool doesn't accept tasks and doesn't +// start any workers or drain goroutines. +func TestZeroMaxWorkers(t *testing.T) { + wp := NewWorkerPool(WithMaxWorkers(0)) + defer wp.Close() + + var executed int32 + for i := 0; i < 10; i++ { + wp.Submit(func() { + atomic.AddInt32(&executed, 1) + }) + } + + // Give any (incorrect) execution a brief chance to run. + time.Sleep(10 * time.Millisecond) + + if got := atomic.LoadInt32(&executed); got != 0 { + t.Fatalf("expected no task execution with maxWorkers=0, got %d", got) + } + + // Inspect internal state to ensure no workers or queued tasks. 
+ wp.mux.Lock() + defer wp.mux.Unlock() + if wp.numWorkers != 0 { + t.Fatalf("expected numWorkers=0, got %d", wp.numWorkers) + } + if len(wp.qIdle) != 0 { + t.Fatalf("expected no idle workers, got %d", len(wp.qIdle)) + } + if len(wp.qTasks) != 0 { + t.Fatalf("expected no queued tasks, got %d", len(wp.qTasks)) + } +} + +// Test maybeQueueIdle when maxWorkers is 0 +func TestMaybeQueueIdleWhenClosed(t *testing.T) { + wp := &WorkerPool{ + maxWorkers: 0, + } + + w := newWorker() + task, ok := wp.maybeQueueIdle(w) + + if ok { + t.Fatalf("expected maybeQueueIdle to return false when maxWorkers=0") + } + if task != nil { + t.Fatalf("expected nil task when maxWorkers=0") + } +} + +// Test race between Submit and Close under heavier but bounded contention. +// The goal is to exercise potential deadlocks without making the test flaky +// or unbounded in runtime. +func TestSubmitCloseRace(t *testing.T) { + const ( + iterations = 20 + nSubmitGoroutes = 8 + submitsPerG = 100 + ) + + for i := 0; i < iterations; i++ { + wp := NewWorkerPool(WithMaxWorkers(4)) + + var wg sync.WaitGroup + // submitters + closers + wg.Add(nSubmitGoroutes + 2) + + // Multiple submitter goroutines hammer Submit. + for s := 0; s < nSubmitGoroutes; s++ { + go func() { + defer wg.Done() + for j := 0; j < submitsPerG; j++ { + wp.Submit(func() { + // Small work to keep workers busy but bounded. + time.Sleep(time.Microsecond) + }) + } + }() + } + + // Two closers race with submitters; Close is idempotent. + go func() { + defer wg.Done() + time.Sleep(50 * time.Microsecond) + wp.Close() + }() + go func() { + defer wg.Done() + time.Sleep(150 * time.Microsecond) + wp.Close() + }() + + // If there is a deadlock between Submit/Close/drainGC, this wait + // will eventually hit the test-wide timeout. 
+ wg.Wait() + } +} + +// Test preallocation options +func TestPreallocationOptions(t *testing.T) { + tests := []struct { + name string + maxWorkers int + preWorkers int + minWorkers int + taskPrealloc int + }{ + {"preallocate more than max", 5, 10, 0, 0}, + {"preallocate equal to max", 5, 5, 0, 0}, + {"preallocate with minWorkers", 10, 3, 5, 0}, + {"large task prealloc", 5, 0, 0, 1000}, + {"zero task prealloc", 5, 0, 0, 0}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := []Opts{ + WithMaxWorkers(tt.maxWorkers), + WithPreallocWorkers(tt.preWorkers), + WithMinWorkers(tt.minWorkers), + } + if tt.taskPrealloc > 0 { + opts = append(opts, WithTaskPrealloc(tt.taskPrealloc)) + } + + wp := NewWorkerPool(opts...) + defer wp.Close() + + // Verify pool works + var wg sync.WaitGroup + wg.Add(1) + wp.Submit(func() { + wg.Done() + }) + wg.Wait() + }) + } +} + +// Test stress with many concurrent submits +func TestConcurrentSubmits(t *testing.T) { + wp := NewWorkerPool(WithMaxWorkers(4)) + defer wp.Close() + + var wg sync.WaitGroup + counter := int32(0) + n := 1000 + + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + wp.Submit(func() { + atomic.AddInt32(&counter, 1) + wg.Done() + }) + }() + } + + wg.Wait() + + if got := atomic.LoadInt32(&counter); got != int32(n) { + t.Fatalf("expected %d tasks to run, got %d", n, got) + } +} + +// Test that idle workers actually go idle and get reused +func TestWorkerIdleAndReuse(t *testing.T) { + wp := NewWorkerPool(WithMaxWorkers(5), WithMinWorkers(0)) + defer wp.Close() + + var wg sync.WaitGroup + + // First wave of tasks + wg.Add(5) + for i := 0; i < 5; i++ { + wp.Submit(func() { + time.Sleep(time.Millisecond) + wg.Done() + }) + } + wg.Wait() + + // Give workers time to become idle + time.Sleep(time.Millisecond * 10) + + // Check that some workers are idle + wp.mux.Lock() + idleCount := len(wp.qIdle) + wp.mux.Unlock() + + if idleCount == 0 { + t.Fatalf("expected some idle workers, got 0") + } + + // 
Second wave should reuse idle workers + wg.Add(3) + for i := 0; i < 3; i++ { + wp.Submit(func() { + time.Sleep(time.Millisecond) + wg.Done() + }) + } + wg.Wait() +} + +// Test panic recovery in tasks doesn't crash the worker +func TestPanicInTask(t *testing.T) { + wp := NewWorkerPool(WithMaxWorkers(2)) + defer wp.Close() + + var wg sync.WaitGroup + + // First task stands in for a panicking task; it does not actually panic + wg.Add(1) + wp.Submit(func() { + defer wg.Done() + // Note: A real panic here would likely crash the worker since + // there's no recovery in the implementation. + // This test documents the behavior + }) + + // Second task should still work if worker pool is robust + wg.Add(1) + wp.Submit(func() { + time.Sleep(time.Millisecond) + wg.Done() + }) + + wg.Wait() +} + +// Test that a worker processing tasks exits correctly when a task is +// delivered after Close has been called on the pool. This exercises the +// `!ok` path in workerT.run where maybeQueueIdle returns false once +// maxWorkers <= 0. +func TestWorkerTaskAfterCloseTriggersExit(t *testing.T) { + wp := &WorkerPool{maxWorkers: 1} + + var wg sync.WaitGroup + wg.Add(1) + + ch := make(chan struct{}) + wp.Submit(func() { + wg.Wait() + ch <- struct{}{} + }) + + // Close the pool while the worker is busy + wp.Close() + + // Unblock the worker + wg.Done() + + <-ch + + // At this point the worker should have begun to exit. + // Give time for worker to exit. + time.Sleep(10 * time.Millisecond) + + // Inspect internal state to ensure no workers remain + wp.mux.Lock() + defer wp.mux.Unlock() + if wp.numWorkers != 0 { + t.Fatalf("expected numWorkers=0 after Close and task completion, got %d", wp.numWorkers) + } + if len(wp.qIdle) != 0 { + t.Fatalf("expected no idle workers after Close, got %d", len(wp.qIdle)) + } +} + +// Test empty pool behavior when created with maxWorkers=0 and then closed. +// Submit should be a no-op and must not execute the task. 
+func TestEmptyPoolSubmit(t *testing.T) { + wp := NewWorkerPool(WithMaxWorkers(0)) + wp.Close() + + executed := false + wp.Submit(func() { + executed = true + }) + + time.Sleep(10 * time.Millisecond) + + if executed { + t.Fatalf("expected no task execution when submitting to closed zero-worker pool") + } +} + +// Test mixed old and young workers in GC +func TestRunGCMixedAgeWorkers(t *testing.T) { + wp := &WorkerPool{ + qIdle: nil, + numWorkers: 0, + maxWorkers: 10, + minWorkers: 0, + } + + now := time.Now().UnixNano() + old := time.Now().Add(-drainMax * 2).UnixNano() + + // Add old workers first (they should be drained) + for i := 0; i < 3; i++ { + w := newWorker() + w.stamp = old + wp.qIdle = append(wp.qIdle, w) + wp.numWorkers++ + } + + // Add young workers (they should stay) + for i := 0; i < 3; i++ { + w := newWorker() + w.stamp = now + wp.qIdle = append(wp.qIdle, w) + wp.numWorkers++ + } + + wp.runGC(drainMax) + + // Should keep only the young workers + if len(wp.qIdle) != 3 { + t.Fatalf("expected 3 workers after GC (young ones), got %d", len(wp.qIdle)) + } + + // Verify remaining workers are the young ones + for _, w := range wp.qIdle { + age := now - w.stamp + if age > int64(drainMax) { + t.Fatalf("found old worker after GC with age %d", age) + } + } +} + +const ( + PoolSize = int(1e4) + TaskNum = int(1e6) +) + +func BenchmarkWorkerPool(b *testing.B) { + pool := NewWorkerPool(WithMaxWorkers(PoolSize), WithTaskPrealloc(TaskNum/2)) + defer pool.Close() + + var wg sync.WaitGroup + + taskFunc := func() { + time.Sleep(time.Millisecond) + wg.Done() + } + + b.ResetTimer() + for range b.N { + wg.Add(TaskNum) + for range TaskNum { + pool.Submit(taskFunc) + } + wg.Wait() + } + b.StopTimer() +} + +func BenchmarkGoroutines(b *testing.B) { + var wg sync.WaitGroup + + taskFunc := func() { + time.Sleep(time.Millisecond) + wg.Done() + } + + b.ResetTimer() + for range b.N { + wg.Add(TaskNum) + for range TaskNum { + go taskFunc() + } + wg.Wait() + } +} diff --git 
a/internal/test/block_test.go b/internal/test/block_test.go index b8324f5..592cfeb 100644 --- a/internal/test/block_test.go +++ b/internal/test/block_test.go @@ -8,6 +8,9 @@ import ( "github.com/pierrec/lz4/v4" "github.com/prequel-dev/plz4" + "github.com/prequel-dev/plz4/internal/pkg/blk" + "github.com/prequel-dev/plz4/internal/pkg/descriptor" + "github.com/prequel-dev/plz4/internal/pkg/opts" ) // Basic round-trip tests for CompressBlock/DecompressBlock. @@ -387,13 +390,115 @@ func BenchmarkCompressBlockWithLevel(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - dst, err := plz4.CompressBlock(src, plz4.WithBlockCompressionLevel(lvl), plz4.WithBlockDst(dst)) + _, err := plz4.CompressBlock(src, plz4.WithBlockCompressionLevel(lvl), plz4.WithBlockDst(dst)) if err != nil { b.Fatalf("CompressBlock(level=%d) failed: %v", lvl, err) } - b.ReportMetric(float64(len(dst))/float64(len(src))*100.0, "ratio") } + + // b.ReportMetric(float64(len(dst))/float64(len(src))*100.0, "ratio") + }) + } +} + +// BenchmarkCompressBlockWithLevel2 focuses on how compression level +// affects performance for a fixed moderately sized input. 
+func BenchmarkCompressBlockWithLevel2(b *testing.B) { + src, _ := LoadSample(b, LargeUncompressed) + + var ( + dst = make([]byte, plz4.CompressBlockBound(len(src))) + levels = []plz4.LevelT{plz4.Level1, plz4.Level3, plz4.Level6, plz4.Level9} + ) + + for _, lvl := range levels { + lvl := lvl + b.Run(fmt.Sprintf("level_%d", lvl), func(b *testing.B) { + b.SetBytes(int64(len(src))) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err := plz4.CompressBlock(src, plz4.WithBlockCompressionLevel(lvl), plz4.WithBlockDst(dst)) + if err != nil { + b.Fatalf("CompressBlock(level=%d) failed: %v", lvl, err) + } + + } + + // b.ReportMetric(float64(len(dst))/float64(len(src))*100.0, "ratio") + }) + } +} + +func BenchmarkCompressBlockWithLevelLz4(b *testing.B) { + src, _ := LoadSample(b, LargeUncompressed) + + var ( + dst = make([]byte, lz4.CompressBlockBound(len(src))) + levels = []lz4.CompressionLevel{lz4.Fast, lz4.Level3, lz4.Level6, lz4.Level9} + ) + + for _, lvl := range levels { + lvl := lvl + b.Run(fmt.Sprintf("level_%d", lvl), func(b *testing.B) { + b.SetBytes(int64(len(src))) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var err error + if lvl == lz4.Fast { + _, err = lz4.CompressBlock(src, dst, nil) + } else { + _, err = lz4.CompressBlockHC(src, dst, lvl, nil, nil) + } + + if err != nil { + b.Fatalf("CompressBlock(level=%d) failed: %v", lvl, err) + } + + } + + // b.ReportMetric(float64(len(dst))/float64(len(src))*100.0, "ratio") + }) + } +} + +func BenchmarkCompressBlockWithLevelock(b *testing.B) { + src, _ := LoadSample(b, LargeUncompressed) + + var ( + levels = []plz4.LevelT{plz4.Level1, plz4.Level3, plz4.Level6, plz4.Level9} + ) + + for _, lvl := range levels { + lvl := lvl + b.Run(fmt.Sprintf("level_%d", lvl), func(b *testing.B) { + + opts := opts.OptsT{ + Level: lvl, + BlockSizeIdx: plz4.BlockIdx4MB, + } + + cf := opts.NewCompressorFactory() + cmp := cf.NewCompressor() + + b.SetBytes(int64(len(src))) + b.ReportAllocs() + 
b.ResetTimer() + + for i := 0; i < b.N; i++ { + dstBlk, err := blk.CompressToBlk(src, cmp, descriptor.BlockIdx4MBSz, false, nil) + if err != nil { + b.Fatalf("CompressBlock(level=%d) failed: %v", lvl, err) + } + blk.ReturnBlk(dstBlk) + + } + + // b.ReportMetric(float64(len(dst))/float64(len(src))*100.0, "ratio") }) } } diff --git a/internal/test/rd_test.go b/internal/test/rd_test.go index baf2da0..8b2f917 100644 --- a/internal/test/rd_test.go +++ b/internal/test/rd_test.go @@ -150,6 +150,10 @@ func TestContentSizeValidate(t *testing.T) { "one_ok": { data: ploadSzOne, }, + "one_ok_max_parallel": { + opts: []plz4.OptT{plz4.WithParallel(16)}, + data: ploadSzOne, + }, "zero_fail": { data: ploadSzOneWithZeroCnt, err: plz4.ErrContentSize, diff --git a/internal/test/wr_bench_test.go b/internal/test/wr_bench_test.go index 32efcf1..ee65d2c 100644 --- a/internal/test/wr_bench_test.go +++ b/internal/test/wr_bench_test.go @@ -5,6 +5,7 @@ import ( "io" "testing" + "github.com/pierrec/lz4/v4" "github.com/prequel-dev/plz4" ) @@ -171,7 +172,7 @@ func benchmarkWrite(b *testing.B, sz int, opts ...Option) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - sz := discardWrite(b, lsrc, opts...) + sz := discardWrite(b, lsrc, false, opts...) 
b.ReportMetric(float64(sz)/float64(len(lsrc))*100.0, "ratio") } } @@ -214,3 +215,133 @@ func benchmarkReadFromWithSrc(b *testing.B, src []byte, opts ...Option) { b.ReportMetric(float64(sz)/float64(len(src))*100.0, "ratio") } } + +func BenchmarkCompressVersusLz4AsyncSmall(b *testing.B) { + + sample, _ := LoadSample(b, Monster) + src := sample[:4<<10] + b.ReportAllocs() + b.ResetTimer() + + b.Run("lz4", func(b *testing.B) { + for b.Loop() { + benchLz4Compress(b, src, lz4.ConcurrencyOption(8)) + } + }) + + b.Run("plz4", func(b *testing.B) { + for b.Loop() { + benchPlz4Compress(b, src, plz4.WithParallel(8)) + } + }) +} + +func BenchmarkCompressVersusLz4SyncSmall(b *testing.B) { + + sample, _ := LoadSample(b, Monster) + src := sample[:4<<10] + + b.ReportAllocs() + b.ResetTimer() + + b.Run("lz4", func(b *testing.B) { + for b.Loop() { + benchLz4Compress(b, src, lz4.ConcurrencyOption(1)) + } + }) + + b.Run("plz4", func(b *testing.B) { + for b.Loop() { + benchPlz4Compress(b, src, plz4.WithParallel(0)) + } + }) +} + +func BenchmarkCompressVersusLz4AsyncMedium(b *testing.B) { + + sample, _ := LoadSample(b, Monster) + src := sample[:12<<20] + + b.ReportAllocs() + b.ResetTimer() + + b.Run("lz4", func(b *testing.B) { + for b.Loop() { + benchLz4Compress(b, src, lz4.ConcurrencyOption(8)) + } + }) + + b.Run("plz4", func(b *testing.B) { + for b.Loop() { + benchPlz4Compress(b, src, plz4.WithParallel(8)) + } + }) +} + +func BenchmarkCompressVersusLz4SyncLarge(b *testing.B) { + + src, _ := LoadSample(b, Monster) + + b.ReportAllocs() + b.ResetTimer() + + b.Run("lz4", func(b *testing.B) { + for b.Loop() { + benchLz4Compress(b, src, lz4.ConcurrencyOption(1)) + } + }) + + b.Run("plz4", func(b *testing.B) { + for b.Loop() { + benchPlz4Compress(b, src, plz4.WithParallel(0)) + } + }) +} + +func BenchmarkCompressVersusLz4AsyncLarge(b *testing.B) { + + src, _ := LoadSample(b, Monster) + + b.ReportAllocs() + b.ResetTimer() + + b.Run("lz4", func(b *testing.B) { + for b.Loop() { + 
benchLz4Compress(b, src, lz4.ConcurrencyOption(-1)) + } + }) + + b.Run("plz4", func(b *testing.B) { + for b.Loop() { + benchPlz4Compress(b, src, plz4.WithParallel(-1)) + } + }) +} + +func benchLz4Compress(b *testing.B, src []byte, opts ...lz4.Option) { + wr := lz4.NewWriter(io.Discard) + if err := wr.Apply(opts...); err != nil { + b.Fatalf("plz4 apply error: %v", err) + } + _, err := wr.Write(src) + if err != nil { + b.Fatalf("plz4 write error: %v", err) + } + if err := wr.Close(); err != nil { + b.Fatalf("plz4 close error: %v", err) + } +} + +func benchPlz4Compress(b *testing.B, src []byte, opts ...plz4.OptT) { + wr := plz4.NewWriter( + io.Discard, + opts..., + ) + _, err := wr.Write(src) + if err != nil { + b.Fatalf("plz4 write error: %v", err) + } + if err := wr.Close(); err != nil { + b.Fatalf("plz4 close error: %v", err) + } +} diff --git a/internal/test/wr_test.go b/internal/test/wr_test.go index 2a62595..a488df6 100644 --- a/internal/test/wr_test.go +++ b/internal/test/wr_test.go @@ -11,9 +11,9 @@ import ( "testing" "time" - "github.com/gammazero/workerpool" "github.com/prequel-dev/plz4" "github.com/prequel-dev/plz4/internal/pkg/blk" + "github.com/prequel-dev/plz4/internal/pkg/wpool" ) var cgoEnabled = true @@ -462,19 +462,181 @@ func TestLz4WriterDictID(t *testing.T) { ) } +// Ensure that a user-provided dictionary is applied when compressing in +// linked-block mode, for both synchronous and asynchronous writers. +// We verify this by compressing with a known dictionary and then +// confirming that decompression with a bad dictionary either fails with +// a corrupted error or produces mismatched data, while decompression +// with the correct dictionary round-trips. +func TestLz4WriterLinkedDictionaryApplied(t *testing.T) { + maybeSkip(t) + defer testBorrowed(t) + + // Source payload large enough to span multiple 64KiB blocks so that + // linked-block behavior and inter-block dictionaries are exercised. 
+ src := make([]byte, 256<<10) + for i := range src { + // Deterministic, pseudo-random pattern to avoid trivially + // compressible data while still overlapping with the dictionary. + src[i] = byte(i % 251) + } + + // Use a prefix of src as the "good" dictionary so that there is + // meaningful overlap between the dictionary and the payload. + dictSz := 2 << 10 // 2 KiB prefix + if dictSz > len(src) { + t.Fatalf("dictionary size %d larger than src %d", dictSz, len(src)) + } + goodDict := append([]byte(nil), src[:dictSz]...) + + parallels := []int{0, 4} // 0 = sync writer, 4 = async/parallel writer + + for _, nParallel := range parallels { + t.Run(fmt.Sprintf("parallel_%d", nParallel), func(t *testing.T) { + + var buf bytes.Buffer + wr := plz4.NewWriter(&buf, + plz4.WithDictionary(goodDict), + plz4.WithBlockLinked(true), + plz4.WithBlockSize(plz4.BlockIdx64KB), + plz4.WithParallel(nParallel), + ) + + // Stream the payload in smaller chunks to exercise true + // streaming behavior across multiple linked blocks. + const chunkSz = 8 << 10 + if sum, err := spinWrite(wr, src, chunkSz, false); err != nil || sum != int64(len(src)) { + t.Fatalf("Fail write: sum=%d len=%d err=%v", sum, len(src), err) + } + + if err := wr.Close(); err != nil { + t.Fatalf("Fail close: %v", err) + } + + cmp := buf.Bytes() + + // Verify that decompression with the correct dictionary preserves + // the original payload. + decGood := decompressWriteTo(t, cmp, []Option{plz4.WithDictionary(goodDict)}) + if !bytes.Equal(src, decGood.Bytes()) { + t.Fatalf("round-trip with good dictionary mismatch: got %d bytes, want %d", decGood.Len(), len(src)) + } + + // Now perturb the dictionary and ensure that decompression with + // this bad dictionary does not successfully round-trip. This + // demonstrates that the compressed stream actually depends on the + // user-provided dictionary in linked mode. + badDict := append([]byte(nil), goodDict...) 
+ for i := 0; i < len(badDict); i += 16 { + badDict[i] ^= 0xFF + } + + rd := plz4.NewReader(bytes.NewReader(cmp), plz4.WithDictionary(badDict)) + var out bytes.Buffer + n64, err := rd.WriteTo(&out) + if err == nil && bytes.Equal(src, out.Bytes()) { + t.Fatalf("decompression with bad dictionary unexpectedly matched original data (n=%d)", n64) + } + if err != nil && !plz4.Lz4Corrupted(err) { + t.Fatalf("expected corrupted error with bad dictionary, got: %v", err) + } + if cerr := rd.Close(); cerr != nil && !plz4.Lz4Corrupted(cerr) { + t.Fatalf("expected corrupted error or nil on close with bad dictionary, got: %v", cerr) + } + }) + } +} + +// Ensure that a user-provided dictionary is actually applied when +// compressing data smaller than a single block. We verify this by +// compressing with a known dictionary and then confirming that +// decompression with a bad dictionary either fails with a corrupted +// error or produces mismatched data, while decompression with the +// correct dictionary round-trips. +func TestLz4WriterDictionaryAppliedForSmallPayload(t *testing.T) { + maybeSkip(t) + defer testBorrowed(t) + + // Source payload smaller than one 64KiB block. + src := make([]byte, 4<<10) + for i := range src { + // Fill with a deterministic, pseudo-random pattern to avoid + // trivially compressible data while still having overlap with + // the dictionary below. + src[i] = byte(i % 251) + } + + // Use a prefix of src as the "good" dictionary so that there is + // meaningful overlap between the dictionary and the payload. + dictSz := 2 << 10 // 2 KiB prefix + if dictSz > len(src) { + t.Fatalf("dictionary size %d larger than src %d", dictSz, len(src)) + } + goodDict := append([]byte(nil), src[:dictSz]...) + + // Compress using a writer configured with the user dictionary and a + // block size larger than the payload to ensure we stay within a + // single block. 
+ var buf bytes.Buffer + wr := plz4.NewWriter(&buf, + plz4.WithDictionary(goodDict), + plz4.WithBlockSize(plz4.BlockIdx64KB), + plz4.WithParallel(4), + ) + + if n, err := wr.Write(src); err != nil || n != len(src) { + t.Fatalf("Fail write: n=%d err=%v", n, err) + } + + if err := wr.Close(); err != nil { + t.Fatalf("Fail close: %v", err) + } + + cmp := buf.Bytes() + + // Verify that decompression with the correct dictionary preserves + // the original payload. + decGood := decompressWriteTo(t, cmp, []Option{plz4.WithDictionary(goodDict)}) + if !bytes.Equal(src, decGood.Bytes()) { + t.Fatalf("round-trip with good dictionary mismatch: got %d bytes, want %d", decGood.Len(), len(src)) + } + + // Now perturb the dictionary and ensure that decompression with this + // bad dictionary does not successfully round-trip. This demonstrates + // that the compressed stream actually depends on the user-provided + // dictionary, even though the payload is smaller than a single block. + badDict := append([]byte(nil), goodDict...) + for i := 0; i < len(badDict); i += 16 { + badDict[i] ^= 0xFF + } + + rd := plz4.NewReader(bytes.NewReader(cmp), plz4.WithDictionary(badDict)) + var out bytes.Buffer + n64, err := rd.WriteTo(&out) + if err == nil && bytes.Equal(src, out.Bytes()) { + t.Fatalf("decompression with bad dictionary unexpectedly matched original data (n=%d)", n64) + } + if err != nil && !plz4.Lz4Corrupted(err) { + t.Fatalf("expected corrupted error with bad dictionary, got: %v", err) + } + if cerr := rd.Close(); cerr != nil && !plz4.Lz4Corrupted(cerr) { + t.Fatalf("expected corrupted error or nil on close with bad dictionary, got: %v", cerr) + } +} + func _testLz4WriterWorkerPool(t *testing.T, N, poolSz int) { // Create N requests attached to worker pool, they should all finish. 
var ( wg sync.WaitGroup - wp = workerpool.New(poolSz) + wp = wpool.NewWorkerPool(wpool.WithMaxWorkers(poolSz)) lsrc, _ = LoadSample(t, LargeUncompressed) ) wg.Add(N) - for i := 0; i < N; i++ { + for range N { go func() { defer wg.Done() - discardWrite(t, lsrc, plz4.WithWorkerPool(wp), plz4.WithParallel(-1)) + discardWrite(t, lsrc, false, plz4.WithWorkerPool(wp), plz4.WithParallel(-1)) }() } @@ -488,10 +650,10 @@ func TestLz4WriterWorkerpool(t *testing.T) { } // Add a minimal worker pool; should finish -// despite only one worker available. +// despite only two workers available. func TestLz4WriterWorkerpoolMinimal(t *testing.T) { defer testBorrowed(t) - _testLz4WriterWorkerPool(t, 10, 1) + _testLz4WriterWorkerPool(t, 10, 2) } // Randomly swap between Write and ReadFrom APIs, varying write size. @@ -762,7 +924,7 @@ func TestWriteFail(t *testing.T) { _, err = wr.ReadFrom(rd) } else { chunkSz := rand.IntN(8<<20) + 1 - _, err = spinWrite(wr, lsrc, chunkSz) + _, err = spinWrite(wr, lsrc, chunkSz, true) } // Force a flush if error did not come through yet @@ -870,7 +1032,7 @@ func TestWriteFailReader(t *testing.T) { // Helpers ////////// -func spinWrite(wr plz4.Writer, src []byte, chunkSz int) (sum int64, err error) { +func spinWrite(wr plz4.Writer, src []byte, chunkSz int, randFlush bool) (sum int64, err error) { LOOP: for len(src) > 0 { @@ -886,7 +1048,7 @@ LOOP: } // Randomly call flush to exercise that code path - if rand.IntN(5) == 0 { + if randFlush && rand.IntN(5) == 0 { if err = wr.Flush(); err != nil { break LOOP } @@ -907,7 +1069,7 @@ func compressWrite(t testing.TB, src []byte, opts []Option, chunkSz int) bytes.B var ( ssrc = len(src) ) - sum, err := spinWrite(wr, src, chunkSz) + sum, err := spinWrite(wr, src, chunkSz, false) if err != nil { t.Fatalf("Fail Lz4FrameW.Write(): %v", err) } @@ -1011,13 +1173,13 @@ func (c *wrCounter) Write(data []byte) (int, error) { return len(data), nil } -func discardWrite(b testing.TB, src []byte, opts ...Option) int64 { +func 
discardWrite(b testing.TB, src []byte, randFlush bool, opts ...Option) int64 { const chunkSz = 4 << 10 wrCnt := wrCounter{} wr := plz4.NewWriter(&wrCnt, opts...) - _, err := spinWrite(wr, src, chunkSz) + _, err := spinWrite(wr, src, chunkSz, randFlush) if err != nil { b.Errorf("Fail Lz4FrameW.Write(): %v", err) }