diff --git a/diskcache/diskcache.go b/diskcache/diskcache.go index 181cde2b..9deb0624 100644 --- a/diskcache/diskcache.go +++ b/diskcache/diskcache.go @@ -24,6 +24,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/GuanceCloud/cliutils/logger" ) const ( @@ -57,6 +59,9 @@ var ( // Invalid file header. ErrBadHeader = errors.New("bad header") + + l = logger.DefaultSLogger("diskcache") + once sync.Once ) // DiskCache is the representation of a disk cache. @@ -89,7 +94,8 @@ type DiskCache struct { pos *pos // current read fd position info // specs of current diskcache - size atomic.Int64 // current byte size + size atomic.Int64 // current byte size + curBatchSize, // current writing file's size curReadSize, // current reading file's size batchSize, // current batch size(static) diff --git a/diskcache/envs.go b/diskcache/envs.go index e6356e42..a175df30 100644 --- a/diskcache/envs.go +++ b/diskcache/envs.go @@ -8,6 +8,7 @@ package diskcache import ( "os" "strconv" + "time" ) func (c *DiskCache) syncEnv() { @@ -37,6 +38,18 @@ func (c *DiskCache) syncEnv() { c.noPos = true } + if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_INTERVAL"); ok && v != "" { + if du, err := time.ParseDuration(v); err == nil && du > 0 { + c.pos.dumpInterval = du + } + } + + if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_AT"); ok && v != "" { + if n, err := strconv.ParseInt(v, 10, 64); err == nil && n > 0 { + c.pos.dumpCount = int(n) + } + } + if v, ok := os.LookupEnv("ENV_DISKCACHE_NO_LOCK"); ok && v != "" { c.noLock = true } diff --git a/diskcache/get.go b/diskcache/get.go index e14ac60e..7de2b028 100644 --- a/diskcache/get.go +++ b/diskcache/get.go @@ -7,6 +7,7 @@ package diskcache import ( "encoding/binary" + "errors" "fmt" "io" "time" @@ -31,6 +32,8 @@ func (c *DiskCache) skipBadFile() error { droppedDataVec.WithLabelValues(c.path, reasonBadDataFile).Observe(float64(c.curReadSize)) }() + l.Warnf("skip bad file %s with size %d bytes", c.curReadfile, c.curReadSize) + return 
c.switchNextFile() } @@ -80,9 +83,10 @@ func (c *DiskCache) doGet(buf []byte, fn Fn, bfn BufFunc) error { if err = func() error { c.wlock.Lock() defer c.wlock.Unlock() + return c.rotate() }(); err != nil { - return err + return fmt.Errorf("wakeup error: %w", err) } } @@ -98,6 +102,14 @@ retry: } if n, err = c.rfd.Read(c.batchHeader); err != nil || n != dataHeaderLen { + if err != nil && !errors.Is(err, io.EOF) { + l.Errorf("read %d bytes header error: %s", dataHeaderLen, err.Error()) + } + + if n > 0 && n != dataHeaderLen { + l.Errorf("invalid header length: %d, expect %d", n, dataHeaderLen) + } + // On bad datafile, just ignore and delete the file. if err = c.skipBadFile(); err != nil { return err @@ -130,12 +142,15 @@ retry: if len(readbuf) < nbytes { // seek to next read position - if _, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil { + if x, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil { return fmt.Errorf("rfd.Seek(%d): %w", nbytes, err) - } + } else { + l.Warnf("got %d bytes to buffer with len %d, seek to new read position %d, drop %d bytes within file %s", + nbytes, len(readbuf), x, nbytes, c.curReadfile) - droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes)) - return ErrTooSmallReadBuf + droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes)) + return ErrTooSmallReadBuf + } } if n, err := c.rfd.Read(readbuf[:nbytes]); err != nil { @@ -164,11 +179,11 @@ __updatePos: // update seek position if !c.noPos && nbytes > 0 { c.pos.Seek += int64(dataHeaderLen + nbytes) - if derr := c.pos.dumpFile(); derr != nil { + if do, derr := c.pos.dumpFile(); derr != nil { return derr + } else if do { + posUpdatedVec.WithLabelValues("get", c.path).Inc() } - - posUpdatedVec.WithLabelValues("get", c.path).Inc() } __end: diff --git a/diskcache/get_test.go b/diskcache/get_test.go index c046f157..a62e35f0 100644 --- a/diskcache/get_test.go +++ b/diskcache/get_test.go @@ -463,7 +463,10 
@@ func TestPutGet(t *T.T) { testData := []byte("0123456789") ndata := 10 - c, err := Open(WithPath(p)) + c, err := Open( + WithPath(p), + WithPosUpdate(0, 0), + ) assert.NoError(t, err) @@ -487,6 +490,7 @@ func TestPutGet(t *T.T) { // reopen the cache c2, err := Open(WithPath(p), + WithPosUpdate(0, 0), WithCapacity(int64(len(testData)*10)), WithBatchSize(int64(len(testData)*2))) require.NoError(t, err, "get error: %s", err) @@ -518,3 +522,160 @@ func TestPutGet(t *T.T) { }) }) } + +func TestDelayPosDump(t *T.T) { + t.Run("pos-sync-at", func(t *T.T) { + ResetMetrics() + + p := t.TempDir() + c, err := Open(WithPath(p), WithPosUpdate(3, 0)) + assert.NoError(t, err) + + testData := []byte("0123456789") + ndata := 10 + + for i := 0; i < ndata; i++ { // write 10 data + require.NoError(t, c.Put(testData), "cache: %s", c) + } + + // make data file readable. + require.NoError(t, c.rotate()) + + // create n read pos + for i := 0; i < 6; i++ { + assert.NoError(t, c.Get(func(data []byte) error { + assert.Len(t, data, len(testData)) + return nil + })) + } + + assert.Equal(t, 6*int64(len(testData)+dataHeaderLen), c.pos.Seek) + assert.NoError(t, c.Close()) + + _, err = os.Stat(c.pos.fname) + require.NoError(t, err) + + reg := prometheus.NewRegistry() + reg.MustRegister(Metrics()...) + mfs, err := reg.Gather() + require.NoError(t, err) + + assert.Equalf(t, float64(2), + metrics.GetMetricOnLabels(mfs, + "diskcache_pos_updated_total", + "get", + c.path, + ).GetCounter().GetValue(), + "got metrics\n%s", metrics.MetricFamily2Text(mfs), + ) + + t.Cleanup(func() { + c.Close() + os.RemoveAll(p) + }) + }) + + t.Run("pos-sync-on-interval", func(t *T.T) { + ResetMetrics() + + p := t.TempDir() + c, err := Open(WithPath(p), WithPosUpdate(-1, time.Millisecond*100)) + assert.NoError(t, err) + + testData := []byte("0123456789") + ndata := 10 + + for i := 0; i < ndata; i++ { // write 10 data + require.NoError(t, c.Put(testData), "cache: %s", c) + } + + // make data file readable. 
+ require.NoError(t, c.rotate()) + + // create n read pos + for i := 0; i < 3; i++ { + assert.NoError(t, c.Get(func(data []byte) error { + assert.Len(t, data, len(testData)) + return nil + })) + + time.Sleep(100 * time.Millisecond) + } + + assert.Equal(t, 3*int64(len(testData)+dataHeaderLen), c.pos.Seek) + assert.NoError(t, c.Close()) + + _, err = os.Stat(c.pos.fname) + require.NoError(t, err) + + reg := prometheus.NewRegistry() + reg.MustRegister(Metrics()...) + mfs, err := reg.Gather() + require.NoError(t, err) + + assert.Equalf(t, float64(3), + metrics.GetMetricOnLabels(mfs, + "diskcache_pos_updated_total", + "get", + c.path, + ).GetCounter().GetValue(), + "got metrics\n%s", metrics.MetricFamily2Text(mfs), + ) + + t.Cleanup(func() { + c.Close() + os.RemoveAll(p) + }) + }) + + t.Run("pos-sync-force", func(t *T.T) { + ResetMetrics() + + p := t.TempDir() + c, err := Open(WithPath(p), WithPosUpdate(0, time.Millisecond*100)) + assert.NoError(t, err) + + testData := []byte("0123456789") + ndata := 10 + + for i := 0; i < ndata; i++ { // write 10 data + require.NoError(t, c.Put(testData), "cache: %s", c) + } + + // make data file readable. + require.NoError(t, c.rotate()) + + // create n read pos + for i := 0; i < 3; i++ { + assert.NoError(t, c.Get(func(data []byte) error { + assert.Len(t, data, len(testData)) + return nil + })) + } + + assert.Equal(t, 3*int64(len(testData)+dataHeaderLen), c.pos.Seek) + assert.NoError(t, c.Close()) + + _, err = os.Stat(c.pos.fname) + require.NoError(t, err) + + reg := prometheus.NewRegistry() + reg.MustRegister(Metrics()...) 
+ mfs, err := reg.Gather() + require.NoError(t, err) + + assert.Equalf(t, float64(3), + metrics.GetMetricOnLabels(mfs, + "diskcache_pos_updated_total", + "get", + c.path, + ).GetCounter().GetValue(), + "got metrics\n%s", metrics.MetricFamily2Text(mfs), + ) + + t.Cleanup(func() { + c.Close() + os.RemoveAll(p) + }) + }) +} diff --git a/diskcache/metric.go b/diskcache/metric.go index 2d7688b6..e13a192b 100644 --- a/diskcache/metric.go +++ b/diskcache/metric.go @@ -181,7 +181,7 @@ func setupMetrics() { prometheus.GaugeOpts{ Namespace: ns, Name: "size", - Help: "Current cache size that waiting to be consumed(get)", + Help: "Current cache size that waiting to be consumed(get). The size includes header bytes", }, []string{"path"}, ) diff --git a/diskcache/open.go b/diskcache/open.go index 06c6fd5c..dff564fa 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -13,10 +13,20 @@ import ( "strconv" "sync" "time" + + "github.com/GuanceCloud/cliutils/logger" ) +func setupLogger() { + once.Do(func() { + l = logger.SLogger("diskcache") + }) +} + // Open init and create a new disk cache. We can set other options with various options. func Open(opts ...CacheOption) (*DiskCache, error) { + setupLogger() + c := defaultInstance() // apply extra options @@ -64,6 +74,10 @@ func defaultInstance() *DiskCache { pos: &pos{ Seek: 0, Name: nil, + + // dump position each 100ms or 100 updates + dumpInterval: time.Millisecond * 100, + dumpCount: 100, }, } } @@ -132,10 +146,6 @@ func (c *DiskCache) doOpen() error { switch filepath.Base(path) { case ".lock", ".pos": // ignore them case "data": // not rotated writing file, do not count on sizeVec. - c.size.Add(fi.Size()) - // NOTE: c.size not always equal to sizeVec. c.size used to limit - // total bytes used for Put(), but sizeVec used to count size that - // waiting to be Get().
default: c.size.Add(fi.Size()) sizeVec.WithLabelValues(c.path).Add(float64(fi.Size())) @@ -148,6 +158,8 @@ func (c *DiskCache) doOpen() error { } sort.Strings(c.dataFiles) // make file-name sorted for FIFO Get() + l.Infof("on open loaded %d files", len(c.dataFiles)) + datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles))) // first get, try load .pos if !c.noPos { diff --git a/diskcache/options.go b/diskcache/options.go index fb922f7c..ba7d2c42 100644 --- a/diskcache/options.go +++ b/diskcache/options.go @@ -67,6 +67,24 @@ func WithNoPos(on bool) CacheOption { } } +// WithPosUpdate sets .pos update intervals. +// +// cnt used to specify how many updates on .pos trigger a real disk update. +// We can set cnt = 0 to force update .pos on every Get action. +// +// du used to specify how often to trigger a real disk update on file .pos. +func WithPosUpdate(cnt int, du time.Duration) CacheOption { + return func(c *DiskCache) { + if cnt >= 0 { + c.pos.dumpCount = cnt + } + + if du >= 0 { + c.pos.dumpInterval = du + } + } +} + // WithWakeup set duration on wakeup(default 3s), this wakeup time // used to shift current-writing-file to ready-to-reading-file.
// diff --git a/diskcache/pos.go b/diskcache/pos.go index a2bf768a..e86dd4d4 100644 --- a/diskcache/pos.go +++ b/diskcache/pos.go @@ -12,12 +12,18 @@ import ( "fmt" "os" "path/filepath" + "time" ) type pos struct { Seek int64 `json:"seek"` Name []byte `json:"name"` + cnt, + dumpCount int + dumpInterval time.Duration + lastDump time.Time + fd *os.File fname string // where to dump the binary data buf *bytes.Buffer // reused buffer to build the binary data @@ -108,10 +114,10 @@ func (p *pos) reset() error { p.Seek = -1 p.Name = nil - return p.dumpFile() + return p.doDumpFile() } -func (p *pos) dumpFile() error { +func (p *pos) doDumpFile() error { if p.fd == nil { if fd, err := os.OpenFile(p.fname, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600); err != nil { return fmt.Errorf("open pos file(%q) failed: %w", p.fname, err) @@ -139,6 +145,27 @@ func (p *pos) dumpFile() error { } } +func (p *pos) dumpFile() (bool, error) { + if p.dumpCount == 0 { // force dump .pos on every Get action. + return true, p.doDumpFile() + } + + p.cnt++ + if p.cnt%p.dumpCount == 0 { + p.lastDump = time.Now() + return true, p.doDumpFile() + } + + if p.dumpInterval > 0 { + if time.Since(p.lastDump) >= p.dumpInterval { + p.lastDump = time.Now() + return true, p.doDumpFile() + } + } + + return false, nil +} + // for benchmark. 
func (p *pos) dumpJSON() ([]byte, error) { j, err := json.Marshal(p) diff --git a/diskcache/pos_test.go b/diskcache/pos_test.go index 3ac62f7d..74236a57 100644 --- a/diskcache/pos_test.go +++ b/diskcache/pos_test.go @@ -8,6 +8,7 @@ package diskcache import ( "fmt" T "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -16,7 +17,7 @@ func TestDump(t *T.T) { t.Run(`dump-undump`, func(t *T.T) { p := &pos{ Seek: 1024 * 1024 * 1024, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), + Name: fmt.Appendf(nil, "data.%032d", 1234), } data, err := p.MarshalBinary() @@ -29,7 +30,7 @@ func TestDump(t *T.T) { assert.NoError(t, p2.UnmarshalBinary(data)) assert.Equal(t, int64(1024*1024*1024), p2.Seek) - assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name) + assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name) t.Logf("pos: %s", p) }) @@ -37,7 +38,7 @@ func TestDump(t *T.T) { t.Run(`seek--1`, func(t *T.T) { p := &pos{ Seek: -1, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), + Name: fmt.Appendf(nil, "data.%032d", 1234), } data, err := p.MarshalBinary() @@ -48,15 +49,15 @@ func TestDump(t *T.T) { var p2 pos assert.NoError(t, p2.UnmarshalBinary(data)) assert.Equal(t, int64(-1), p2.Seek) - assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name) + assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name) t.Logf("pos: %s", p) }) t.Run(`seek-0`, func(t *T.T) { p := &pos{ + Name: fmt.Appendf(nil, "data.%032d", 1234), Seek: 0, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), } data, err := p.MarshalBinary() @@ -68,7 +69,7 @@ func TestDump(t *T.T) { assert.NoError(t, p2.UnmarshalBinary(data)) assert.Equal(t, int64(0), p2.Seek) - assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name) + assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name) t.Logf("pos: %s", p) }) @@ -77,7 +78,7 @@ func TestDump(t *T.T) { func BenchmarkPosDump(b *T.B) { p := pos{ Seek: 1024 * 1024 * 1024, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), + Name: 
fmt.Appendf(nil, "data.%032d", 1234), } b.Run("binary", func(b *T.B) { @@ -91,4 +92,29 @@ func BenchmarkPosDump(b *T.B) { p.dumpJSON() } }) + + b.Run("force-dump", func(b *T.B) { + p := pos{ + Seek: 1024 * 1024 * 1024, + Name: fmt.Appendf(nil, "data.%032d", 1234), + dumpCount: 0, + } + + for i := 0; i < b.N; i++ { + p.dumpFile() + } + }) + + b.Run("interval-dump", func(b *T.B) { + p := pos{ + Seek: 1024 * 1024 * 1024, + Name: fmt.Appendf(nil, "data.%032d", 1234), + dumpCount: 100, + dumpInterval: 100 * time.Millisecond, + } + + for i := 0; i < b.N; i++ { + p.dumpFile() + } + }) } diff --git a/diskcache/rotate.go b/diskcache/rotate.go index 75d1968d..5bce9065 100644 --- a/diskcache/rotate.go +++ b/diskcache/rotate.go @@ -73,7 +73,7 @@ func (c *DiskCache) rotate() error { return fmt.Errorf("rotate on Rename(%q, %q): %w", c.curWriteFile, newfile, err) } - // new file added, plus it's size to cache size + // new file added, add its size to cache size if fi, err := os.Stat(newfile); err == nil { if fi.Size() > dataHeaderLen { c.size.Add(fi.Size()) diff --git a/diskcache/switch.go b/diskcache/switch.go index 3bf12347..97e4128c 100644 --- a/diskcache/switch.go +++ b/diskcache/switch.go @@ -93,7 +93,7 @@ func (c *DiskCache) doSwitchNextFile() error { if !c.noPos { c.pos.Name = []byte(c.curReadfile) c.pos.Seek = 0 - if err := c.pos.dumpFile(); err != nil { + if err := c.pos.doDumpFile(); err != nil { return err }