diff --git a/README.md b/README.md index 83a3ca1..2851638 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ This project attempts to support all of the features enumerated in the [LZ4 Fram - [Sparse](./pkg/sparse) write support - Random read access (see [caveats](#random-read-access)) - +While the primary purpose of plz4 is to support parallel processing, the raw block APIs have also been supported for cases where the payloads are very small and do not benefit from LZ4 Framing. ## Design @@ -57,6 +57,11 @@ There is another LZ4 Frame feature that is problematic at scale. By default, pl Another advantage of independent blocks is the potential to support random read access. This is possible because each block can be independently decompressed. To support this, plz4 provides an optional progress callback that emits both the source offset and corresponding block offset during compression. An implementation can use this information to build lookup tables that can later be used to skip ahead during decompression to a known block offset. plz4 provides the 'WithReadOffset' option on the NewReader API to skip ahead and start decompression at a known block offset. +### CGO + + +This package uses CGO to call the canonical LZ4 library which is written in C. There may be cases where CGO is not desired, and in those cases the package also supports building with the environment variable "CGO_ENABLED=0". In general, the library runs a bit slower in that mode and not all features are available. + ## Install diff --git a/cmd/plz4/internal/ops/bakeoff.go b/cmd/plz4/internal/ops/bakeoff.go index a6c9013..fff21cf 100644 --- a/cmd/plz4/internal/ops/bakeoff.go +++ b/cmd/plz4/internal/ops/bakeoff.go @@ -31,7 +31,7 @@ func RunBakeoff() error { ) // Consume into RAM; must be able to seek - if rdr == os.Stdin { + if rdr == os.Stdin || CLI.Bakeoff.RAM { var buf bytes.Buffer n, err := io.Copy(&buf, rdr) if err != nil { @@ -125,16 +125,22 @@ func outputOptions() error { dict = CLI.Dict } - t.AppendRows([]table.Row{ - {"File name", fn}, - {"Dictionary", dict}, - {"Concurrency", CLI.Cpus}, - {"Block Size", CLI.Bakeoff.BS}, - {"Block Checksum", CLI.Bakeoff.BX}, - {"Blocks Linked", CLI.Bakeoff.BD}, - {"Content Checksum", CLI.Bakeoff.CS}, - {"Content Size", CLI.Bakeoff.CX}, - }) + if CLI.Bakeoff.BlockMode { + t.AppendRows([]table.Row{ + {"File name", fn}, + }) + } else { + t.AppendRows([]table.Row{ + {"File name", fn}, + {"Dictionary", dict}, + {"Concurrency", CLI.Cpus}, + {"Block Size", CLI.Bakeoff.BS}, + {"Block Checksum", CLI.Bakeoff.BX}, + {"Blocks Linked", CLI.Bakeoff.BD}, + {"Content Checksum", CLI.Bakeoff.CX}, + {"Content Size", CLI.Bakeoff.CS}, + }) + } t.Render() return nil @@ -143,8 +149,13 @@ func outputOptions() error { func outputResults(srcSz int64, plz4Results, lz4Results []resultT) error { fmt.Println() + mode := "frame mode" + if CLI.Bakeoff.BlockMode { + mode = "block mode" + } + t := table.NewWriter() - t.SetTitle("Bakeoff Results") + t.SetTitle(fmt.Sprintf("Bakeoff Results [%s]", mode)) t.SetStyle(table.StyleColoredBright) t.SetOutputMirror(os.Stdout) t.AppendHeader(table.Row{"Algo", "Level", "SrcSize", "Compressed", "Ratio", "Compress", "Decompress"}) @@ -208,6 +219,18 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err opts = append(opts, lz4.OnBlockDoneOption(cbHandler)) + var srcBlock []byte + if CLI.Bakeoff.BlockMode { + srcBlock, err = io.ReadAll(rd) + if err != nil { + return nil, err + } + if _, err := rd.Seek(0, io.SeekStart); err != nil { + return nil, err + } + + } + bakeFunc := func() ([]resultT, error) { defer tr.MarkAsDone() @@ -216,19 +239,33 @@ func _prepLz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, err for ; i < 10; i++ { start := time.Now() - if _, err := rd.Seek(0, io.SeekStart); err != nil { - return nil, err - } + var ( + split time.Time + cnt int64 + err error + ) - // Last one wins; so append is ok. lvl, err := lz4Level(i) if err != nil { return nil, err } - opts = append(opts, lz4.CompressionLevelOption(lvl)) + if srcBlock != nil { + // Block mode + split, cnt, err = lz4BakeOneBlock(srcBlock, lvl) + + } else { + + if _, err := rd.Seek(0, io.SeekStart); err != nil { + return nil, err + } + + // Last one wins; so append is ok. + opts = append(opts, lz4.CompressionLevelOption(lvl)) + + split, cnt, err = lz4BakeOne(rd, opts...) + } - split, cnt, err := lz4BakeOne(rd, opts...) if err != nil { return nil, err } @@ -335,21 +372,44 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er plz4.WithWorkerPool(wp), ) + var srcBlock []byte + if CLI.Bakeoff.BlockMode { + srcBlock, err = io.ReadAll(rd) + if err != nil { + return nil, err + } + if _, err := rd.Seek(0, io.SeekStart); err != nil { + return nil, err + } + } + var results []resultT for ; i < 12; i++ { start := time.Now() - if _, err := rd.Seek(0, io.SeekStart); err != nil { - return nil, err - } - - // Last one wins; so append is ok. - opts = append(opts, - plz4.WithLevel(plz4.LevelT(i+1)), + var ( + split time.Time + cnt int64 + err error ) - split, cnt, err := plz4BakeOne(rd, opts...) + if srcBlock != nil { + // Block mode + split, cnt, err = plz4BakeOneBlock(srcBlock, plz4.LevelT(i+1)) + + } else { + // Last one wins; so append is ok. + + if _, err := rd.Seek(0, io.SeekStart); err != nil { + return nil, err + } + + opts = append(opts, + plz4.WithLevel(plz4.LevelT(i+1)), + ) + split, cnt, err = plz4BakeOne(rd, opts...) + } if err != nil { return nil, err } @@ -373,6 +433,7 @@ func _prepPlz4(rd io.ReadSeeker, srcSz int64, pw progress.Writer) (bakeFuncT, er } func plz4BakeOne(src io.Reader, opts ...plz4.OptT) (split time.Time, cnt int64, err error) { + var ( fh *os.File wr io.Writer @@ -445,6 +506,46 @@ func _plz4Decompress(rd io.Reader) error { return err } +func lz4BakeOneBlock(src []byte, level lz4.CompressionLevel) (split time.Time, cnt int64, err error) { + + var ( + sz = lz4.CompressBlockBound(len(src)) + dst = make([]byte, sz) + n int + ) + + if level == lz4.Fast { + n, err = lz4.CompressBlock(src, dst, nil) + } else { + n, err = lz4.CompressBlockHC(src, dst, level, nil, nil) + } + if err != nil { + return + } + + dst = dst[:n] + split = time.Now() + cnt = int64(n) + + tmp := make([]byte, len(src)) + + _, err = lz4.UncompressBlock(dst, tmp) + return +} + +func plz4BakeOneBlock(src []byte, level plz4.LevelT) (split time.Time, cnt int64, err error) { + + dst, err := plz4.CompressBlock(src, plz4.WithBlockCompressionLevel(level)) + if err != nil { + return + } + + split = time.Now() + _, err = plz4.DecompressBlock(dst) + cnt = int64(len(dst)) + return +} + func _lz4Decompress(rd io.Reader) error { frd := lz4.NewReader(rd) diff --git a/cmd/plz4/internal/ops/cli.go b/cmd/plz4/internal/ops/cli.go index ffd70cd..4998549 100644 --- a/cmd/plz4/internal/ops/cli.go +++ b/cmd/plz4/internal/ops/cli.go @@ -25,13 +25,14 @@ var CLI struct { Skip bool `help:"Skip decompress" short:"s"` } `cmd:"" aliases:"v,ver" help:"Verify lz4 data"` Bakeoff struct { - File string `optional:"" arg:"" type:"existingfile"` - BS string `help:"Block size [4MB, 1MB, 256KB, 64KB]" default:"4MB"` - BD bool `help:"Enable linked blocks"` - BX bool `help:"Enable block checksum"` - CX bool `help:"Enable content checksum"` - CS bool `help:"Enable content size; fails on stdin"` - RAM bool `help:"Process data in RAM"` + File string `optional:"" arg:"" type:"existingfile"` + BS string `help:"Block size [4MB, 1MB, 256KB, 64KB]" default:"4MB"` + BD bool `help:"Enable linked blocks"` + BX bool `help:"Enable block checksum"` + CX bool `help:"Enable content checksum"` + CS bool `help:"Enable content size; fails on stdin"` + RAM bool `help:"Process data in RAM"` + BlockMode bool `help:"Use block API instead of frame API" short:"B"` } `cmd:"" aliases:"b,bake" help:"Compare performance to github.com/pierrec/lz4"` Cpus int `help:"Concurrency [0 synchronous] [-1 auto]" default:"-1" short:"c"` diff --git a/internal/pkg/clz4/clz4.go b/internal/pkg/clz4/clz4.go index 8a9cf2d..18b6f10 100644 --- a/internal/pkg/clz4/clz4.go +++ b/internal/pkg/clz4/clz4.go @@ -24,6 +24,10 @@ func byteSliceToCharPointer(b []byte) *C.char { return (*C.char)(unsafe.Pointer(&b[0])) } +func CompressBound(sz int) int { + return int(C.LZ4_compressBound(C.int(sz))) +} + func CompressFast(source, dest []byte, acceleration int) (int, error) { ret := int(C.LZ4_compress_fast( byteSliceToCharPointer(source), diff --git a/internal/pkg/compress/compress.go b/internal/pkg/compress/compress.go index 6474d4b..aba8f92 100644 --- a/internal/pkg/compress/compress.go +++ b/internal/pkg/compress/compress.go @@ -68,10 +68,14 @@ func (f CompressorFactory) newIndie() Compressor { } func (f CompressorFactory) newLinked() Compressor { - switch { - case f.level == 1: + switch f.level { + case 1: return newLinkedCompressor(f.dictCtx) default: return newLinkedCompressorHC(f.level, f.dictCtxHC) } } + +func CompressBound(sz int) int { + return clz4.CompressBound(sz) +} diff --git a/internal/pkg/compress/nocgo_compress.go b/internal/pkg/compress/nocgo_compress.go index 5402174..859c07d 100644 --- a/internal/pkg/compress/nocgo_compress.go +++ b/internal/pkg/compress/nocgo_compress.go @@ -47,11 +47,10 @@ func (f CompressorFactory) NewCompressor() Compressor { } type fastCompressor struct { - cmp lz4.Compressor } func (c *fastCompressor) Compress(src, dst, dict []byte) (int, error) { - return c.cmp.CompressBlock(src, dst) + return lz4.CompressBlock(src, dst, nil) } func NewCompressorHC(level int) Compressor { @@ -59,18 +58,16 @@ func NewCompressorHC(level int) Compressor { level = 9 } return &hcCompressor{ - cmp: lz4.CompressorHC{ - Level: lz4Level(level), - }, + level: lz4Level(level), } } type hcCompressor struct { - cmp lz4.CompressorHC + level lz4.CompressionLevel } func (c *hcCompressor) Compress(src, dst, dict []byte) (int, error) { - return c.cmp.CompressBlock(src, dst) + return lz4.CompressBlockHC(src, dst, c.level, nil, nil) } type failedCompressor struct { @@ -110,3 +107,7 @@ func lz4Level(l int) lz4.CompressionLevel { } return lz4Level } + +func CompressBound(sz int) int { + return lz4.CompressBlockBound(sz) +} diff --git a/internal/test/block_test.go b/internal/test/block_test.go new file mode 100644 index 0000000..b8324f5 --- /dev/null +++ b/internal/test/block_test.go @@ -0,0 +1,399 @@ +package test + +import ( + "bytes" + "crypto/rand" + "fmt" + "testing" + + "github.com/pierrec/lz4/v4" + "github.com/prequel-dev/plz4" +) + +// Basic round-trip tests for CompressBlock/DecompressBlock. +func TestCompressDecompressBlockBasic(t *testing.T) { + defer testBorrowed(t) + + randBuf := make([]byte, 4<<10) + if _, err := rand.Read(randBuf); err != nil { + t.Fatalf("rand.Read failed: %v", err) + } + + tests := map[string][]byte{ + "nil": nil, + "empty": {}, + "small": []byte("hello world"), + "random_4k": randBuf, + } + + for name, src := range tests { + name, src := name, src + t.Run(name, func(t *testing.T) { + cmp, err := plz4.CompressBlock(src) + if err != nil { + t.Fatalf("CompressBlock failed: %v", err) + } + + // Ensure compressed size never exceeds bound. + if len(cmp) > plz4.CompressBlockBound(len(src)) { + t.Fatalf("compressed size %d exceeds bound %d", len(cmp), plz4.CompressBlockBound(len(src))) + } + + dec, err := plz4.DecompressBlock(cmp) + if err != nil { + t.Fatalf("DecompressBlock failed: %v", err) + } + + if !bytes.Equal(src, dec) { + t.Fatalf("round-trip mismatch: got %d bytes, want %d", len(dec), len(src)) + } + }) + } +} + +// Verify that compression level option does not break round-trip. +func TestCompressDecompressBlockWithLevel(t *testing.T) { + defer testBorrowed(t) + + src := make([]byte, 4<<10) + if _, err := rand.Read(src); err != nil { + t.Fatalf("rand.Read failed: %v", err) + } + levels := []plz4.LevelT{plz4.Level1, plz4.Level3, plz4.Level6, plz4.Level9} + + for _, lvl := range levels { + lvl := lvl + t.Run(fmt.Sprintf("level_%d", lvl), func(t *testing.T) { + cmp, err := plz4.CompressBlock(src, plz4.WithBlockCompressionLevel(lvl)) + if err != nil { + t.Fatalf("CompressBlock(level=%d) failed: %v", lvl, err) + } + + dec, err := plz4.DecompressBlock(cmp) + if err != nil { + t.Fatalf("DecompressBlock(level=%d) failed: %v", lvl, err) + } + + if !bytes.Equal(src, dec) { + t.Fatalf("round-trip mismatch at level %d", lvl) + } + }) + } +} + +// Verify that providing a dictionary option is accepted and preserves round-trip. +func TestCompressDecompressBlockWithDictionary(t *testing.T) { + maybeSkip(t) + + defer testBorrowed(t) + + src := make([]byte, 4<<10) + if _, err := rand.Read(src); err != nil { + t.Fatalf("rand.Read failed: %v", err) + } + // Use a simple dictionary; only last 64KiB is used internally. + dict := make([]byte, 8<<10) + if _, err := rand.Read(dict); err != nil { + t.Fatalf("rand.Read failed: %v", err) + } + + cmp, err := plz4.CompressBlock(src, plz4.WithBlockDictionary(dict)) + if err != nil { + t.Fatalf("CompressBlock with dict failed: %v", err) + } + + dec, err := plz4.DecompressBlock(cmp, plz4.WithBlockDictionary(dict)) + if err != nil { + t.Fatalf("DecompressBlock with dict failed: %v", err) + } + + if !bytes.Equal(src, dec) { + t.Fatalf("round-trip mismatch with dict: got %d bytes, want %d", len(dec), len(src)) + } +} + +// Verify that providing an incorrect dictionary on decompress either +// fails with a corrupted error or produces mismatched data. +func TestDecompressBlockWithBadDictionary(t *testing.T) { + maybeSkip(t) + + defer testBorrowed(t) + + // Create source and two different dictionaries derived from it so that + // the dictionary actually has useful overlap with the data. + src := make([]byte, 4<<10) + if _, err := rand.Read(src); err != nil { + t.Fatalf("rand.Read failed: %v", err) + } + + // Use a prefix of src as the "good" dictionary. + dictSz := 2 << 10 // 2 KiB prefix + goodDict := append([]byte(nil), src[:dictSz]...) + + // Start from the same prefix but perturb bytes to create a "bad" dict + // that still has overlap but will not match exactly. + badDict := append([]byte(nil), src[:dictSz]...) + for i := 0; i < len(badDict); i += 16 { + badDict[i] ^= 0xFF + } + + // Compress with good dictionary. + cmp, err := plz4.CompressBlock(src, plz4.WithBlockDictionary(goodDict)) + if err != nil { + t.Fatalf("CompressBlock with good dict failed: %v", err) + } + + // Decompress with a different (bad) dictionary. + dec, err := plz4.DecompressBlock(cmp, plz4.WithBlockDictionary(badDict)) + if err != nil { + // When an error is returned, it should be marked corrupted. + if !plz4.Lz4Corrupted(err) { + t.Fatalf("expected corrupted error with bad dictionary, got: %v", err) + } + return + } + + if bytes.Equal(src, dec) { + t.Fatalf("decompression with bad dictionary unexpectedly matched original data") + } + + // Validate that a good dictionary still works. + decGood, err := plz4.DecompressBlock(cmp, plz4.WithBlockDictionary(goodDict)) + if err != nil { + t.Fatalf("DecompressBlock with good dict failed: %v", err) + } + + if !bytes.Equal(src, decGood) { + t.Fatalf("decompression with good dictionary produced mismatched data") + } +} + +// Ensure DecompressBlock reports corruption on clearly invalid input. +func TestDecompressBlockCorruptedInput(t *testing.T) { + maybeSkip(t) + + defer testBorrowed(t) + + badInputs := [][]byte{ + {}, + []byte("not-a-valid-lz4-block"), + bytes.Repeat([]byte{0xFF}, 64), + } + + for i, src := range badInputs { + src := src + t.Run(fmt.Sprintf("case_%d_len_%d", i, len(src)), func(t *testing.T) { + _, err := plz4.DecompressBlock(src) + if err == nil { + t.Fatalf("expected error for corrupted input, got nil") + } + + // Corrupted block errors should be tagged as such. + if !plz4.Lz4Corrupted(err) { + t.Fatalf("expected corrupted error, got: %v", err) + } + }) + } +} + +// Sanity check for CompressBlockBound growth behavior. +func TestCompressBlockBoundMonotonic(t *testing.T) { + // This does not touch block pools but keep behavior consistent. + defer testBorrowed(t) + + prev := 0 + for sz := 0; sz <= 1<<20; sz += 4096 { + b := plz4.CompressBlockBound(sz) + if b < prev { + t.Fatalf("CompressBlockBound not monotonic: size %d -> %d, previous %d", sz, b, prev) + } + if b < sz { + t.Fatalf("CompressBlockBound(%d) < size (%d)", sz, b) + } + prev = b + } +} + +// Prove interoperability between plz4 block compression and +// the Go lz4 block decompressor. +func TestBlockInteropPlz4ToGoLz4(t *testing.T) { + defer testBorrowed(t) + + sizes := []int{0, 1, 1024, 4 << 10, 64 << 10} + + for _, sz := range sizes { + sz := sz + t.Run(fmt.Sprintf("size_%d", sz), func(t *testing.T) { + // Generate source buffer. + src := make([]byte, sz) + if sz > 0 { + if _, err := rand.Read(src); err != nil { + t.Fatalf("rand.Read failed: %v", err) + } + } + + // Compress with plz4 block API. + cmp, err := plz4.CompressBlock(src) + if err != nil { + t.Fatalf("plz4.CompressBlock failed: %v", err) + } + + // Decompress with Go lz4 block API. + dst := make([]byte, len(src)) + n, err := lz4.UncompressBlock(cmp, dst) + if err != nil { + t.Fatalf("lz4.UncompressBlock failed: %v", err) + } + + if n != len(src) { + t.Fatalf("unexpected decompressed size: got %d, want %d", n, len(src)) + } + + if !bytes.Equal(src, dst[:n]) { + t.Fatalf("data mismatch after plz4->lz4 block round-trip") + } + }) + } +} + +// Prove interoperability between Go lz4 block compression and +// the plz4 block decompressor. +func TestBlockInteropGoLz4ToPlz4(t *testing.T) { + defer testBorrowed(t) + + sizes := []int{0, 1, 1024, 4 << 10, 64 << 10} + + for _, sz := range sizes { + sz := sz + t.Run(fmt.Sprintf("size_%d", sz), func(t *testing.T) { + // Generate source buffer. + src := make([]byte, sz) + if sz > 0 { + if _, err := rand.Read(src); err != nil { + t.Fatalf("rand.Read failed: %v", err) + } + } + + // Compress with Go lz4 block API. + var c lz4.Compressor + cmpBuf := make([]byte, lz4.CompressBlockBound(len(src))) + nCmp, err := c.CompressBlock(src, cmpBuf) + if err != nil { + t.Fatalf("lz4.Compressor.CompressBlock failed: %v", err) + } + cmp := cmpBuf[:nCmp] + + // Decompress with plz4 block API. + dec, err := plz4.DecompressBlock(cmp) + if err != nil { + t.Fatalf("plz4.DecompressBlock failed: %v", err) + } + + if len(dec) != len(src) { + t.Fatalf("unexpected decompressed size: got %d, want %d", len(dec), len(src)) + } + + if !bytes.Equal(src, dec) { + t.Fatalf("data mismatch after lz4->plz4 block round-trip") + } + }) + } +} + +//--- +// Benchmarks + +// BenchmarkCompressBlock measures raw block compression throughput +// for different input sizes. +func BenchmarkCompressBlock(b *testing.B) { + sizes := []int{1 << 10, 4 << 10, 64 << 10, 1 << 20} + + for _, sz := range sizes { + b.Run(fmt.Sprintf("size_%d", sz), func(b *testing.B) { + // Prepare random (mostly uncompressible) data of the given size. + src := make([]byte, sz) + if _, err := rand.Read(src); err != nil { + b.Fatalf("rand.Read failed: %v", err) + } + + dst := make([]byte, plz4.CompressBlockBound(len(src))) + + b.SetBytes(int64(len(src))) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if _, err := plz4.CompressBlock(src, plz4.WithBlockDst(dst)); err != nil { + b.Fatalf("CompressBlock failed: %v", err) + } + } + }) + } +} + +// BenchmarkDecompressBlock measures raw block decompression throughput +// matching the data produced by BenchmarkCompressBlock-sized inputs. +func BenchmarkDecompressBlock(b *testing.B) { + sizes := []int{1 << 10, 4 << 10, 64 << 10, 1 << 20} + + for _, sz := range sizes { + sz := sz + b.Run(fmt.Sprintf("size_%d", sz), func(b *testing.B) { + // Prepare sample compressed data once. + src := make([]byte, sz) + if _, err := rand.Read(src); err != nil { + b.Fatalf("rand.Read failed: %v", err) + } + + cmp, err := plz4.CompressBlock(src) + if err != nil { + b.Fatalf("CompressBlock failed while preparing sample: %v", err) + } + + dst := make([]byte, sz) + + b.SetBytes(int64(len(src))) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + dec, err := plz4.DecompressBlock(cmp, plz4.WithBlockDst(dst)) + if err != nil { + b.Fatalf("DecompressBlock failed: %v", err) + } + if len(dec) != len(src) { + b.Fatalf("unexpected decompressed size: got %d, want %d", len(dec), len(src)) + } + } + }) + } +} + +// BenchmarkCompressBlockWithLevel focuses on how compression level +// affects performance for a fixed moderately sized input. +func BenchmarkCompressBlockWithLevel(b *testing.B) { + src, _ := LoadSample(b, LargeUncompressed) + + var ( + dst = make([]byte, plz4.CompressBlockBound(len(src))) + levels = []plz4.LevelT{plz4.Level1, plz4.Level3, plz4.Level6, plz4.Level9} + ) + + for _, lvl := range levels { + lvl := lvl + b.Run(fmt.Sprintf("level_%d", lvl), func(b *testing.B) { + b.SetBytes(int64(len(src))) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + dst, err := plz4.CompressBlock(src, plz4.WithBlockCompressionLevel(lvl), plz4.WithBlockDst(dst)) + if err != nil { + b.Fatalf("CompressBlock(level=%d) failed: %v", lvl, err) + } + + b.ReportMetric(float64(len(dst))/float64(len(src))*100.0, "ratio") + } + }) + } +} diff --git a/internal/test/mem.out b/internal/test/mem.out new file mode 100644 index 0000000..4b5dffb Binary files /dev/null and b/internal/test/mem.out differ diff --git a/internal/test/test.test b/internal/test/test.test new file mode 100755 index 0000000..1ba0dcf Binary files /dev/null and b/internal/test/test.test differ diff --git a/plz4_block.go b/plz4_block.go new file mode 100644 index 0000000..810a1f5 --- /dev/null +++ b/plz4_block.go @@ -0,0 +1,147 @@ +package plz4 + +import ( + "github.com/prequel-dev/plz4/internal/pkg/compress" +) + +const ( + maxTries = 3 + initMultiple = 4 +) + +// Block options for CompressBlock and DecompressBlock. +type BlockOpt func(blockOpt) blockOpt + +type blockOpt struct { + lvl LevelT + dst []byte + dict *compress.DictT +} + +func (o blockOpt) dictData() []byte { + if o.dict == nil { + return nil + } + return o.dict.Data() +} + +// Specify write compression level [1-12]. Defaults to Level1. +// This applies only to block compression. +func WithBlockCompressionLevel(level LevelT) BlockOpt { + return func(o blockOpt) blockOpt { + // Clamp level to the valid range [Level1, Level12] to avoid invalid levels + // propagating to the underlying compressor and causing a panic. + if level < Level1 { + level = Level1 + } else if level > Level12 { + level = Level12 + } + o.lvl = level + return o + } +} + +// Specify dictionary to use for block compression/decompression. +// This applies only to block compression/decompression. +func WithBlockDictionary(dict []byte) BlockOpt { + return func(o blockOpt) blockOpt { + o.dict = compress.NewDictT(dict, false) + return o + } +} + +// Specify destination buffer for compression/decompression. +// If not provided, will allocate sufficient space. +// This applies only to block compression/decompression. +func WithBlockDst(dst []byte) BlockOpt { + return func(o blockOpt) blockOpt { + o.dst = dst + return o + } +} + +// Returns maximum compressed block size for input size sz. +func CompressBlockBound(sz int) int { + return compress.CompressBound(sz) +} + +func parseBlockOpts(opts ...BlockOpt) blockOpt { + o := blockOpt{ + lvl: Level1, + } + for _, opt := range opts { + o = opt(o) + } + return o +} + +// Compress src block given options and return compressed block. +// If dst not provided, will allocate sufficient space. +// If dst is provided as an option and that slice is too small, +// an error will be returned. +func CompressBlock(src []byte, opts ...BlockOpt) ([]byte, error) { + + var ( + o = parseBlockOpts(opts...) + f = compress.NewCompressorFactory(o.lvl, true, o.dict) + c = f.NewCompressor() + dst = o.dst + ) + + if dst == nil { + dst = make([]byte, CompressBlockBound(len(src))) + } + + n, err := c.Compress(src, dst, o.dictData()) + if err != nil { + return nil, err + } + + return dst[:n], nil +} + +// Decompress src block given options and return decompressed block. +// If dst not provided, will attempt to allocate sufficient space. +// If dst is provided as an option and that slice is too small, +// an error will be returned. +func DecompressBlock(src []byte, opts ...BlockOpt) ([]byte, error) { + var ( + o = parseBlockOpts(opts...) + dst = o.dst + ) + + d := compress.NewDecompressor(true, o.dict) + + if dst != nil { + n, err := d.Decompress(src, dst) + if err != nil { + return nil, err + } + return dst[:n], nil + } + + // No dst provided, allocate a buffer. + // Since we don't know the decompressed size, we start with + // a multiple of the src and reallocate as necessary. + // Unfortunately, the lz4 API does not distinguish between dst-too-small + // and other errors, so we must apply a heuristic in the case of an error. + var ( + nTry = 1 + bufSize = len(src) * initMultiple + ) + + for { + dst = make([]byte, bufSize) + n, err := d.Decompress(src, dst) + + switch { + case err == nil: + return dst[:n], nil + case nTry < maxTries: + nTry += 1 + bufSize *= 2 + default: + return nil, err + } + } +}