Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 21 additions & 22 deletions cmd/plz4/internal/ops/bakeoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
"fmt"
"io"
"os"
"runtime"
"slices"
"time"

"github.com/gammazero/workerpool"
"github.com/jedib0t/go-pretty/v6/progress"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/pierrec/lz4/v4"
Expand Down Expand Up @@ -188,13 +187,11 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err
return nil, err
}

var (
i = 0
)
const nLevels = 10

tr := &progress.Tracker{
Message: "Processing lz4",
Total: srcSz * 10,
Total: srcSz * nLevels,
Units: progress.UnitsBytes,
}

Expand Down Expand Up @@ -236,7 +233,7 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err

var results []resultT

for ; i < 10; i++ {
for i := range nLevels {
start := time.Now()

var (
Expand All @@ -245,7 +242,8 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err
err error
)

lvl, err := lz4Level(i)
// Run backwards so that the cache penalty is less for faster levels.
lvl, err := lz4Level(nLevels - i - 1) // [0,nLevels-1]
if err != nil {
return nil, err
}
Expand All @@ -260,10 +258,9 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err
return nil, err
}

// Last one wins; so append is ok.
opts = append(opts, lz4.CompressionLevelOption(lvl))
nopts := append(opts, lz4.CompressionLevelOption(lvl))

split, cnt, err = lz4BakeOne(rd, opts...)
split, cnt, err = lz4BakeOne(rd, nopts...)
}

if err != nil {
Expand All @@ -282,6 +279,7 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err
})
}

slices.Reverse(results)
return results, nil
}

Expand Down Expand Up @@ -347,9 +345,11 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er
return nil, err
}

const nLevels = 12

tr := &progress.Tracker{
Message: "Processing plz4",
Total: srcSz * 12,
Total: srcSz * nLevels,
Units: progress.UnitsBytes,
}

Expand All @@ -364,12 +364,9 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er
tr.SetValue(srcOff + (int64(i) * srcSz))
}

wp := workerpool.New(runtime.NumCPU())

opts = append(opts,
plz4.WithProgress(cbHandler),
plz4.WithPendingSize(-1),
plz4.WithWorkerPool(wp),
)

var srcBlock []byte
Expand All @@ -385,7 +382,7 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er

var results []resultT

for ; i < 12; i++ {
for ; i < nLevels; i++ {
start := time.Now()

var (
Expand All @@ -394,21 +391,22 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er
err error
)

lvl := nLevels - i // [1,nLevels]

if srcBlock != nil {
// Block mode
split, cnt, err = plz4BakeOneBlock(srcBlock, plz4.LevelT(i+1))
split, cnt, err = plz4BakeOneBlock(srcBlock, plz4.LevelT(lvl))

} else {
// Last one wins; so append is ok.

if _, err := rd.Seek(0, io.SeekStart); err != nil {
return nil, err
}

opts = append(opts,
plz4.WithLevel(plz4.LevelT(i+1)),
// Run backwards so that the cache penalty is less for faster levels.
nopts := append(opts,
plz4.WithLevel(plz4.LevelT(lvl)),
)
split, cnt, err = plz4BakeOne(rd, opts...)
split, cnt, err = plz4BakeOne(rd, nopts...)
}
if err != nil {
return nil, err
Expand All @@ -426,6 +424,7 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er
})
}

slices.Reverse(results)
return results, nil
}

Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ go 1.24.0

require (
github.com/alecthomas/kong v1.13.0
github.com/gammazero/workerpool v1.1.3
github.com/jedib0t/go-pretty/v6 v6.7.8
github.com/pierrec/lz4/v4 v4.1.25
)

require (
github.com/clipperhouse/stringish v0.1.1 // indirect
github.com/clipperhouse/uax29/v2 v2.3.0 // indirect
github.com/gammazero/deque v1.2.0 // indirect
github.com/mattn/go-runewidth v0.0.19 // indirect
golang.org/x/sys v0.39.0 // indirect
golang.org/x/term v0.38.0 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ github.com/clipperhouse/uax29/v2 v2.3.0 h1:SNdx9DVUqMoBuBoW3iLOj4FQv3dN5mDtuqwuh
github.com/clipperhouse/uax29/v2 v2.3.0/go.mod h1:Wn1g7MK6OoeDT0vL+Q0SQLDz/KpfsVRgg6W7ihQeh4g=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gammazero/deque v1.2.0 h1:scEFO8Uidhw6KDU5qg1HA5fYwM0+us2qdeJqm43bitU=
github.com/gammazero/deque v1.2.0/go.mod h1:JVrR+Bj1NMQbPnYclvDlvSX0nVGReLrQZ0aUMuWLctg=
github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q=
github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/jedib0t/go-pretty/v6 v6.7.8 h1:BVYrDy5DPBA3Qn9ICT+PokP9cvCv1KaHv2i+Hc8sr5o=
Expand All @@ -26,8 +22,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk=
golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.38.0 h1:PQ5pkm/rLO6HnxFR7N2lJHOZX6Kez5Y1gDSJla6jo7Q=
Expand Down
18 changes: 9 additions & 9 deletions internal/pkg/async/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ func NewAsyncReader(rdr io.Reader, hdr header.HeaderT, opts *opts.OptsT) *asyncR
// Otherwise could dead lock on too many simultaneous request
go r.dispatch()

// Each closure escapes and causes an allocate.
// No reason to do that NParallel times
task := func() {
r.decompress()
nTasks := opts.NParallel
if hdr.Flags.ContentSize() && hdr.ContentSz > 0 {
// If content size known, can limit number of tasks
nTasks = min(opts.NParallel, int(hdr.ContentSz)/bsz+1)
}

r.wg.Add(opts.NParallel)
for i := 0; i < opts.NParallel; i++ {
opts.WorkerPool.Submit(task)
r.wg.Add(nTasks)
for range nTasks {
opts.WorkerPool.Submit(r.decompress)
}

return r
Expand Down Expand Up @@ -223,8 +223,8 @@ LOOP:
func (r *asyncRdrT) NextBlock(prevBlk *blk.BlkT) (*blk.BlkT, int, error) {

if prevBlk != nil {
switch {
case r.hasher == nil:
switch r.hasher {
case nil:
blk.ReturnBlk(prevBlk)
default:
r.hasher.Queue(prevBlk)
Expand Down
Loading
Loading