From ead44804c6b38a9cb44371062529aeeaf84be562 Mon Sep 17 00:00:00 2001 From: coanor Date: Thu, 8 Jan 2026 18:53:30 +0800 Subject: [PATCH 1/4] feat: add logging to diskcache --- diskcache/diskcache.go | 5 +++++ diskcache/get.go | 12 ++++++++++++ diskcache/open.go | 10 ++++++++++ 3 files changed, 27 insertions(+) diff --git a/diskcache/diskcache.go b/diskcache/diskcache.go index 181cde2b..9fd067e2 100644 --- a/diskcache/diskcache.go +++ b/diskcache/diskcache.go @@ -24,6 +24,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/GuanceCloud/cliutils/logger" ) const ( @@ -57,6 +59,9 @@ var ( // Invalid file header. ErrBadHeader = errors.New("bad header") + + l = logger.DefaultSLogger("diskcache") + once sync.Once ) // DiskCache is the representation of a disk cache. diff --git a/diskcache/get.go b/diskcache/get.go index e14ac60e..c5147ac8 100644 --- a/diskcache/get.go +++ b/diskcache/get.go @@ -31,6 +31,8 @@ func (c *DiskCache) skipBadFile() error { droppedDataVec.WithLabelValues(c.path, reasonBadDataFile).Observe(float64(c.curReadSize)) }() + l.Warnf("skip bad file %s with size %d bytes", c.curReadfile, c.curReadSize) + return c.switchNextFile() } @@ -98,6 +100,14 @@ retry: } if n, err = c.rfd.Read(c.batchHeader); err != nil || n != dataHeaderLen { + if err != nil && err != io.EOF { + l.Errorf("read %d bytes header error: %s", dataHeaderLen, err.Error()) + } + + if n > 0 && n != dataHeaderLen { + l.Errorf("invalid header length: %d, expect %d", n, dataHeaderLen) + } + // On bad datafile, just ignore and delete the file. if err = c.skipBadFile(); err != nil { return err @@ -134,6 +144,8 @@ retry: return fmt.Errorf("rfd.Seek(%d): %w", nbytes, err) } + l.Warnf("got %d bytes to read into buffer with length %d", nbytes, len(readbuf)) + droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes)) return ErrTooSmallReadBuf } diff --git a/diskcache/open.go b/diskcache/open.go index 06c6fd5c..2571abda 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -13,10 +13,20 @@ import ( "strconv" "sync" "time" + + "github.com/GuanceCloud/cliutils/logger" ) +func setupLogger() { + once.Do(func() { + l = logger.SLogger("diskcache") + }) +} + // Open init and create a new disk cache. We can set other options with various options. func Open(opts ...CacheOption) (*DiskCache, error) { + setupLogger() + c := defaultInstance() // apply extra options From 040795bb2100b3188f571af641b822f446401231 Mon Sep 17 00:00:00 2001 From: coanor Date: Fri, 9 Jan 2026 12:03:27 +0800 Subject: [PATCH 2/4] add more logs --- diskcache/diskcache.go | 3 ++- diskcache/get.go | 19 +++++++++++-------- diskcache/metric.go | 2 +- diskcache/open.go | 6 ++---- diskcache/rotate.go | 2 +- 5 files changed, 17 insertions(+), 15 deletions(-) diff --git a/diskcache/diskcache.go b/diskcache/diskcache.go index 9fd067e2..9deb0624 100644 --- a/diskcache/diskcache.go +++ b/diskcache/diskcache.go @@ -94,7 +94,8 @@ type DiskCache struct { pos *pos // current read fd position info // specs of current diskcache - size atomic.Int64 // current byte size + size atomic.Int64 // current byte size + curBatchSize, // current writing file's size curReadSize, // current reading file's size batchSize, // current batch size(static) diff --git a/diskcache/get.go b/diskcache/get.go index c5147ac8..7f406a34 100644 --- a/diskcache/get.go +++ b/diskcache/get.go @@ -7,6 +7,7 @@ package diskcache import ( "encoding/binary" + "errors" "fmt" "io" "time" @@ -82,9 +83,10 @@ func (c *DiskCache) doGet(buf []byte, fn Fn, bfn BufFunc) error { if err = func() error { c.wlock.Lock() defer c.wlock.Unlock() + return c.rotate() }(); err != nil { - return err + return fmt.Errorf("wakeup error: %w", err) } } @@ -100,7 +102,7 @@ retry: } if n, err = c.rfd.Read(c.batchHeader); err != nil || n != dataHeaderLen { - if err != nil && err != io.EOF { + if err != nil && !errors.Is(err, io.EOF) { l.Errorf("read %d bytes header error: %s", dataHeaderLen, err.Error()) } @@ -140,14 +142,15 @@ retry: if len(readbuf) < nbytes { // seek to next read position - if _, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil { + if x, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil { return fmt.Errorf("rfd.Seek(%d): %w", nbytes, err) - } + } else { + l.Warnf("got %d bytes to buffer with len %d, seek to new read position %d, drop %d bytes within file %s", + nbytes, len(readbuf), x, nbytes, c.curReadfile) - l.Warnf("got %d bytes to read into buffer with length %d", nbytes, len(readbuf)) - - droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes)) - return ErrTooSmallReadBuf + droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes)) + return ErrTooSmallReadBuf + } } if n, err := c.rfd.Read(readbuf[:nbytes]); err != nil { diff --git a/diskcache/metric.go b/diskcache/metric.go index 2d7688b6..e13a192b 100644 --- a/diskcache/metric.go +++ b/diskcache/metric.go @@ -181,7 +181,7 @@ func setupMetrics() { prometheus.GaugeOpts{ Namespace: ns, Name: "size", - Help: "Current cache size that waiting to be consumed(get)", + Help: "Current cache size that waiting to be consumed(get). The size include header bytes", }, []string{"path"}, ) diff --git a/diskcache/open.go b/diskcache/open.go index 2571abda..c74d2493 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -142,10 +142,6 @@ func (c *DiskCache) doOpen() error { switch filepath.Base(path) { case ".lock", ".pos": // ignore them case "data": // not rotated writing file, do not count on sizeVec. - c.size.Add(fi.Size()) - // NOTE: c.size not always equal to sizeVec. c.size used to limit - // total bytes used for Put(), but sizeVec used to count size that - // waiting to be Get(). default: c.size.Add(fi.Size()) sizeVec.WithLabelValues(c.path).Add(float64(fi.Size())) @@ -158,6 +154,8 @@ func (c *DiskCache) doOpen() error { } sort.Strings(c.dataFiles) // make file-name sorted for FIFO Get() + l.Infof("on open loaded %d files", len(c.dataFiles)) + datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles))) // first get, try load .pos if !c.noPos { diff --git a/diskcache/rotate.go b/diskcache/rotate.go index 75d1968d..5bce9065 100644 --- a/diskcache/rotate.go +++ b/diskcache/rotate.go @@ -73,7 +73,7 @@ func (c *DiskCache) rotate() error { return fmt.Errorf("rotate on Rename(%q, %q): %w", c.curWriteFile, newfile, err) } - // new file added, plus it's size to cache size + // new file added, add it's size to cache size if fi, err := os.Stat(newfile); err == nil { if fi.Size() > dataHeaderLen { c.size.Add(fi.Size()) From 07aec9ce724997a4b3ac8cb4a71002a600514ec1 Mon Sep 17 00:00:00 2001 From: coanor Date: Fri, 9 Jan 2026 18:33:13 +0800 Subject: [PATCH 3/4] feat: add .pos sync interval/duration --- diskcache/envs.go | 13 +++++ diskcache/get.go | 6 +-- diskcache/get_test.go | 110 +++++++++++++++++++++++++++++++++++++++++- diskcache/open.go | 4 ++ diskcache/options.go | 18 +++++++ diskcache/pos.go | 32 +++++++++++- diskcache/pos_test.go | 40 ++++++++++++--- diskcache/switch.go | 2 +- 8 files changed, 211 insertions(+), 14 deletions(-) diff --git a/diskcache/envs.go b/diskcache/envs.go index e6356e42..a0cab378 100644 --- a/diskcache/envs.go +++ b/diskcache/envs.go @@ -8,6 +8,7 @@ package diskcache import ( "os" "strconv" + "time" ) func (c *DiskCache) syncEnv() { @@ -37,6 +38,18 @@ func (c *DiskCache) syncEnv() { c.noPos = true } + if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_INTERVAL"); ok && v != "" { + if du, err := time.ParseDuration(v); err == nil && du > 0 { + c.pos.dumpTick = time.NewTicker(du) + } + } + + if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_AT"); ok && v != "" { + if n, err := strconv.ParseInt(v, 10, 64); err == nil && n > 0 { + c.pos.dumpCount = int(n) + } + } + if v, ok := os.LookupEnv("ENV_DISKCACHE_NO_LOCK"); ok && v != "" { c.noLock = true } diff --git a/diskcache/get.go b/diskcache/get.go index 7f406a34..7de2b028 100644 --- a/diskcache/get.go +++ b/diskcache/get.go @@ -179,11 +179,11 @@ __updatePos: // update seek position if !c.noPos && nbytes > 0 { c.pos.Seek += int64(dataHeaderLen + nbytes) - if derr := c.pos.dumpFile(); derr != nil { + if do, derr := c.pos.dumpFile(); derr != nil { return derr + } else if do { + posUpdatedVec.WithLabelValues("get", c.path).Inc() } - - posUpdatedVec.WithLabelValues("get", c.path).Inc() } __end: diff --git a/diskcache/get_test.go b/diskcache/get_test.go index c046f157..55a8d184 100644 --- a/diskcache/get_test.go +++ b/diskcache/get_test.go @@ -463,7 +463,10 @@ func TestPutGet(t *T.T) { testData := []byte("0123456789") ndata := 10 - c, err := Open(WithPath(p)) + c, err := Open( + WithPath(p), + WithPosUpdate(0, 0), + ) assert.NoError(t, err) @@ -487,6 +490,7 @@ func TestPutGet(t *T.T) { // reopen the cache c2, err := Open(WithPath(p), + WithPosUpdate(0, 0), WithCapacity(int64(len(testData)*10)), WithBatchSize(int64(len(testData)*2))) require.NoError(t, err, "get error: %s", err) @@ -517,4 +521,108 @@ func TestPutGet(t *T.T) { os.RemoveAll(p) }) }) + + t.Run("pos-sync-at", func(t *T.T) { + ResetMetrics() + + p := t.TempDir() + c, err := Open(WithPath(p), WithPosUpdate(3, 0)) + assert.NoError(t, err) + + testData := []byte("0123456789") + ndata := 10 + + for i := 0; i < ndata; i++ { // write 10 data + require.NoError(t, c.Put(testData), "cache: %s", c) + } + + // make data file readable. + require.NoError(t, c.rotate()) + + // create n read pos + for i := 0; i < 6; i++ { + assert.NoError(t, c.Get(func(data []byte) error { + assert.Len(t, data, len(testData)) + return nil + })) + } + + assert.Equal(t, 6*int64(len(testData)+dataHeaderLen), c.pos.Seek) + assert.NoError(t, c.Close()) + + _, err = os.Stat(c.pos.fname) + require.NoError(t, err) + + reg := prometheus.NewRegistry() + reg.MustRegister(Metrics()...) + mfs, err := reg.Gather() + require.NoError(t, err) + + assert.Equalf(t, float64(2), + metrics.GetMetricOnLabels(mfs, + "diskcache_pos_updated_total", + "get", + c.path, + ).GetCounter().GetValue(), + "got metrics\n%s", metrics.MetricFamily2Text(mfs), + ) + + t.Cleanup(func() { + c.Close() + os.RemoveAll(p) + }) + }) + + t.Run("pos-sync-on-interval", func(t *T.T) { + ResetMetrics() + + p := t.TempDir() + c, err := Open(WithPath(p), WithPosUpdate(10000, time.Duration(time.Millisecond))) + assert.NoError(t, err) + + testData := []byte("0123456789") + ndata := 10 + + for i := 0; i < ndata; i++ { // write 10 data + require.NoError(t, c.Put(testData), "cache: %s", c) + } + + // make data file readable. + require.NoError(t, c.rotate()) + + // create n read pos + for i := 0; i < 3; i++ { + assert.NoError(t, c.Get(func(data []byte) error { + assert.Len(t, data, len(testData)) + return nil + })) + + time.Sleep(time.Millisecond) + } + + assert.Equal(t, 3*int64(len(testData)+dataHeaderLen), c.pos.Seek) + assert.NoError(t, c.Close()) + + _, err = os.Stat(c.pos.fname) + require.NoError(t, err) + + reg := prometheus.NewRegistry() + reg.MustRegister(Metrics()...) + mfs, err := reg.Gather() + require.NoError(t, err) + + assert.Equalf(t, float64(3), + metrics.GetMetricOnLabels(mfs, + "diskcache_pos_updated_total", + "get", + c.path, + ).GetCounter().GetValue(), + "got metrics\n%s", metrics.MetricFamily2Text(mfs), + ) + + t.Cleanup(func() { + c.Close() + os.RemoveAll(p) + }) + }) } diff --git a/diskcache/open.go b/diskcache/open.go index c74d2493..05cfce0f 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -74,6 +74,10 @@ func defaultInstance() *DiskCache { pos: &pos{ Seek: 0, Name: nil, + + // dump position each 100ms or 100 update + dumpTick: time.NewTicker(time.Millisecond * 100), + dumpCount: 100, }, } } diff --git a/diskcache/options.go b/diskcache/options.go index fb922f7c..7fb03d36 100644 --- a/diskcache/options.go +++ b/diskcache/options.go @@ -67,6 +67,24 @@ func WithNoPos(on bool) CacheOption { } } +// WithPosUpdate set .pos update intervals. +// +// cnt used to specify how many update on .pos triger a real disk update. +// We can set cnt = 0 to force update .pos on every Get action. +// +// du used to specify how often to triger a real disk update on file .pos. +func WithPosUpdate(cnt int, du time.Duration) CacheOption { + return func(c *DiskCache) { + if cnt >= 0 { + c.pos.dumpCount = cnt + } + + if du > 0 { + c.pos.dumpTick = time.NewTicker(du) + } + } +} + // WithWakeup set duration on wakeup(default 3s), this wakeup time // used to shift current-writing-file to ready-to-reading-file. // diff --git a/diskcache/pos.go b/diskcache/pos.go index a2bf768a..ae5d485c 100644 --- a/diskcache/pos.go +++ b/diskcache/pos.go @@ -12,12 +12,17 @@ import ( "fmt" "os" "path/filepath" + "time" ) type pos struct { Seek int64 `json:"seek"` Name []byte `json:"name"` + cnt, + dumpCount int // position update count + dumpTick *time.Ticker + fd *os.File fname string // where to dump the binary data buf *bytes.Buffer // reused buffer to build the binary data @@ -32,6 +37,10 @@ func (p *pos) close() error { p.fd = nil } + if p.dumpTick != nil { + p.dumpTick.Stop() + } + return nil } @@ -108,10 +117,10 @@ func (p *pos) reset() error { p.Seek = -1 p.Name = nil - return p.dumpFile() + return p.doDumpFile() } -func (p *pos) dumpFile() error { +func (p *pos) doDumpFile() error { if p.fd == nil { if fd, err := os.OpenFile(p.fname, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600); err != nil { return fmt.Errorf("open pos file(%q) failed: %w", p.fname, err) @@ -139,6 +148,25 @@ func (p *pos) dumpFile() error { } } +func (p *pos) dumpFile() (bool, error) { + if p.dumpCount == 0 { // force dump .pos on every Get action. + return true, p.doDumpFile() + } + + p.cnt++ + if p.cnt%p.dumpCount == 0 { + return true, p.doDumpFile() + } + + select { + case <-p.dumpTick.C: + return true, p.doDumpFile() + default: // pass + } + + return false, nil +} + // for benchmark. func (p *pos) dumpJSON() ([]byte, error) { j, err := json.Marshal(p) diff --git a/diskcache/pos_test.go b/diskcache/pos_test.go index 3ac62f7d..9b4eae79 100644 --- a/diskcache/pos_test.go +++ b/diskcache/pos_test.go @@ -8,6 +8,7 @@ package diskcache import ( "fmt" T "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -16,7 +17,7 @@ func TestDump(t *T.T) { t.Run(`dump-undump`, func(t *T.T) { p := &pos{ Seek: 1024 * 1024 * 1024, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), + Name: fmt.Appendf(nil, "data.%032d", 1234), } data, err := p.MarshalBinary() @@ -29,7 +30,7 @@ func TestDump(t *T.T) { assert.NoError(t, p2.UnmarshalBinary(data)) assert.Equal(t, int64(1024*1024*1024), p2.Seek) - assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name) + assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name) t.Logf("pos: %s", p) }) @@ -37,7 +38,7 @@ func TestDump(t *T.T) { t.Run(`seek--1`, func(t *T.T) { p := &pos{ Seek: -1, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), + Name: fmt.Appendf(nil, "data.%032d", 1234), } data, err := p.MarshalBinary() @@ -48,15 +49,15 @@ func TestDump(t *T.T) { var p2 pos assert.NoError(t, p2.UnmarshalBinary(data)) assert.Equal(t, int64(-1), p2.Seek) - assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name) + assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name) t.Logf("pos: %s", p) }) t.Run(`seek-0`, func(t *T.T) { p := &pos{ + Name: fmt.Appendf(nil, "data.%032d", 1234), Seek: 0, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), } data, err := p.MarshalBinary() @@ -68,7 +69,7 @@ func TestDump(t *T.T) { assert.NoError(t, p2.UnmarshalBinary(data)) assert.Equal(t, int64(0), p2.Seek) - assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name) + assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name) t.Logf("pos: %s", p) }) @@ -77,7 +78,7 @@ func TestDump(t *T.T) { func BenchmarkPosDump(b *T.B) { p := pos{ Seek: 1024 * 1024 * 1024, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), + Name: fmt.Appendf(nil, "data.%032d", 1234), } b.Run("binary", func(b *T.B) { @@ -91,4 +92,29 @@ func BenchmarkPosDump(b *T.B) { p.dumpJSON() } }) + + b.Run("force-dump", func(b *T.B) { + p := pos{ + Seek: 1024 * 1024 * 1024, + Name: fmt.Appendf(nil, "data.%032d", 1234), + dumpCount: 0, + } + + for i := 0; i < b.N; i++ { + p.dumpFile() + } + }) + + b.Run("interval-dump", func(b *T.B) { + p := pos{ + Seek: 1024 * 1024 * 1024, + Name: fmt.Appendf(nil, "data.%032d", 1234), + dumpCount: 100, + dumpTick: time.NewTicker(100 * time.Millisecond), + } + + for i := 0; i < b.N; i++ { + p.dumpFile() + } + }) } diff --git a/diskcache/switch.go b/diskcache/switch.go index 3bf12347..97e4128c 100644 --- a/diskcache/switch.go +++ b/diskcache/switch.go @@ -93,7 +93,7 @@ func (c *DiskCache) doSwitchNextFile() error { if !c.noPos { c.pos.Name = []byte(c.curReadfile) c.pos.Seek = 0 - if err := c.pos.dumpFile(); err != nil { + if err := c.pos.doDumpFile(); err != nil { return err } From 1c95c7ad65e486120e64482f98f5c0b8939f2482 Mon Sep 17 00:00:00 2001 From: coanor Date: Fri, 9 Jan 2026 19:46:00 +0800 Subject: [PATCH 4/4] save --- diskcache/envs.go | 2 +- diskcache/get_test.go | 57 +++++++++++++++++++++++++++++++++++++++++-- diskcache/open.go | 4 +-- diskcache/options.go | 4 +-- diskcache/pos.go | 19 +++++++-------- diskcache/pos_test.go | 8 +++--- 6 files changed, 73 insertions(+), 21 deletions(-) diff --git a/diskcache/envs.go b/diskcache/envs.go index a0cab378..a175df30 100644 --- a/diskcache/envs.go +++ b/diskcache/envs.go @@ -40,7 +40,7 @@ func (c *DiskCache) syncEnv() { if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_INTERVAL"); ok && v != "" { if du, err := time.ParseDuration(v); err == nil && du > 0 { - c.pos.dumpTick = time.NewTicker(du) + c.pos.dumpInterval = du } } diff --git a/diskcache/get_test.go b/diskcache/get_test.go index 55a8d184..a62e35f0 100644 --- a/diskcache/get_test.go +++ b/diskcache/get_test.go @@ -521,7 +521,9 @@ func TestPutGet(t *T.T) { os.RemoveAll(p) }) }) +} +func TestDelayPosDump(t *T.T) { t.Run("pos-sync-at", func(t *T.T) { ResetMetrics() @@ -577,7 +579,7 @@ func TestPutGet(t *T.T) { ResetMetrics() p := t.TempDir() - c, err := Open(WithPath(p), WithPosUpdate(10000, time.Duration(time.Millisecond))) + c, err := Open(WithPath(p), WithPosUpdate(-1, time.Millisecond*100)) assert.NoError(t, err) testData := []byte("0123456789") @@ -597,7 +599,58 @@ func TestPutGet(t *T.T) { return nil })) - time.Sleep(time.Millisecond) + time.Sleep(100 * time.Millisecond) + } + + assert.Equal(t, 3*int64(len(testData)+dataHeaderLen), c.pos.Seek) + assert.NoError(t, c.Close()) + + _, err = os.Stat(c.pos.fname) + require.NoError(t, err) + + reg := prometheus.NewRegistry() + reg.MustRegister(Metrics()...) + mfs, err := reg.Gather() + require.NoError(t, err) + + assert.Equalf(t, float64(3), + metrics.GetMetricOnLabels(mfs, + "diskcache_pos_updated_total", + "get", + c.path, + ).GetCounter().GetValue(), + "got metrics\n%s", metrics.MetricFamily2Text(mfs), + ) + + t.Cleanup(func() { + c.Close() + os.RemoveAll(p) + }) + }) + + t.Run("pos-sync-force", func(t *T.T) { + ResetMetrics() + + p := t.TempDir() + c, err := Open(WithPath(p), WithPosUpdate(0, time.Millisecond*100)) + assert.NoError(t, err) + + testData := []byte("0123456789") + ndata := 10 + + for i := 0; i < ndata; i++ { // write 10 data + require.NoError(t, c.Put(testData), "cache: %s", c) + } + + // make data file readable. + require.NoError(t, c.rotate()) + + // create n read pos + for i := 0; i < 3; i++ { + assert.NoError(t, c.Get(func(data []byte) error { + assert.Len(t, data, len(testData)) + return nil + })) } assert.Equal(t, 3*int64(len(testData)+dataHeaderLen), c.pos.Seek) diff --git a/diskcache/open.go b/diskcache/open.go index 05cfce0f..dff564fa 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -76,8 +76,8 @@ func defaultInstance() *DiskCache { Name: nil, // dump position each 100ms or 100 update - dumpTick: time.NewTicker(time.Millisecond * 100), - dumpCount: 100, + dumpInterval: time.Millisecond * 100, + dumpCount: 100, }, } } diff --git a/diskcache/options.go b/diskcache/options.go index 7fb03d36..ba7d2c42 100644 --- a/diskcache/options.go +++ b/diskcache/options.go @@ -79,8 +79,8 @@ func WithPosUpdate(cnt int, du time.Duration) CacheOption { c.pos.dumpCount = cnt } - if du > 0 { - c.pos.dumpTick = time.NewTicker(du) + if du >= 0 { + c.pos.dumpInterval = du } } } diff --git a/diskcache/pos.go b/diskcache/pos.go index ae5d485c..e86dd4d4 100644 --- a/diskcache/pos.go +++ b/diskcache/pos.go @@ -20,8 +20,9 @@ type pos struct { Name []byte `json:"name"` cnt, - dumpCount int // position update count - dumpTick *time.Ticker + dumpCount int + dumpInterval time.Duration + lastDump time.Time fd *os.File fname string // where to dump the binary data @@ -37,10 +38,6 @@ func (p *pos) close() error { p.fd = nil } - if p.dumpTick != nil { - p.dumpTick.Stop() - } - return nil } @@ -155,13 +152,15 @@ func (p *pos) dumpFile() (bool, error) { p.cnt++ if p.cnt%p.dumpCount == 0 { + p.lastDump = time.Now() return true, p.doDumpFile() } - select { - case <-p.dumpTick.C: - return true, p.doDumpFile() - default: // pass + if p.dumpInterval > 0 { + if time.Since(p.lastDump) >= p.dumpInterval { + p.lastDump = time.Now() + return true, p.doDumpFile() + } } return false, nil diff --git a/diskcache/pos_test.go b/diskcache/pos_test.go index 9b4eae79..74236a57 100644 --- a/diskcache/pos_test.go +++ b/diskcache/pos_test.go @@ -107,10 +107,10 @@ func BenchmarkPosDump(b *T.B) { b.Run("interval-dump", func(b *T.B) { p := pos{ - Seek: 1024 * 1024 * 1024, - Name: fmt.Appendf(nil, "data.%032d", 1234), - dumpCount: 100, - dumpTick: time.NewTicker(100 * time.Millisecond), + Seek: 1024 * 1024 * 1024, + Name: fmt.Appendf(nil, "data.%032d", 1234), + dumpCount: 100, + dumpInterval: 100 * time.Millisecond, } for i := 0; i < b.N; i++ {