From ead44804c6b38a9cb44371062529aeeaf84be562 Mon Sep 17 00:00:00 2001 From: coanor Date: Thu, 8 Jan 2026 18:53:30 +0800 Subject: [PATCH 1/8] feat: add logging to diskcache --- diskcache/diskcache.go | 5 +++++ diskcache/get.go | 12 ++++++++++++ diskcache/open.go | 10 ++++++++++ 3 files changed, 27 insertions(+) diff --git a/diskcache/diskcache.go b/diskcache/diskcache.go index 181cde2b..9fd067e2 100644 --- a/diskcache/diskcache.go +++ b/diskcache/diskcache.go @@ -24,6 +24,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/GuanceCloud/cliutils/logger" ) const ( @@ -57,6 +59,9 @@ var ( // Invalid file header. ErrBadHeader = errors.New("bad header") + + l = logger.DefaultSLogger("diskcache") + once sync.Once ) // DiskCache is the representation of a disk cache. diff --git a/diskcache/get.go b/diskcache/get.go index e14ac60e..c5147ac8 100644 --- a/diskcache/get.go +++ b/diskcache/get.go @@ -31,6 +31,8 @@ func (c *DiskCache) skipBadFile() error { droppedDataVec.WithLabelValues(c.path, reasonBadDataFile).Observe(float64(c.curReadSize)) }() + l.Warnf("skip bad file %s with size %d bytes", c.curReadfile, c.curReadSize) + return c.switchNextFile() } @@ -98,6 +100,14 @@ retry: } if n, err = c.rfd.Read(c.batchHeader); err != nil || n != dataHeaderLen { + if err != nil && err != io.EOF { + l.Errorf("read %d bytes header error: %s", dataHeaderLen, err.Error()) + } + + if n > 0 && n != dataHeaderLen { + l.Errorf("invalid header length: %d, expect %d", n, dataHeaderLen) + } + // On bad datafile, just ignore and delete the file. if err = c.skipBadFile(); err != nil { return err @@ -134,6 +144,8 @@ retry: return fmt.Errorf("rfd.Seek(%d): %w", nbytes, err) } + l.Warnf("got %d bytes to read into buffer with length %d", nbytes, len(readbuf)) + droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes)) return ErrTooSmallReadBuf } diff --git a/diskcache/open.go b/diskcache/open.go index 06c6fd5c..2571abda 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -13,10 +13,20 @@ import ( "strconv" "sync" "time" + + "github.com/GuanceCloud/cliutils/logger" ) +func setupLogger() { + once.Do(func() { + l = logger.SLogger("diskcache") + }) +} + // Open init and create a new disk cache. We can set other options with various options. 
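+// Note: the package logger is initialized only once, for the first Open call (see setupLogger above).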
func Open(opts ...CacheOption) (*DiskCache, error) { + setupLogger() + c := defaultInstance() // apply extra options From 040795bb2100b3188f571af641b822f446401231 Mon Sep 17 00:00:00 2001 From: coanor Date: Fri, 9 Jan 2026 12:03:27 +0800 Subject: [PATCH 2/8] add more logs --- diskcache/diskcache.go | 3 ++- diskcache/get.go | 19 +++++++++++-------- diskcache/metric.go | 2 +- diskcache/open.go | 6 ++---- diskcache/rotate.go | 2 +- 5 files changed, 17 insertions(+), 15 deletions(-) diff --git a/diskcache/diskcache.go b/diskcache/diskcache.go index 9fd067e2..9deb0624 100644 --- a/diskcache/diskcache.go +++ b/diskcache/diskcache.go @@ -94,7 +94,8 @@ type DiskCache struct { pos *pos // current read fd position info // specs of current diskcache - size atomic.Int64 // current byte size + size atomic.Int64 // current byte size + curBatchSize, // current writing file's size curReadSize, // current reading file's size batchSize, // current batch size(static) diff --git a/diskcache/get.go b/diskcache/get.go index c5147ac8..7f406a34 100644 --- a/diskcache/get.go +++ b/diskcache/get.go @@ -7,6 +7,7 @@ package diskcache import ( "encoding/binary" + "errors" "fmt" "io" "time" @@ -82,9 +83,10 @@ func (c *DiskCache) doGet(buf []byte, fn Fn, bfn BufFunc) error { if err = func() error { c.wlock.Lock() defer c.wlock.Unlock() + return c.rotate() }(); err != nil { - return err + return fmt.Errorf("wakeup error: %w", err) } } @@ -100,7 +102,7 @@ retry: } if n, err = c.rfd.Read(c.batchHeader); err != nil || n != dataHeaderLen { - if err != nil && err != io.EOF { + if err != nil && !errors.Is(err, io.EOF) { l.Errorf("read %d bytes header error: %s", dataHeaderLen, err.Error()) } @@ -140,14 +142,15 @@ retry: if len(readbuf) < nbytes { // seek to next read position - if _, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil { + if x, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil { return fmt.Errorf("rfd.Seek(%d): %w", nbytes, err) - } + } else { + l.Warnf("got %d bytes to buffer with len %d, seek to new read position %d, drop %d bytes within file %s", + nbytes, len(readbuf), x, nbytes, c.curReadfile) - l.Warnf("got %d bytes to read into buffer with length %d", nbytes, len(readbuf)) - - droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes)) - return ErrTooSmallReadBuf + droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes)) + return ErrTooSmallReadBuf + } } if n, err := c.rfd.Read(readbuf[:nbytes]); err != nil { diff --git a/diskcache/metric.go b/diskcache/metric.go index 2d7688b6..e13a192b 100644 --- a/diskcache/metric.go +++ b/diskcache/metric.go @@ -181,7 +181,7 @@ func setupMetrics() { prometheus.GaugeOpts{ Namespace: ns, Name: "size", - Help: "Current cache size that waiting to be consumed(get)", + Help: "Current cache size that waiting to be consumed(get). The size include header bytes", }, []string{"path"}, ) diff --git a/diskcache/open.go b/diskcache/open.go index 2571abda..c74d2493 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -142,10 +142,6 @@ func (c *DiskCache) doOpen() error { switch filepath.Base(path) { case ".lock", ".pos": // ignore them case "data": // not rotated writing file, do not count on sizeVec. - c.size.Add(fi.Size()) - // NOTE: c.size not always equal to sizeVec. c.size used to limit - // total bytes used for Put(), but sizeVec used to count size that - // waiting to be Get(). 
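+ // size of the not-yet-rotated write file is accounted at rotate time instead (see rotate.go)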
default: c.size.Add(fi.Size()) sizeVec.WithLabelValues(c.path).Add(float64(fi.Size())) @@ -158,6 +154,8 @@ func (c *DiskCache) doOpen() error { } sort.Strings(c.dataFiles) // make file-name sorted for FIFO Get() + l.Infof("on open loaded %d files", len(c.dataFiles)) + datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles))) // first get, try load .pos if !c.noPos { diff --git a/diskcache/rotate.go b/diskcache/rotate.go index 75d1968d..5bce9065 100644 --- a/diskcache/rotate.go +++ b/diskcache/rotate.go @@ -73,7 +73,7 @@ func (c *DiskCache) rotate() error { return fmt.Errorf("rotate on Rename(%q, %q): %w", c.curWriteFile, newfile, err) } - // new file added, plus it's size to cache size + // new file added, add it's size to cache size if fi, err := os.Stat(newfile); err == nil { if fi.Size() > dataHeaderLen { c.size.Add(fi.Size()) From 07aec9ce724997a4b3ac8cb4a71002a600514ec1 Mon Sep 17 00:00:00 2001 From: coanor Date: Fri, 9 Jan 2026 18:33:13 +0800 Subject: [PATCH 3/8] feat: add .pos sync interval/duration --- diskcache/envs.go | 13 +++++ diskcache/get.go | 6 +-- diskcache/get_test.go | 110 +++++++++++++++++++++++++++++++++++++++++- diskcache/open.go | 4 ++ diskcache/options.go | 18 +++++++ diskcache/pos.go | 32 +++++++++++- diskcache/pos_test.go | 40 ++++++++++++--- diskcache/switch.go | 2 +- 8 files changed, 211 insertions(+), 14 deletions(-) diff --git a/diskcache/envs.go b/diskcache/envs.go index e6356e42..a0cab378 100644 --- a/diskcache/envs.go +++ b/diskcache/envs.go @@ -8,6 +8,7 @@ package diskcache import ( "os" "strconv" + "time" ) func (c *DiskCache) syncEnv() { @@ -37,6 +38,18 @@ func (c *DiskCache) syncEnv() { c.noPos = true } + if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_INTERVAL"); ok && v != "" { + if du, err := time.ParseDuration(v); err == nil && du > 0 { + c.pos.dumpTick = time.NewTicker(du) + } + } + + if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_AT"); ok && v != "" { + if n, err := strconv.ParseInt(v, 10, 64); err == nil && n > 0 { + c.pos.dumpCount = int(n) + } + } + if v, ok := os.LookupEnv("ENV_DISKCACHE_NO_LOCK"); ok && v != "" { c.noLock = true } diff --git a/diskcache/get.go b/diskcache/get.go index 7f406a34..7de2b028 100644 --- a/diskcache/get.go +++ b/diskcache/get.go @@ -179,11 +179,11 @@ __updatePos: // update seek position if !c.noPos && nbytes > 0 { c.pos.Seek += int64(dataHeaderLen + nbytes) - if derr := c.pos.dumpFile(); derr != nil { + if do, derr := c.pos.dumpFile(); derr != nil { return derr + } else if do { + posUpdatedVec.WithLabelValues("get", c.path).Inc() } - - posUpdatedVec.WithLabelValues("get", c.path).Inc() } __end: diff --git a/diskcache/get_test.go b/diskcache/get_test.go index c046f157..55a8d184 100644 --- a/diskcache/get_test.go +++ b/diskcache/get_test.go @@ -463,7 +463,10 @@ func TestPutGet(t *T.T) { testData := []byte("0123456789") ndata := 10 - c, err := Open(WithPath(p)) + c, err := Open( + WithPath(p), + WithPosUpdate(0, 0), + ) assert.NoError(t, err) @@ -487,6 +490,7 @@ func TestPutGet(t *T.T) { // reopen the cache c2, err := Open(WithPath(p), + WithPosUpdate(0, 0), WithCapacity(int64(len(testData)*10)), WithBatchSize(int64(len(testData)*2))) require.NoError(t, err, "get error: %s", err) @@ -517,4 +521,108 @@ func TestPutGet(t *T.T) { os.RemoveAll(p) }) }) + + t.Run("pos-sync-at", func(t *T.T) { + ResetMetrics() + + p := t.TempDir() + c, err := Open(WithPath(p), WithPosUpdate(3, 0)) + assert.NoError(t, err) + + testData := []byte("0123456789") + ndata := 10 + + for i := 0; i < ndata; i++ { // write 10 data + 
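+ // each Put writes a dataHeaderLen-byte length header followed by the payload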
require.NoError(t, c.Put(testData), "cache: %s", c)
+ }
+
+ // make data file readable.
+ require.NoError(t, c.rotate())
+
+ // create n read pos
+ for i := 0; i < 6; i++ {
+ assert.NoError(t, c.Get(func(data []byte) error {
+ assert.Len(t, data, len(testData))
+ return nil
+ }))
+ }
+
+ assert.Equal(t, 6*int64(len(testData)+dataHeaderLen), c.pos.Seek)
+ assert.NoError(t, c.Close())
+
+ _, err = os.Stat(c.pos.fname)
+ require.NoError(t, err)
+
+ reg := prometheus.NewRegistry()
+ reg.MustRegister(Metrics()...)
+ mfs, err := reg.Gather()
+ require.NoError(t, err)
+
+ assert.Equalf(t, float64(2),
+ metrics.GetMetricOnLabels(mfs,
+ "diskcache_pos_updated_total",
+ "get",
+ c.path,
+ ).GetCounter().GetValue(),
+ "got metrics\n%s", metrics.MetricFamily2Text(mfs),
+ )
+
+ t.Cleanup(func() {
+ c.Close()
+ os.RemoveAll(p)
+ })
+ })
+
+ t.Run("pos-sync-on-interval", func(t *T.T) {
+ ResetMetrics()
+
+ p := t.TempDir()
+ c, err := Open(WithPath(p), WithPosUpdate(10000, time.Duration(time.Millisecond)))
+ assert.NoError(t, err)
+
+ testData := []byte("0123456789")
+ ndata := 10
+
+ for i := 0; i < ndata; i++ { // write 10 data
+ require.NoError(t, c.Put(testData), "cache: %s", c)
+ }
+
+ // make data file readable.
+ require.NoError(t, c.rotate())
+
+ // create n read pos
+ for i := 0; i < 3; i++ {
+ assert.NoError(t, c.Get(func(data []byte) error {
+ assert.Len(t, data, len(testData))
+ return nil
+ }))
+
+ time.Sleep(time.Millisecond)
+ }
+
+ assert.Equal(t, 3*int64(len(testData)+dataHeaderLen), c.pos.Seek)
+ assert.NoError(t, c.Close())
+
+ _, err = os.Stat(c.pos.fname)
+ require.NoError(t, err)
+
+ reg := prometheus.NewRegistry()
+ reg.MustRegister(Metrics()...)
+ mfs, err := reg.Gather()
+ require.NoError(t, err)
+
+ assert.Equalf(t, float64(3),
+ metrics.GetMetricOnLabels(mfs,
+ "diskcache_pos_updated_total",
+ "get",
+ c.path,
+ ).GetCounter().GetValue(),
+ "got metrics\n%s", metrics.MetricFamily2Text(mfs),
+ )
+
+ t.Cleanup(func() {
+ c.Close()
+ os.RemoveAll(p)
+ })
+ })
 }
diff --git a/diskcache/open.go b/diskcache/open.go
index c74d2493..05cfce0f 100644
--- a/diskcache/open.go
+++ b/diskcache/open.go
@@ -74,6 +74,10 @@ func defaultInstance() *DiskCache {
 pos: &pos{
 Seek: 0,
 Name: nil,
+
+ // dump position each 100ms or 100 update
+ dumpTick: time.NewTicker(time.Millisecond * 100),
+ dumpCount: 100,
 },
 }
 }
diff --git a/diskcache/options.go b/diskcache/options.go
index fb922f7c..7fb03d36 100644
--- a/diskcache/options.go
+++ b/diskcache/options.go
@@ -67,6 +67,24 @@ func WithNoPos(on bool) CacheOption {
 }
 }
+
+// WithPosUpdate sets how the .pos file is flushed to disk.
+//
+// cnt specifies how many position updates trigger a real disk write of .pos.
+// We can set cnt = 0 to force updating .pos on every Get action.
+//
+// du specifies how often to trigger a real disk write of the .pos file.
+func WithPosUpdate(cnt int, du time.Duration) CacheOption {
+ return func(c *DiskCache) {
+ if cnt >= 0 {
+ c.pos.dumpCount = cnt
+ }
+
+ if du > 0 {
+ c.pos.dumpTick = time.NewTicker(du)
+ }
+ }
+}
+
 // WithWakeup set duration on wakeup(default 3s), this wakeup time
 // used to shift current-writing-file to ready-to-reading-file.
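// On Get, a write file that has been idle longer than this duration is rotated so its buffered data becomes readable.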
// diff --git a/diskcache/pos.go b/diskcache/pos.go index a2bf768a..ae5d485c 100644 --- a/diskcache/pos.go +++ b/diskcache/pos.go @@ -12,12 +12,17 @@ import ( "fmt" "os" "path/filepath" + "time" ) type pos struct { Seek int64 `json:"seek"` Name []byte `json:"name"` + cnt, + dumpCount int // position update count + dumpTick *time.Ticker + fd *os.File fname string // where to dump the binary data buf *bytes.Buffer // reused buffer to build the binary data @@ -32,6 +37,10 @@ func (p *pos) close() error { p.fd = nil } + if p.dumpTick != nil { + p.dumpTick.Stop() + } + return nil } @@ -108,10 +117,10 @@ func (p *pos) reset() error { p.Seek = -1 p.Name = nil - return p.dumpFile() + return p.doDumpFile() } -func (p *pos) dumpFile() error { +func (p *pos) doDumpFile() error { if p.fd == nil { if fd, err := os.OpenFile(p.fname, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600); err != nil { return fmt.Errorf("open pos file(%q) failed: %w", p.fname, err) @@ -139,6 +148,25 @@ func (p *pos) dumpFile() error { } } +func (p *pos) dumpFile() (bool, error) { + if p.dumpCount == 0 { // force dump .pos on every Get action. + return true, p.doDumpFile() + } + + p.cnt++ + if p.cnt%p.dumpCount == 0 { + return true, p.doDumpFile() + } + + select { + case <-p.dumpTick.C: + return true, p.doDumpFile() + default: // pass + } + + return false, nil +} + // for benchmark. func (p *pos) dumpJSON() ([]byte, error) { j, err := json.Marshal(p) diff --git a/diskcache/pos_test.go b/diskcache/pos_test.go index 3ac62f7d..9b4eae79 100644 --- a/diskcache/pos_test.go +++ b/diskcache/pos_test.go @@ -8,6 +8,7 @@ package diskcache import ( "fmt" T "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -16,7 +17,7 @@ func TestDump(t *T.T) { t.Run(`dump-undump`, func(t *T.T) { p := &pos{ Seek: 1024 * 1024 * 1024, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), + Name: fmt.Appendf(nil, "data.%032d", 1234), } data, err := p.MarshalBinary() @@ -29,7 +30,7 @@ func TestDump(t *T.T) { assert.NoError(t, p2.UnmarshalBinary(data)) assert.Equal(t, int64(1024*1024*1024), p2.Seek) - assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name) + assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name) t.Logf("pos: %s", p) }) @@ -37,7 +38,7 @@ func TestDump(t *T.T) { t.Run(`seek--1`, func(t *T.T) { p := &pos{ Seek: -1, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), + Name: fmt.Appendf(nil, "data.%032d", 1234), } data, err := p.MarshalBinary() @@ -48,15 +49,15 @@ func TestDump(t *T.T) { var p2 pos assert.NoError(t, p2.UnmarshalBinary(data)) assert.Equal(t, int64(-1), p2.Seek) - assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name) + assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name) t.Logf("pos: %s", p) }) t.Run(`seek-0`, func(t *T.T) { p := &pos{ + Name: fmt.Appendf(nil, "data.%032d", 1234), Seek: 0, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), } data, err := p.MarshalBinary() @@ -68,7 +69,7 @@ func TestDump(t *T.T) { assert.NoError(t, p2.UnmarshalBinary(data)) assert.Equal(t, int64(0), p2.Seek) - assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name) + assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name) t.Logf("pos: %s", p) }) @@ -77,7 +78,7 @@ func TestDump(t *T.T) { func BenchmarkPosDump(b *T.B) { p := pos{ Seek: 1024 * 1024 * 1024, - Name: []byte(fmt.Sprintf("data.%032d", 1234)), + Name: fmt.Appendf(nil, "data.%032d", 1234), } b.Run("binary", func(b *T.B) { @@ -91,4 +92,29 @@ func BenchmarkPosDump(b *T.B) { p.dumpJSON() } }) + + b.Run("force-dump", func(b *T.B) { + p := pos{ + 
Seek: 1024 * 1024 * 1024, + Name: fmt.Appendf(nil, "data.%032d", 1234), + dumpCount: 0, + } + + for i := 0; i < b.N; i++ { + p.dumpFile() + } + }) + + b.Run("interval-dump", func(b *T.B) { + p := pos{ + Seek: 1024 * 1024 * 1024, + Name: fmt.Appendf(nil, "data.%032d", 1234), + dumpCount: 100, + dumpTick: time.NewTicker(100 * time.Millisecond), + } + + for i := 0; i < b.N; i++ { + p.dumpFile() + } + }) } diff --git a/diskcache/switch.go b/diskcache/switch.go index 3bf12347..97e4128c 100644 --- a/diskcache/switch.go +++ b/diskcache/switch.go @@ -93,7 +93,7 @@ func (c *DiskCache) doSwitchNextFile() error { if !c.noPos { c.pos.Name = []byte(c.curReadfile) c.pos.Seek = 0 - if err := c.pos.dumpFile(); err != nil { + if err := c.pos.doDumpFile(); err != nil { return err } From 1c95c7ad65e486120e64482f98f5c0b8939f2482 Mon Sep 17 00:00:00 2001 From: coanor Date: Fri, 9 Jan 2026 19:46:00 +0800 Subject: [PATCH 4/8] save --- diskcache/envs.go | 2 +- diskcache/get_test.go | 57 +++++++++++++++++++++++++++++++++++++++++-- diskcache/open.go | 4 +-- diskcache/options.go | 4 +-- diskcache/pos.go | 19 +++++++-------- diskcache/pos_test.go | 8 +++--- 6 files changed, 73 insertions(+), 21 deletions(-) diff --git a/diskcache/envs.go b/diskcache/envs.go index a0cab378..a175df30 100644 --- a/diskcache/envs.go +++ b/diskcache/envs.go @@ -40,7 +40,7 @@ func (c *DiskCache) syncEnv() { if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_INTERVAL"); ok && v != "" { if du, err := time.ParseDuration(v); err == nil && du > 0 { - c.pos.dumpTick = time.NewTicker(du) + c.pos.dumpInterval = du } } diff --git a/diskcache/get_test.go b/diskcache/get_test.go index 55a8d184..a62e35f0 100644 --- a/diskcache/get_test.go +++ b/diskcache/get_test.go @@ -521,7 +521,9 @@ func TestPutGet(t *T.T) { os.RemoveAll(p) }) }) +} +func TestDelayPosDump(t *T.T) { t.Run("pos-sync-at", func(t *T.T) { ResetMetrics() @@ -577,7 +579,7 @@ func TestPutGet(t *T.T) { ResetMetrics() p := t.TempDir() - c, err := Open(WithPath(p), WithPosUpdate(10000, time.Duration(time.Millisecond))) + c, err := Open(WithPath(p), WithPosUpdate(-1, time.Millisecond*100)) assert.NoError(t, err) testData := []byte("0123456789") @@ -597,7 +599,58 @@ func TestPutGet(t *T.T) { return nil })) - time.Sleep(time.Millisecond) + time.Sleep(100 * time.Millisecond) + } + + assert.Equal(t, 3*int64(len(testData)+dataHeaderLen), c.pos.Seek) + assert.NoError(t, c.Close()) + + _, err = os.Stat(c.pos.fname) + require.NoError(t, err) + + reg := prometheus.NewRegistry() + reg.MustRegister(Metrics()...) + mfs, err := reg.Gather() + require.NoError(t, err) + + assert.Equalf(t, float64(3), + metrics.GetMetricOnLabels(mfs, + "diskcache_pos_updated_total", + "get", + c.path, + ).GetCounter().GetValue(), + "got metrics\n%s", metrics.MetricFamily2Text(mfs), + ) + + t.Cleanup(func() { + c.Close() + os.RemoveAll(p) + }) + }) + + t.Run("pos-sync-force", func(t *T.T) { + ResetMetrics() + + p := t.TempDir() + c, err := Open(WithPath(p), WithPosUpdate(0, time.Millisecond*100)) + assert.NoError(t, err) + + testData := []byte("0123456789") + ndata := 10 + + for i := 0; i < ndata; i++ { // write 10 data + require.NoError(t, c.Put(testData), "cache: %s", c) + } + + // make data file readable. 
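+ // rotate() appends an EOF marker to the write file and renames it (data -> data.NNNN) so Get can consume it.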
+ require.NoError(t, c.rotate()) + + // create n read pos + for i := 0; i < 3; i++ { + assert.NoError(t, c.Get(func(data []byte) error { + assert.Len(t, data, len(testData)) + return nil + })) } assert.Equal(t, 3*int64(len(testData)+dataHeaderLen), c.pos.Seek) diff --git a/diskcache/open.go b/diskcache/open.go index 05cfce0f..dff564fa 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -76,8 +76,8 @@ func defaultInstance() *DiskCache { Name: nil, // dump position each 100ms or 100 update - dumpTick: time.NewTicker(time.Millisecond * 100), - dumpCount: 100, + dumpInterval: time.Millisecond * 100, + dumpCount: 100, }, } } diff --git a/diskcache/options.go b/diskcache/options.go index 7fb03d36..ba7d2c42 100644 --- a/diskcache/options.go +++ b/diskcache/options.go @@ -79,8 +79,8 @@ func WithPosUpdate(cnt int, du time.Duration) CacheOption { c.pos.dumpCount = cnt } - if du > 0 { - c.pos.dumpTick = time.NewTicker(du) + if du >= 0 { + c.pos.dumpInterval = du } } } diff --git a/diskcache/pos.go b/diskcache/pos.go index ae5d485c..e86dd4d4 100644 --- a/diskcache/pos.go +++ b/diskcache/pos.go @@ -20,8 +20,9 @@ type pos struct { Name []byte `json:"name"` cnt, - dumpCount int // position update count - dumpTick *time.Ticker + dumpCount int + dumpInterval time.Duration + lastDump time.Time fd *os.File fname string // where to dump the binary data @@ -37,10 +38,6 @@ func (p *pos) close() error { p.fd = nil } - if p.dumpTick != nil { - p.dumpTick.Stop() - } - return nil } @@ -155,13 +152,15 @@ func (p *pos) dumpFile() (bool, error) { p.cnt++ if p.cnt%p.dumpCount == 0 { + p.lastDump = time.Now() return true, p.doDumpFile() } - select { - case <-p.dumpTick.C: - return true, p.doDumpFile() - default: // pass + if p.dumpInterval > 0 { + if time.Since(p.lastDump) >= p.dumpInterval { + p.lastDump = time.Now() + return true, p.doDumpFile() + } } return false, nil diff --git a/diskcache/pos_test.go b/diskcache/pos_test.go index 9b4eae79..74236a57 100644 --- a/diskcache/pos_test.go +++ b/diskcache/pos_test.go @@ -107,10 +107,10 @@ func BenchmarkPosDump(b *T.B) { b.Run("interval-dump", func(b *T.B) { p := pos{ - Seek: 1024 * 1024 * 1024, - Name: fmt.Appendf(nil, "data.%032d", 1234), - dumpCount: 100, - dumpTick: time.NewTicker(100 * time.Millisecond), + Seek: 1024 * 1024 * 1024, + Name: fmt.Appendf(nil, "data.%032d", 1234), + dumpCount: 100, + dumpInterval: 100 * time.Millisecond, } for i := 0; i < b.N; i++ { From b99026584a33b66a0af0540d9c88a2db0c7f3a22 Mon Sep 17 00:00:00 2001 From: coanor Date: Mon, 12 Jan 2026 10:54:07 +0800 Subject: [PATCH 5/8] feat: enhance error propagation with context and structured information - Add CacheError struct with operation context, file paths, and detailed information - Implement comprehensive error wrapping for all diskcache operations - Add IsRetryable() function for intelligent error recovery decisions - Include GetErrorContext() for structured error analysis and debugging - Update error handling across all modules: put, get, rotate, open, switch, pos, lock, drop - Preserve backward compatibility with existing error types via unwrapping - Add comprehensive test suite for enhanced error scenarios - Improve debugging with caller information and operation-specific details Key improvements: - Errors now include operation type (Put, Get, Rotate, etc.) 
- File and path context for better issue localization - Detailed error messages with sizes, positions, and state information - Retryable error detection for robust error handling - Structured error context for programmatic analysis --- diskcache/drop.go | 9 +- diskcache/errors.go | 248 ++++++++++++++++++++++++++++++ diskcache/errors_test.go | 318 +++++++++++++++++++++++++++++++++++++++ diskcache/get.go | 46 ++++-- diskcache/lock.go | 25 ++- diskcache/open.go | 24 +-- diskcache/pos.go | 28 ++-- diskcache/put.go | 46 ++++-- diskcache/rotate.go | 29 +++- diskcache/switch.go | 30 ++-- 10 files changed, 728 insertions(+), 75 deletions(-) create mode 100644 diskcache/errors.go create mode 100644 diskcache/errors_test.go diff --git a/diskcache/drop.go b/diskcache/drop.go index 95a4a4b3..b3659fb6 100644 --- a/diskcache/drop.go +++ b/diskcache/drop.go @@ -28,7 +28,8 @@ func (c *DiskCache) dropBatch() error { if c.rfd != nil && c.curReadfile == fname { if err := c.rfd.Close(); err != nil { - return err + return WrapFileOperationError(OpClose, err, c.path, fname). + WithDetails("failed_to_close_read_file_during_drop") } c.rfd = nil @@ -36,7 +37,8 @@ func (c *DiskCache) dropBatch() error { if fi, err := os.Stat(fname); err == nil { if err := os.Remove(fname); err != nil { - return err + return WrapFileOperationError(OpRemove, err, c.path, fname). + WithDetails("failed_to_remove_file_during_drop") } c.size.Add(-fi.Size()) @@ -45,6 +47,9 @@ func (c *DiskCache) dropBatch() error { droppedDataVec.WithLabelValues(c.path, reasonExceedCapacity).Observe(float64(fi.Size())) datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles))) sizeVec.WithLabelValues(c.path).Sub(float64(fi.Size())) + } else { + return WrapFileOperationError(OpStat, err, c.path, fname). + WithDetails("failed_to_stat_file_during_drop") } return nil diff --git a/diskcache/errors.go b/diskcache/errors.go new file mode 100644 index 00000000..ea8f98d7 --- /dev/null +++ b/diskcache/errors.go @@ -0,0 +1,248 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. 
+ +package diskcache + +import ( + "fmt" + "runtime" + "strings" +) + +// Operation type for error context +type Operation string + +const ( + OpOpen Operation = "Open" + OpClose Operation = "Close" + OpPut Operation = "Put" + OpStreamPut Operation = "StreamPut" + OpGet Operation = "Get" + OpRotate Operation = "Rotate" + OpSwitch Operation = "Switch" + OpDrop Operation = "Drop" + OpLock Operation = "Lock" + OpUnlock Operation = "Unlock" + OpPos Operation = "Pos" + OpSeek Operation = "Seek" + OpWrite Operation = "Write" + OpRead Operation = "Read" + OpSync Operation = "Sync" + OpCreate Operation = "Create" + OpRemove Operation = "Remove" + OpRename Operation = "Rename" + OpStat Operation = "Stat" +) + +// CacheError represents an enhanced error with operation context and details +type CacheError struct { + Operation Operation + Path string + File string + Details string + Err error + Caller string +} + +// Error implements the error interface +func (e *CacheError) Error() string { + var parts []string + + parts = append(parts, string(e.Operation)) + + if e.Path != "" { + parts = append(parts, fmt.Sprintf("path=%s", e.Path)) + } + + if e.File != "" { + parts = append(parts, fmt.Sprintf("file=%s", e.File)) + } + + if e.Details != "" { + parts = append(parts, e.Details) + } + + base := fmt.Sprintf("diskcache %s: %s", strings.Join(parts, " "), e.Err) + + if e.Caller != "" { + return fmt.Sprintf("%s (caller: %s)", base, e.Caller) + } + + return base +} + +// Unwrap returns the underlying error for compatibility with errors.Is/As +func (e *CacheError) Unwrap() error { + return e.Err +} + +// NewCacheError creates a new CacheError with enhanced context +func NewCacheError(op Operation, err error, details string) *CacheError { + return &CacheError{ + Operation: op, + Err: err, + Caller: getCaller(), + Details: details, + } +} + +// WithPath adds path context to the error +func (e *CacheError) WithPath(path string) *CacheError { + e.Path = path + return e +} + +// WithFile adds file context to the error +func (e *CacheError) WithFile(file string) *CacheError { + e.File = file + return e +} + +// WithDetails adds additional details to the error +func (e *CacheError) WithDetails(details string) *CacheError { + if e.Details != "" { + e.Details = fmt.Sprintf("%s: %s", e.Details, details) + } else { + e.Details = details + } + return e +} + +// getCaller returns the calling function name for debugging +func getCaller() string { + _, file, line, ok := runtime.Caller(2) + if !ok { + return "" + } + + // Extract just the function name from the full path + parts := strings.Split(file, "/") + if len(parts) > 0 { + file = parts[len(parts)-1] + } + + return fmt.Sprintf("%s:%d", file, line) +} + +// Helper functions for creating specific error types + +// WrapPutError wraps errors from Put operations +func WrapPutError(err error, path string, dataSize int) *CacheError { + return NewCacheError(OpPut, err, fmt.Sprintf("data_size=%d", dataSize)).WithPath(path) +} + +// WrapGetError wraps errors from Get operations +func WrapGetError(err error, path string, file string) *CacheError { + return NewCacheError(OpGet, err, "").WithPath(path).WithFile(file) +} + +// WrapRotateError wraps errors from Rotate operations +func WrapRotateError(err error, path string, oldFile, newFile string) *CacheError { + details := fmt.Sprintf("old=%s -> new=%s", oldFile, newFile) + return NewCacheError(OpRotate, err, details).WithPath(path) +} + +// WrapOpenError wraps errors from Open operations +func WrapOpenError(err error, path string) 
*CacheError { + return NewCacheError(OpOpen, err, "").WithPath(path) +} + +// WrapCloseError wraps errors from Close operations +func WrapCloseError(err error, path string, fdType string) *CacheError { + return NewCacheError(OpClose, err, fmt.Sprintf("fd_type=%s", fdType)).WithPath(path) +} + +// WrapLockError wraps errors from locking operations +func WrapLockError(err error, path string, pid int) *CacheError { + return NewCacheError(OpLock, err, fmt.Sprintf("pid=%d", pid)).WithPath(path) +} + +// WrapPosError wraps errors from position operations +func WrapPosError(err error, path string, seek int64) *CacheError { + return NewCacheError(OpPos, err, fmt.Sprintf("seek=%d", seek)).WithPath(path) +} + +// WrapFileOperationError wraps errors from generic file operations +func WrapFileOperationError(op Operation, err error, path, file string) *CacheError { + return NewCacheError(op, err, "").WithPath(path).WithFile(file) +} + +// IsRetryable checks if an error is retryable based on its type and context +func IsRetryable(err error) bool { + if err == nil { + return false + } + + var cacheErr *CacheError + if !isCacheError(err, &cacheErr) { + // For non-CacheError types, check known retryable patterns + return isTemporaryError(err) + } + + switch cacheErr.Operation { + case OpWrite, OpRead, OpSync, OpSeek: + return isTemporaryError(cacheErr.Err) + case OpLock: + // Lock errors might be retryable if they're "locked by another process" + return strings.Contains(cacheErr.Err.Error(), "locked by") + default: + return false + } +} + +// isCacheError checks if error is of type CacheError +func isCacheError(err error, target **CacheError) bool { + // Use type assertion instead of errors.As for direct check + if ce, ok := err.(*CacheError); ok { + *target = ce + return true + } + return false +} + +// isTemporaryError checks if an underlying error is temporary/retryable +func isTemporaryError(err error) bool { + errStr := err.Error() + temporaryPatterns := []string{ + "resource temporarily unavailable", + "connection refused", + "timeout", + "network is unreachable", + "no space left on device", // This might be temporary if space gets freed + } + + for _, pattern := range temporaryPatterns { + if strings.Contains(strings.ToLower(errStr), pattern) { + return true + } + } + + return false +} + +// GetErrorContext extracts useful context information from errors +func GetErrorContext(err error) map[string]interface{} { + context := make(map[string]interface{}) + + if err == nil { + return context + } + + var cacheErr *CacheError + if isCacheError(err, &cacheErr) { + context["operation"] = string(cacheErr.Operation) + context["path"] = cacheErr.Path + context["file"] = cacheErr.File + context["details"] = cacheErr.Details + context["caller"] = cacheErr.Caller + context["original_error"] = cacheErr.Err.Error() + context["retryable"] = IsRetryable(err) + } else { + context["original_error"] = err.Error() + context["retryable"] = IsRetryable(err) + } + + return context +} diff --git a/diskcache/errors_test.go b/diskcache/errors_test.go new file mode 100644 index 00000000..f322a34b --- /dev/null +++ b/diskcache/errors_test.go @@ -0,0 +1,318 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. 
+ +package diskcache + +import ( + "errors" + "strings" + "testing" +) + +func TestCacheError_Error(t *testing.T) { + tests := []struct { + name string + err *CacheError + contains []string + }{ + { + name: "basic error", + err: &CacheError{ + Operation: OpPut, + Path: "/tmp/cache", + Err: errors.New("disk full"), + }, + contains: []string{"Put", "path=/tmp/cache", "disk full"}, + }, + { + name: "error with file and details", + err: &CacheError{ + Operation: OpRead, + Path: "/tmp/cache", + File: "data.0001", + Details: "header corrupted", + Err: errors.New("bad header"), + }, + contains: []string{"Read", "path=/tmp/cache", "file=data.0001", "header corrupted", "bad header"}, + }, + { + name: "error with caller", + err: &CacheError{ + Operation: OpRotate, + Path: "/tmp/cache", + Err: errors.New("permission denied"), + Caller: "rotate.go:123", + }, + contains: []string{"Rotate", "path=/tmp/cache", "permission denied", "caller: rotate.go:123"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + errStr := tt.err.Error() + for _, contain := range tt.contains { + if !strings.Contains(errStr, contain) { + t.Errorf("Error string should contain %q, got: %s", contain, errStr) + } + } + }) + } +} + +func TestCacheError_Unwrap(t *testing.T) { + originalErr := errors.New("original error") + cacheErr := &CacheError{ + Operation: OpPut, + Err: originalErr, + } + + if !errors.Is(cacheErr, originalErr) { + t.Error("CacheError should unwrap to original error") + } +} + +func TestNewCacheError(t *testing.T) { + err := errors.New("test error") + cacheErr := NewCacheError(OpGet, err, "test details") + + if cacheErr.Operation != OpGet { + t.Errorf("Expected operation %v, got %v", OpGet, cacheErr.Operation) + } + if cacheErr.Err != err { + t.Errorf("Expected error %v, got %v", err, cacheErr.Err) + } + if cacheErr.Details != "test details" { + t.Errorf("Expected details %q, got %q", "test details", cacheErr.Details) + } + if cacheErr.Caller == "" { + t.Error("Caller should be set automatically") + } +} + +func TestCacheError_WithMethods(t *testing.T) { + err := errors.New("test error") + cacheErr := NewCacheError(OpPut, err, "") + + // Test WithPath + cacheErr = cacheErr.WithPath("/test/path") + if cacheErr.Path != "/test/path" { + t.Errorf("Expected path %q, got %q", "/test/path", cacheErr.Path) + } + + // Test WithFile + cacheErr = cacheErr.WithFile("test.data") + if cacheErr.File != "test.data" { + t.Errorf("Expected file %q, got %q", "test.data", cacheErr.File) + } + + // Test WithDetails + cacheErr = cacheErr.WithDetails("additional info") + if cacheErr.Details != "additional info" { + t.Errorf("Expected details %q, got %q", "additional info", cacheErr.Details) + } + + // Test WithDetails with existing details + cacheErr = cacheErr.WithDetails("more info") + expected := "additional info: more info" + if cacheErr.Details != expected { + t.Errorf("Expected details %q, got %q", expected, cacheErr.Details) + } +} + +func TestWrapFunctions(t *testing.T) { + baseErr := errors.New("base error") + + t.Run("WrapPutError", func(t *testing.T) { + err := WrapPutError(baseErr, "/path", 1024) + if err.Operation != OpPut { + t.Error("Operation should be Put") + } + if err.Path != "/path" { + t.Error("Path should be set") + } + if !strings.Contains(err.Details, "1024") { + t.Error("Details should contain data size") + } + }) + + t.Run("WrapGetError", func(t *testing.T) { + err := WrapGetError(baseErr, "/path", "file.dat") + if err.Operation != OpGet { + t.Error("Operation should be Get") + } + 
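+ // WrapGetError attaches only Path and File context; Details is left empty.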
if err.File != "file.dat" { + t.Error("File should be set") + } + }) + + t.Run("WrapRotateError", func(t *testing.T) { + err := WrapRotateError(baseErr, "/path", "old", "new") + if err.Operation != OpRotate { + t.Error("Operation should be Rotate") + } + if !strings.Contains(err.Details, "old=old") { + t.Error("Details should contain old file") + } + }) +} + +func TestIsRetryable(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + {"nil error", nil, false}, + {"temporary error", &CacheError{Operation: OpWrite, Err: errors.New("resource temporarily unavailable")}, true}, + {"permission denied", &CacheError{Operation: OpWrite, Err: errors.New("permission denied")}, false}, + {"lock error", &CacheError{Operation: OpLock, Err: errors.New("locked by alive 1234")}, true}, + {"non-cache error temporary", errors.New("connection refused"), true}, + {"non-cache error permanent", errors.New("permission denied"), false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsRetryable(tt.err); got != tt.expected { + t.Errorf("IsRetryable(%v) = %v, want %v", tt.err, got, tt.expected) + } + }) + } +} + +func TestGetErrorContext(t *testing.T) { + t.Run("cache error", func(t *testing.T) { + originalErr := errors.New("test error") + cacheErr := NewCacheError(OpPut, originalErr, "details"). + WithPath("/test"). + WithFile("test.dat") + + context := GetErrorContext(cacheErr) + + if context["operation"] != string(OpPut) { + t.Error("Operation should be in context") + } + if context["path"] != "/test" { + t.Error("Path should be in context") + } + if context["file"] != "test.dat" { + t.Error("File should be in context") + } + if context["details"] != "details" { + t.Error("Details should be in context") + } + if context["original_error"] != "test error" { + t.Error("Original error should be in context") + } + if context["retryable"] != false { + t.Error("Retryable should be false for test error") + } + }) + + t.Run("regular error", func(t *testing.T) { + err := errors.New("connection refused") + context := GetErrorContext(err) + + if context["original_error"] != "connection refused" { + t.Error("Original error should be in context") + } + if context["retryable"] != true { + t.Error("Connection refused should be retryable") + } + }) + + t.Run("nil error", func(t *testing.T) { + context := GetErrorContext(nil) + if len(context) != 0 { + t.Error("Context should be empty for nil error") + } + }) +} + +func TestErrorIntegration(t *testing.T) { + // This test demonstrates how the enhanced errors work in practice + t.Run("error chaining example", func(t *testing.T) { + // Simulate a disk full error during Put with no drop policy + cache, err := Open(WithPath(t.TempDir()), WithCapacity(100), WithNoDrop(true)) // Very small capacity, no drop + if err != nil { + t.Fatalf("Failed to open cache: %v", err) + } + defer cache.Close() + + // Try to put data that exceeds capacity + data := make([]byte, 200) // Larger than capacity + err = cache.Put(data) + + // Check that we get a wrapped error + if err == nil { + t.Fatal("Expected error when exceeding capacity with no-drop policy") + } + + // Check error structure + var cacheErr *CacheError + if !isCacheError(err, &cacheErr) { + t.Fatal("Error should be of type CacheError") + } + + if cacheErr.Operation != OpPut { + t.Errorf("Expected operation Put, got %v", cacheErr.Operation) + } + + // Test error unwrapping + if !errors.Is(err, ErrCacheFull) { + t.Error("Error should unwrap to ErrCacheFull") + } + + // Test 
error context + context := GetErrorContext(err) + if context["operation"] != string(OpPut) { + t.Error("Context should contain operation") + } + }) + + t.Run("error details extraction", func(t *testing.T) { + err := WrapPutError(ErrTooLargeData, "/test/path", 2048). + WithDetails("max_size=1024") + + errStr := err.Error() + expectedParts := []string{ + "Put", + "path=/test/path", + "data_size=2048", + "max_size=1024", + "too large data", + } + + for _, part := range expectedParts { + if !strings.Contains(errStr, part) { + t.Errorf("Error string should contain %q, got: %s", part, errStr) + } + } + }) +} + +// Benchmark for error creation overhead +func BenchmarkCacheError_Creation(b *testing.B) { + baseErr := errors.New("test error") + + b.Run("NewCacheError", func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = NewCacheError(OpPut, baseErr, "test details") + } + }) + + b.Run("WrapPutError", func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = WrapPutError(baseErr, "/test/path", 1024) + } + }) + + b.Run("ErrorString", func(b *testing.B) { + err := WrapPutError(baseErr, "/test/path", 1024) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = err.Error() + } + }) +} diff --git a/diskcache/get.go b/diskcache/get.go index 7de2b028..61b8d01e 100644 --- a/diskcache/get.go +++ b/diskcache/get.go @@ -19,7 +19,8 @@ type Fn func([]byte) error func (c *DiskCache) switchNextFile() error { if c.curReadfile != "" { if err := c.removeCurrentReadingFile(); err != nil { - return fmt.Errorf("removeCurrentReadingFile: %w", err) + return NewCacheError(OpSwitch, err, "failed_to_remove_current_reading_file"). + WithPath(c.path).WithFile(c.curReadfile) } } @@ -34,7 +35,12 @@ func (c *DiskCache) skipBadFile() error { l.Warnf("skip bad file %s with size %d bytes", c.curReadfile, c.curReadSize) - return c.switchNextFile() + if err := c.switchNextFile(); err != nil { + return NewCacheError(OpGet, err, "failed_to_skip_bad_file"). + WithPath(c.path).WithFile(c.curReadfile). + WithDetails(fmt.Sprintf("file_size=%d", c.curReadSize)) + } + return nil } // Get fetch new data from disk cache, then passing to fn @@ -86,13 +92,16 @@ func (c *DiskCache) doGet(buf []byte, fn Fn, bfn BufFunc) error { return c.rotate() }(); err != nil { - return fmt.Errorf("wakeup error: %w", err) + return NewCacheError(OpGet, err, "failed_to_wakeup_sleeping_write_file"). + WithPath(c.path). + WithDetails(fmt.Sprintf("idle_time=%v, batch_size=%d", + time.Since(c.wfdLastWrite), c.curBatchSize)) } } if c.rfd == nil { // no file reading, reading on the first file if err = c.switchNextFile(); err != nil { - return err + return WrapGetError(err, c.path, "") } } @@ -104,10 +113,13 @@ retry: if n, err = c.rfd.Read(c.batchHeader); err != nil || n != dataHeaderLen { if err != nil && !errors.Is(err, io.EOF) { l.Errorf("read %d bytes header error: %s", dataHeaderLen, err.Error()) - } - - if n > 0 && n != dataHeaderLen { + err = WrapFileOperationError(OpRead, err, c.path, c.rfd.Name()). + WithDetails(fmt.Sprintf("header_read: expected=%d, actual=%d", dataHeaderLen, n)) + } else if n > 0 && n != dataHeaderLen { l.Errorf("invalid header length: %d, expect %d", n, dataHeaderLen) + err = NewCacheError(OpRead, ErrUnexpectedReadSize, + fmt.Sprintf("header_size_mismatch: expected=%d, actual=%d", dataHeaderLen, n)). + WithPath(c.path).WithFile(c.rfd.Name()) } // On bad datafile, just ignore and delete the file. 
@@ -123,7 +135,8 @@ retry: if uint32(nbytes) == EOFHint { // EOF if err := c.switchNextFile(); err != nil { - return fmt.Errorf("switchNextFile: %w", err) + return WrapGetError(err, c.path, c.rfd.Name()). + WithDetails("eof_encountered_during_get") } goto retry // read next new file to save another Get() calling. @@ -143,20 +156,24 @@ retry: if len(readbuf) < nbytes { // seek to next read position if x, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil { - return fmt.Errorf("rfd.Seek(%d): %w", nbytes, err) + return WrapFileOperationError(OpSeek, err, c.path, c.rfd.Name()). + WithDetails(fmt.Sprintf("failed_to_seek_past_data: data_size=%d", nbytes)) } else { l.Warnf("got %d bytes to buffer with len %d, seek to new read position %d, drop %d bytes within file %s", nbytes, len(readbuf), x, nbytes, c.curReadfile) droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes)) - return ErrTooSmallReadBuf + return WrapGetError(ErrTooSmallReadBuf, c.path, c.rfd.Name()). + WithDetails(fmt.Sprintf("buffer_too_small: required=%d, provided=%d", nbytes, len(readbuf))) } } if n, err := c.rfd.Read(readbuf[:nbytes]); err != nil { - return fmt.Errorf("rfd.Read(%d buf): %w", len(readbuf[:nbytes]), err) + return WrapFileOperationError(OpRead, err, c.path, c.rfd.Name()). + WithDetails(fmt.Sprintf("data_read: expected=%d, actual=%d", nbytes, n)) } else if n != nbytes { - return ErrUnexpectedReadSize + return WrapGetError(ErrUnexpectedReadSize, c.path, c.rfd.Name()). + WithDetails(fmt.Sprintf("partial_read: expected=%d, actual=%d", nbytes, n)) } if fn == nil { @@ -167,7 +184,8 @@ retry: // seek back if !c.noFallbackOnError { if _, serr := c.rfd.Seek(-int64(dataHeaderLen+nbytes), io.SeekCurrent); serr != nil { - return fmt.Errorf("c.rfd.Seek(%d) on FallbackOnError: %w", -int64(dataHeaderLen+nbytes), serr) + return WrapFileOperationError(OpSeek, serr, c.path, c.rfd.Name()). + WithDetails(fmt.Sprintf("fallback_seek_failed: offset=%d", -int64(dataHeaderLen+nbytes))) } seekBackVec.WithLabelValues(c.path).Inc() @@ -180,7 +198,7 @@ __updatePos: if !c.noPos && nbytes > 0 { c.pos.Seek += int64(dataHeaderLen + nbytes) if do, derr := c.pos.dumpFile(); derr != nil { - return derr + return WrapPosError(derr, c.path, c.pos.Seek).WithDetails("failed_to_update_position_after_get") } else if do { posUpdatedVec.WithLabelValues("get", c.path).Inc() } diff --git a/diskcache/lock.go b/diskcache/lock.go index 77512164..162920d8 100644 --- a/diskcache/lock.go +++ b/diskcache/lock.go @@ -38,7 +38,8 @@ func (l *flock) lock() error { } else { x, err := os.ReadFile(l.file) if err != nil { - return err + return WrapFileOperationError(OpRead, err, "", l.file). + WithDetails("failed_to_read_lock_file") } if len(x) == 0 { @@ -47,30 +48,42 @@ func (l *flock) lock() error { pidInFile, err := strconv.Atoi(string(x)) if err != nil { - return err + return NewCacheError(OpLock, err, + fmt.Sprintf("failed_to_parse_pid_from_lock_file: content=%q", string(x))). + WithFile(l.file) } else { switch pidInFile { case -1: // unlocked goto write case curPid: - return fmt.Errorf("lock failed(locked by pid %d)", curPid) + return NewCacheError(OpLock, fmt.Errorf("already_locked_by_current_process"), ""). + WithFile(l.file).WithDetails(fmt.Sprintf("current_pid=%d", curPid)) default: // other pid, may terminated if pidAlive(pidInFile) { - return fmt.Errorf("lock failed(locked by alive %d)", pidInFile) + return WrapLockError(fmt.Errorf("process_already_has_lock"), "", pidInFile). 
+ WithFile(l.file) } } } } write: - return os.WriteFile(l.file, []byte(strconv.Itoa(curPid)), 0o600) + if err := os.WriteFile(l.file, []byte(strconv.Itoa(curPid)), 0o600); err != nil { + return WrapFileOperationError(OpWrite, err, "", l.file). + WithDetails(fmt.Sprintf("failed_to_write_pid_to_lock_file: pid=%d", curPid)) + } + return nil } func (l *flock) unlock() error { l.mtx.Lock() defer l.mtx.Unlock() - return os.WriteFile(l.file, []byte(strconv.Itoa(-1)), 0o600) + if err := os.WriteFile(l.file, []byte(strconv.Itoa(-1)), 0o600); err != nil { + return WrapFileOperationError(OpWrite, err, "", l.file). + WithDetails("failed_to_write_unlock_marker") + } + return nil } func pidAlive(pid int) bool { diff --git a/diskcache/open.go b/diskcache/open.go index dff564fa..e903dae8 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -37,7 +37,7 @@ func Open(opts ...CacheOption) (*DiskCache, error) { } if err := c.doOpen(); err != nil { - return nil, err + return nil, WrapOpenError(err, c.path).WithDetails("failed_to_open_diskcache") } defer func() { @@ -101,14 +101,15 @@ func (c *DiskCache) doOpen() error { } if err := os.MkdirAll(c.path, c.dirPerms); err != nil { - return err + return NewCacheError(OpCreate, err, fmt.Sprintf("failed_to_create_directory: perms=%o", c.dirPerms)). + WithPath(c.path) } // disable open multiple times if !c.noLock { fl := newFlock(c.path) if err := fl.lock(); err != nil { - return fmt.Errorf("lock: %w", err) + return WrapLockError(err, c.path, 0).WithDetails("failed_to_acquire_directory_lock") } else { c.flock = fl } @@ -129,14 +130,16 @@ func (c *DiskCache) doOpen() error { // write append fd, always write to the same-name file if err := c.openWriteFile(); err != nil { - return err + return NewCacheError(OpOpen, err, "failed_to_open_write_file"). + WithPath(c.path).WithFile(c.curWriteFile) } // list files under @path if err := filepath.Walk(c.path, func(path string, fi os.FileInfo, err error) error { if err != nil { - return err + return NewCacheError(OpOpen, err, "failed_to_walk_directory"). + WithPath(c.path).WithFile(path) } if fi.IsDir() { @@ -164,7 +167,8 @@ func (c *DiskCache) doOpen() error { // first get, try load .pos if !c.noPos { if err := c.loadUnfinishedFile(); err != nil { - return err + return NewCacheError(OpOpen, err, "failed_to_load_position_file"). + WithPath(c.path) } } @@ -184,7 +188,7 @@ func (c *DiskCache) Close() error { if c.rfd != nil { if err := c.rfd.Close(); err != nil { - return err + return WrapCloseError(err, c.path, "read_fd") } c.rfd = nil } @@ -192,21 +196,21 @@ func (c *DiskCache) Close() error { if !c.noLock { if c.flock != nil { if err := c.flock.unlock(); err != nil { - return err + return WrapLockError(err, c.path, 0).WithDetails("failed_to_release_directory_lock") } } } if c.wfd != nil { if err := c.wfd.Close(); err != nil { - return err + return WrapCloseError(err, c.path, "write_fd") } c.wfd = nil } if c.pos != nil { if err := c.pos.close(); err != nil { - return err + return WrapPosError(err, c.path, c.pos.Seek).WithDetails("failed_to_close_position_file") } } diff --git a/diskcache/pos.go b/diskcache/pos.go index e86dd4d4..891fa568 100644 --- a/diskcache/pos.go +++ b/diskcache/pos.go @@ -32,7 +32,8 @@ type pos struct { func (p *pos) close() error { if p.fd != nil { if err := p.fd.Close(); err != nil { - return err + return WrapFileOperationError(OpClose, err, "", p.fname). 
+ WithDetails("failed_to_close_position_fd") } p.fd = nil @@ -51,7 +52,8 @@ func (p *pos) String() string { func posFromFile(fname string) (*pos, error) { bin, err := os.ReadFile(filepath.Clean(fname)) if err != nil { - return nil, err + return nil, WrapFileOperationError(OpRead, err, "", fname). + WithDetails("failed_to_read_position_file") } if len(bin) <= 8 { @@ -60,7 +62,9 @@ func posFromFile(fname string) (*pos, error) { var p pos if err := p.UnmarshalBinary(bin); err != nil { - return nil, err + return nil, NewCacheError(OpPos, err, + fmt.Sprintf("failed_to_unmarshal_position_data: data_len=%d", len(bin))). + WithFile(fname) } return &p, nil } @@ -97,7 +101,8 @@ func (p *pos) UnmarshalBinary(bin []byte) error { func (p *pos) reset() error { if p.fd == nil { if fd, err := os.OpenFile(p.fname, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600); err != nil { - return fmt.Errorf("open pos file(%q) failed: %w", p.fname, err) + return WrapFileOperationError(OpCreate, err, "", p.fname). + WithDetails("failed_to_create_position_file_for_reset") } else { p.fd = fd } @@ -120,25 +125,30 @@ func (p *pos) reset() error { func (p *pos) doDumpFile() error { if p.fd == nil { if fd, err := os.OpenFile(p.fname, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600); err != nil { - return fmt.Errorf("open pos file(%q) failed: %w", p.fname, err) + return WrapFileOperationError(OpCreate, err, "", p.fname). + WithDetails("failed_to_open_position_file_for_dump") } else { p.fd = fd } } if data, err := p.MarshalBinary(); err != nil { - return err + return NewCacheError(OpPos, err, "failed_to_marshal_position_data"). + WithFile(p.fname) } else { if err := p.fd.Truncate(0); err != nil { - return fmt.Errorf("fd.Truncate: %w", err) + return WrapFileOperationError(OpWrite, err, "", p.fname). + WithDetails("failed_to_truncate_position_file") } if _, err := p.fd.Seek(0, 0); err != nil { - return fmt.Errorf("fd.Seek: %w", err) + return WrapFileOperationError(OpSeek, err, "", p.fname). + WithDetails("failed_to_seek_to_start_of_position_file") } if _, err := p.fd.Write(data); err != nil { - return fmt.Errorf("dumpFile(%q): %w", p.fname, err) + return WrapFileOperationError(OpWrite, err, "", p.fname). + WithDetails("failed_to_write_position_data") } return nil diff --git a/diskcache/put.go b/diskcache/put.go index 84d11320..4c5d370e 100644 --- a/diskcache/put.go +++ b/diskcache/put.go @@ -33,37 +33,41 @@ func (c *DiskCache) Put(data []byte) error { if c.IsFull(data) { if c.noDrop { - return ErrCacheFull + return WrapPutError(ErrCacheFull, c.path, len(data)).WithDetails("no_drop_enabled") } if c.filoDrop { // do not accept new data droppedDataVec.WithLabelValues(c.path, reasonExceedCapacity).Observe(float64(len(data))) - return ErrCacheFull + return WrapPutError(ErrCacheFull, c.path, len(data)).WithDetails("filo_drop_policy") } if err := c.dropBatch(); err != nil { - return err + return WrapPutError(err, c.path, len(data)).WithDetails("failed_to_drop_batch") } } if c.maxDataSize > 0 && int32(len(data)) > c.maxDataSize { - return ErrTooLargeData + return WrapPutError(ErrTooLargeData, c.path, len(data)).WithDetails( + fmt.Sprintf("max_size=%d, actual_size=%d", c.maxDataSize, len(data))) } hdr := make([]byte, dataHeaderLen) binary.LittleEndian.PutUint32(hdr, uint32(len(data))) if _, err := c.wfd.Write(hdr); err != nil { - return err + return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). 
+ WithDetails("failed_to_write_header") } if _, err := c.wfd.Write(data); err != nil { - return err + return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). + WithDetails("failed_to_write_data") } if !c.noSync { if err := c.wfd.Sync(); err != nil { - return err + return WrapFileOperationError(OpSync, err, c.path, c.wfd.Name()). + WithDetails("failed_to_sync_write") } } @@ -73,7 +77,7 @@ func (c *DiskCache) Put(data []byte) error { // rotate new file if c.curBatchSize >= c.batchSize { if err := c.rotate(); err != nil { - return err + return WrapPutError(err, c.path, len(data)).WithDetails("failed_to_rotate_batch") } } @@ -94,28 +98,34 @@ func (c *DiskCache) StreamPut(r io.Reader, size int) error { ) if size <= 0 { - return ErrInvalidStreamSize + return NewCacheError(OpStreamPut, ErrInvalidStreamSize, + fmt.Sprintf("invalid_size=%d", size)).WithPath(c.path) } c.wlock.Lock() defer c.wlock.Unlock() if c.capacity > 0 && c.size.Load()+int64(size) > c.capacity { - return ErrCacheFull + return NewCacheError(OpStreamPut, ErrCacheFull, + fmt.Sprintf("capacity_exceeded: current=%d, new=%d, max=%d", + c.size.Load(), size, c.capacity)).WithPath(c.path) } if c.maxDataSize > 0 && size > int(c.maxDataSize) { - return ErrTooLargeData + return NewCacheError(OpStreamPut, ErrTooLargeData, + fmt.Sprintf("size_exceeded: max=%d, actual=%d", c.maxDataSize, size)).WithPath(c.path) } if startOffset, err = c.wfd.Seek(0, io.SeekCurrent); err != nil { - return fmt.Errorf("Seek(0, SEEK_CUR): %w", err) + return WrapFileOperationError(OpSeek, err, c.path, c.wfd.Name()). + WithDetails("failed_to_get_current_position") } defer func() { if total > 0 && err != nil { // fallback to origin position if _, serr := c.wfd.Seek(startOffset, io.SeekStart); serr != nil { - c.LastErr = serr + c.LastErr = WrapFileOperationError(OpSeek, serr, c.path, c.wfd.Name()). + WithDetails(fmt.Sprintf("failed_to_fallback_to_position_%d", startOffset)) } } @@ -125,20 +135,24 @@ func (c *DiskCache) StreamPut(r io.Reader, size int) error { if size > 0 { binary.LittleEndian.PutUint32(c.batchHeader, uint32(size)) if _, err := c.wfd.Write(c.batchHeader); err != nil { - return err + return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). + WithDetails("failed_to_write_stream_header") } } total, err = io.CopyN(c.wfd, r, int64(size)) if err != nil && !errors.Is(err, io.EOF) { - return err + return NewCacheError(OpStreamPut, err, + fmt.Sprintf("failed_to_copy_stream_data: expected=%d, copied=%d", size, total)). + WithPath(c.path) } c.curBatchSize += (total + dataHeaderLen) if c.curBatchSize >= c.batchSize { if err := c.rotate(); err != nil { - return err + return NewCacheError(OpStreamPut, err, "failed_to_rotate_after_stream_put"). + WithPath(c.path) } } diff --git a/diskcache/rotate.go b/diskcache/rotate.go index 5bce9065..6637e46a 100644 --- a/diskcache/rotate.go +++ b/diskcache/rotate.go @@ -37,7 +37,8 @@ func (c *DiskCache) rotate() error { eof := make([]byte, dataHeaderLen) binary.LittleEndian.PutUint32(eof, EOFHint) if _, err := c.wfd.Write(eof); err != nil { // append EOF to file end - return fmt.Errorf("rotate on write EOF: %w", err) + return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). 
+ WithDetails("failed_to_write_eof_marker_during_rotate") } // NOTE: EOF bytes do not count to size @@ -51,11 +52,15 @@ func (c *DiskCache) rotate() error { last := c.dataFiles[len(c.dataFiles)-1] arr := strings.Split(filepath.Base(last), ".") if len(arr) != 2 { - return ErrInvalidDataFileName + return NewCacheError(OpRotate, ErrInvalidDataFileName, + fmt.Sprintf("invalid_filename_format: %s", last)). + WithPath(c.path).WithFile(last) } x, err := strconv.ParseInt(arr[1], 10, 64) if err != nil { - return ErrInvalidDataFileNameSuffix + return NewCacheError(OpRotate, ErrInvalidDataFileNameSuffix, + fmt.Sprintf("failed_to_parse_sequence_from_filename: %s, error: %v", arr[1], err)). + WithPath(c.path).WithFile(last) } // data.0003 -> data.0004 @@ -64,13 +69,15 @@ func (c *DiskCache) rotate() error { // close current writing file if err := c.wfd.Close(); err != nil { - return fmt.Errorf("rotate on close wfd: %w", err) + return WrapFileOperationError(OpClose, err, c.path, c.wfd.Name()). + WithDetails("failed_to_close_write_file_during_rotate") } c.wfd = nil // rename data -> data.0004 if err := os.Rename(c.curWriteFile, newfile); err != nil { - return fmt.Errorf("rotate on Rename(%q, %q): %w", c.curWriteFile, newfile, err) + return WrapRotateError(err, c.path, c.curWriteFile, newfile). + WithDetails("failed_to_rename_file_during_rotate") } // new file added, add it's size to cache size @@ -80,6 +87,9 @@ func (c *DiskCache) rotate() error { sizeVec.WithLabelValues(c.path).Add(float64(fi.Size())) putBytesVec.WithLabelValues(c.path).Observe(float64(fi.Size())) } + } else { + // Non-critical error: log but don't fail rotation + l.Warnf("failed to stat rotated file %s: %v", newfile, err) } c.dataFiles = append(c.dataFiles, newfile) @@ -87,7 +97,8 @@ func (c *DiskCache) rotate() error { // reopen new write file if err := c.openWriteFile(); err != nil { - return err + return NewCacheError(OpRotate, err, "failed_to_open_new_write_file"). + WithPath(c.path) } return nil @@ -105,7 +116,8 @@ func (c *DiskCache) removeCurrentReadingFile() error { if c.rfd != nil { if err := c.rfd.Close(); err != nil { - return err + return WrapFileOperationError(OpClose, err, c.path, c.rfd.Name()). + WithDetails("failed_to_close_read_file_during_removal") } c.rfd = nil } @@ -119,7 +131,8 @@ func (c *DiskCache) removeCurrentReadingFile() error { getBytesVec.WithLabelValues(c.path).Observe(float64(fi.Size())) if err := os.Remove(c.curReadfile); err != nil { - return fmt.Errorf("removeCurrentReadingFile: %q: %w", c.curReadfile, err) + return WrapFileOperationError(OpRemove, err, c.path, c.curReadfile). + WithDetails("failed_to_remove_consumed_file") } } diff --git a/diskcache/switch.go b/diskcache/switch.go index 97e4128c..b2fcb6cc 100644 --- a/diskcache/switch.go +++ b/diskcache/switch.go @@ -21,7 +21,8 @@ func (c *DiskCache) loadUnfinishedFile() error { pos, err := posFromFile(c.pos.fname) if err != nil { - return fmt.Errorf("posFromFile: %w", err) + return NewCacheError(OpPos, err, "failed_to_load_position_file"). + WithPath(c.path).WithFile(c.pos.fname) } if pos == nil { @@ -31,7 +32,8 @@ func (c *DiskCache) loadUnfinishedFile() error { // check file's healty if _, err := os.Stat(string(pos.Name)); err != nil { // not exist if err := c.pos.reset(); err != nil { - return err + return NewCacheError(OpPos, err, "failed_to_reset_position_after_missing_file"). 
+ WithPath(c.path).WithFile(c.pos.fname) } return nil @@ -44,11 +46,13 @@ func (c *DiskCache) loadUnfinishedFile() error { fd, err := os.OpenFile(string(pos.Name), os.O_RDONLY, c.filePerms) if err != nil { - return fmt.Errorf("OpenFile: %w", err) + return WrapFileOperationError(OpOpen, err, c.path, string(pos.Name)). + WithDetails(fmt.Sprintf("failed_to_open_position_file: seek=%d", pos.Seek)) } if _, err := fd.Seek(pos.Seek, io.SeekStart); err != nil { - return fmt.Errorf("Seek(%q: %d, 0): %w", pos.Name, pos.Seek, err) + return WrapFileOperationError(OpSeek, err, c.path, string(pos.Name)). + WithDetails(fmt.Sprintf("failed_to_seek_to_position: seek=%d", pos.Seek)) } c.rfd = fd @@ -67,7 +71,8 @@ func (c *DiskCache) doSwitchNextFile() error { // clear .pos: prepare for new .pos for next new file. if !c.noPos { if err := c.pos.reset(); err != nil { - return err + return NewCacheError(OpSwitch, err, "failed_to_reset_position_for_switch"). + WithPath(c.path) } } @@ -79,13 +84,15 @@ func (c *DiskCache) doSwitchNextFile() error { fd, err := os.OpenFile(c.curReadfile, os.O_RDONLY, c.filePerms) if err != nil { - return fmt.Errorf("under switchNextFile, OpenFile: %w, datafile: %+#v, ", err, c.dataFiles) + return WrapFileOperationError(OpOpen, err, c.path, c.curReadfile). + WithDetails(fmt.Sprintf("failed_to_open_next_read_file: available_files=%v", c.dataFiles)) } c.rfd = fd if fi, err := c.rfd.Stat(); err != nil { - return fmt.Errorf("on rfd.Stat(): %w", err) + return WrapFileOperationError(OpStat, err, c.path, c.curReadfile). + WithDetails("failed_to_stat_read_file") } else { c.curReadSize = fi.Size() } @@ -94,7 +101,8 @@ func (c *DiskCache) doSwitchNextFile() error { c.pos.Name = []byte(c.curReadfile) c.pos.Seek = 0 if err := c.pos.doDumpFile(); err != nil { - return err + return NewCacheError(OpSwitch, err, "failed_to_dump_position_after_switch"). + WithPath(c.path).WithFile(c.curReadfile) } posUpdatedVec.WithLabelValues("switch", c.path).Inc() @@ -107,7 +115,8 @@ func (c *DiskCache) doSwitchNextFile() error { func (c *DiskCache) openWriteFile() error { if fi, err := os.Stat(c.curWriteFile); err == nil { // file exists if fi.IsDir() { - return errors.New("data file should not be dir") + return NewCacheError(OpCreate, errors.New("data file should not be dir"), ""). + WithPath(c.path).WithFile(c.curWriteFile) } c.curBatchSize = fi.Size() @@ -119,7 +128,8 @@ func (c *DiskCache) openWriteFile() error { // write append fd, always write to the same-name file wfd, err := os.OpenFile(c.curWriteFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, c.filePerms) if err != nil { - return fmt.Errorf("under openWriteFile, OpenFile(%q): %w", c.curWriteFile, err) + return WrapFileOperationError(OpCreate, err, c.path, c.curWriteFile). 
+ WithDetails("failed_to_open_write_file") } c.wfdLastWrite = time.Now() From bbc1c4b499cc8d656dfbe31f8b3aa839aa76f0c3 Mon Sep 17 00:00:00 2001 From: coanor Date: Mon, 12 Jan 2026 14:32:46 +0800 Subject: [PATCH 6/8] feat: add lock contention and wait time metrics - Add instrumented mutexes to track lock contention events - Add histogram metric for lock wait times by lock type and path - Add counter metric for total lock contention events - Include comprehensive documentation and unit tests - Replace sync.Mutex with InstrumentedMutex in DiskCache struct - Initialize instrumented locks after cache path is known --- diskcache/LOCK_CONTENTION_METRICS.md | 181 +++++++++++++++++++++++++++ diskcache/diskcache.go | 18 +-- diskcache/get_test.go | 3 +- diskcache/lock_contention.go | 176 ++++++++++++++++++++++++++ diskcache/lock_contention_test.go | 146 +++++++++++++++++++++ diskcache/metric.go | 32 +++++ diskcache/metric_test.go | 39 +++--- diskcache/open.go | 18 ++- diskcache/put_test.go | 7 +- 9 files changed, 586 insertions(+), 34 deletions(-) create mode 100644 diskcache/LOCK_CONTENTION_METRICS.md create mode 100644 diskcache/lock_contention.go create mode 100644 diskcache/lock_contention_test.go diff --git a/diskcache/LOCK_CONTENTION_METRICS.md b/diskcache/LOCK_CONTENTION_METRICS.md new file mode 100644 index 00000000..a041e341 --- /dev/null +++ b/diskcache/LOCK_CONTENTION_METRICS.md @@ -0,0 +1,181 @@ +# Lock Contention Metrics Implementation + +This document explains how lock contention metrics are measured and used in the diskcache module. + +## Overview + +The enhanced diskcache now tracks lock contention across three critical lock types: +- **write lock** (`wlock`): Excludes concurrent Put operations +- **read lock** (`rlock`): Excludes concurrent Get operations +- **rw lock** (`rwlock`): Excludes structural operations (rotate, switch, drop, close) + +## Instrumentation Strategy + +### 1. Instrumented Locks + +Instead of using `sync.Mutex` and `sync.RWMutex`, we now use: + +```go +type InstrumentedMutex struct { + mu sync.Mutex + lockType LockType + path string + lockWaitTime *prometheus.HistogramVec + contention *prometheus.CounterVec +} + +type InstrumentedRWMutex struct { + mu sync.RWMutex + path string + lockWaitTime *prometheus.HistogramVec + contention *prometheus.CounterVec +} +``` + +### 2. Contention Detection + +Contention is detected using `TryLock()`: + +```go +func (im *InstrumentedMutex) Lock() { + start := time.Now() + + // Check if mutex is already locked (contention) + if im.mu.TryLock() { + // No contention - immediate acquisition + im.observeLockTime(start, false) + return + } + + // Contention occurred - wait for lock + im.observeContention() + im.mu.Lock() + im.observeLockTime(start, true) +} +``` + +## Metrics Exposed + +### 1. Lock Wait Time Histogram +``` +diskcache_lock_wait_seconds{lock_type="write",path="/cache/path"} 0.001 +diskcache_lock_wait_seconds{lock_type="read",path="/cache/path"} 0.0001 +diskcache_lock_wait_seconds{lock_type="rw",path="/cache/path"} 0.05 +``` + +**Buckets**: `[0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5]` seconds + +### 2. Lock Contention Counter +``` +diskcache_lock_contention_total{lock_type="write",path="/cache/path"} 15 +diskcache_lock_contention_total{lock_type="read",path="/cache/path"} 3 +diskcache_lock_contention_total{lock_type="rw",path="/cache/path"} 1 +``` + +## Usage Examples + +### 1. 
+### 1. Monitoring Lock Contention Rate
+
+```promql
+# Contention rate per second by lock type
+sum by (lock_type, path) (rate(diskcache_lock_contention_total[5m]))
+
+# Percentile of lock wait times
+histogram_quantile(0.95, sum by (le, lock_type, path) (rate(diskcache_lock_wait_seconds_bucket[5m])))
+```
+
+### 2. Alerting on High Contention
+
+```promql
+# Alert when lock wait times exceed threshold
+histogram_quantile(0.95, sum by (le, lock_type, path) (rate(diskcache_lock_wait_seconds_bucket[5m]))) > 0.1
+
+# Alert when contention rate is high
+rate(diskcache_lock_contention_total[5m]) > 10
+```
+
+### 3. Performance Analysis
+
+```promql
+# Lock wait time distribution
+histogram_quantile(0.50, sum by (le, lock_type) (rate(diskcache_lock_wait_seconds_bucket[5m])))
+histogram_quantile(0.95, sum by (le, lock_type) (rate(diskcache_lock_wait_seconds_bucket[5m])))
+histogram_quantile(0.99, sum by (le, lock_type) (rate(diskcache_lock_wait_seconds_bucket[5m])))
+
+# Correlate contention with operations
+rate(diskcache_lock_contention_total[5m]) /
+(rate(diskcache_put_total[5m]) + rate(diskcache_get_total[5m]))
+```
+
+## Implementation Benefits
+
+### 1. **Early Detection of Performance Issues**
+- Identifies lock contention before it becomes a bottleneck
+- Shows which lock types are under stress
+- Helps optimize lock granularity and usage patterns
+
+### 2. **Correlation with System Load**
+- Can correlate contention spikes with:
+  - High put/get rates
+  - Disk I/O delays
+  - Memory pressure
+  - File system issues
+
+### 3. **Capacity Planning**
+- Understanding contention patterns helps with:
+  - Sizing thread pools
+  - Configuring batch sizes
+  - Optimizing cache operations
+  - Planning resource allocation
+
+### 4. **Troubleshooting**
+- Quickly identify if performance issues are due to:
+  - Write contention (many puts)
+  - Read contention (many gets)
+  - Structural operations (rotations, switches)
+
+## Integration with Existing Metrics
+
+The lock contention metrics integrate seamlessly with existing diskcache metrics:
+
+```
+# Overall view of cache performance
+diskcache_put_latency_seconds{path="/cache"} 0.001
+diskcache_get_latency_seconds{path="/cache"} 0.0005
+diskcache_lock_wait_seconds{lock_type="write",path="/cache"} 0.0002
+diskcache_lock_wait_seconds{lock_type="read",path="/cache"} 0.0001
+
+# Identify if locks are the bottleneck
+diskcache_lock_wait_seconds / diskcache_put_latency_seconds
+```
+
+## Performance Overhead
+
+The instrumentation adds minimal overhead:
+- **Contention detection**: One `TryLock()` call before the actual lock
+- **Timing**: One `time.Since()` call per lock acquisition
+- **Metrics update**: One histogram and counter observation per lock
+
+The overhead is negligible compared to typical I/O operations and provides valuable observability.
+
+## Production Deployment
+
+### 1. **Monitoring Dashboard**
+Create Grafana panels showing:
+- Lock wait time percentiles by type
+- Contention rates over time
+- Lock wait time vs operation latency
+- Contention heat maps by cache path
+
+### 2. **Alerting Rules**
+Set up alerts for:
+- High p95/p99 lock wait times (>100ms)
+- Elevated contention rates (>1/sec sustained)
+- Sudden spikes in wait times
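+
+Before wiring those alerts to a pager, it helps to confirm they can fire at
+all. A small load generator is enough; the sketch below assumes `WithPath` as
+the open option and a no-op `Get` callback (adjust to the actual option and
+callback signatures if they differ):
+
+```go
+package main
+
+import (
+	"sync"
+
+	"github.com/GuanceCloud/cliutils/diskcache"
+)
+
+func main() {
+	// Concurrent writers and readers fight over wlock/rlock, which should
+	// drive diskcache_lock_contention_total and the wait-time histogram.
+	c, err := diskcache.Open(diskcache.WithPath("/tmp/contention-demo"))
+	if err != nil {
+		panic(err)
+	}
+	defer c.Close()
+
+	var wg sync.WaitGroup
+	for i := 0; i < 8; i++ {
+		wg.Add(2)
+		go func() {
+			defer wg.Done()
+			for j := 0; j < 1000; j++ {
+				_ = c.Put(make([]byte, 1024))
+			}
+		}()
+		go func() {
+			defer wg.Done()
+			for j := 0; j < 1000; j++ {
+				_ = c.Get(func([]byte) error { return nil })
+			}
+		}()
+	}
+	wg.Wait()
+}
+```
+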
+### 3. **Capacity Analysis**
+Use metrics to:
+- Identify optimal thread counts
+- Tune batch sizes
+- Plan for scale increases
+- Optimize lock granularity
\ No newline at end of file
diff --git a/diskcache/diskcache.go b/diskcache/diskcache.go
index 9deb0624..38e93a5c 100644
--- a/diskcache/diskcache.go
+++ b/diskcache/diskcache.go
@@ -86,9 +86,9 @@ type DiskCache struct {
 	// how long to wakeup a sleeping write-file
 	wakeup time.Duration
 
-	wlock, // write-lock: used to exclude concurrent Put to the header file.
-	rlock *sync.Mutex // read-lock: used to exclude concurrent Get on the tail file.
-	rwlock *sync.Mutex // used to exclude switch/rotate/drop/Close on current disk cache instance.
+	wlock  *InstrumentedMutex // write-lock: used to exclude concurrent Put to the header file.
+	rlock  *InstrumentedMutex // read-lock: used to exclude concurrent Get on the tail file.
+	rwlock *InstrumentedMutex // used to exclude switch/rotate/drop/Close on current disk cache instance.
 
 	flock *flock // disabled multi-Open on same path
 	pos   *pos   // current read fd position info
@@ -123,8 +123,10 @@ type DiskCache struct {
 }
 
 func (c *DiskCache) String() string {
-	c.rwlock.Lock()
-	defer c.rwlock.Unlock()
+	if c.rwlock != nil {
+		c.rwlock.Lock()
+		defer c.rwlock.Unlock()
+	}
 
 	// nolint: lll
 	// if there too many files(>10), only print file count
@@ -141,8 +143,10 @@
 }
 
 func (c *DiskCache) Pretty() string {
-	c.rwlock.Lock()
-	defer c.rwlock.Unlock()
+	if c.rwlock != nil {
+		c.rwlock.Lock()
+		defer c.rwlock.Unlock()
+	}
 
 	arr := []string{}
diff --git a/diskcache/get_test.go b/diskcache/get_test.go
index a62e35f0..99ac1552 100644
--- a/diskcache/get_test.go
+++ b/diskcache/get_test.go
@@ -514,7 +514,8 @@ func TestPutGet(t *T.T) {
 		mfs, err := reg.Gather()
 		require.NoError(t, err)
 
-		t.Logf("got metrics\n%s", metrics.MetricFamily2Text(mfs))
+		fullMetrics := metrics.MetricFamily2Text(mfs)
+		_ = fullMetrics
 
 		t.Cleanup(func() {
 			c2.Close()
diff --git a/diskcache/lock_contention.go b/diskcache/lock_contention.go
new file mode 100644
index 00000000..096b83d0
--- /dev/null
+++ b/diskcache/lock_contention.go
@@ -0,0 +1,176 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the MIT License.
+// This product includes software developed at Guance Cloud (https://www.guance.com/).
+// Copyright 2021-present Guance, Inc.
+
+package diskcache
+
+import (
+	"sync"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// LockType represents different types of locks in diskcache
+type LockType string
+
+const (
+	LockTypeWrite LockType = "write"
+	LockTypeRead  LockType = "read"
+	LockTypeRW    LockType = "rw"
+)
+
+// InstrumentedMutex wraps sync.Mutex with contention tracking
+type InstrumentedMutex struct {
+	mu           sync.Mutex
+	lockType     LockType
+	path         string
+	lockWaitTime *prometheus.HistogramVec
+	contention   *prometheus.CounterVec
+}
+
+// NewInstrumentedMutex creates a new instrumented mutex
+func NewInstrumentedMutex(lockType LockType, path string, lockWaitTime *prometheus.HistogramVec, contention *prometheus.CounterVec) *InstrumentedMutex {
+	return &InstrumentedMutex{
+		lockType:     lockType,
+		path:         path,
+		lockWaitTime: lockWaitTime,
+		contention:   contention,
+	}
+}
+
+// Lock acquires the mutex with contention tracking
+func (im *InstrumentedMutex) Lock() {
+	start := time.Now()
+
+	// Check if mutex is already locked (contention)
+	if im.mu.TryLock() {
+		// No contention - immediate acquisition
+		im.observeLockTime(start, false)
+		return
+	}
+
+	// Contention occurred - wait for lock. observeLockTime(start, true)
+	// records the contention event exactly once, so no extra Inc() here.
+	im.mu.Lock()
+	im.observeLockTime(start, true)
+}
+
+// TryLock attempts to acquire mutex without blocking
+func (im *InstrumentedMutex) TryLock() bool {
+	start := time.Now()
+	acquired := im.mu.TryLock()
+	if acquired {
+		im.observeLockTime(start, false)
+	} else {
+		im.observeContention()
+	}
+	return acquired
+}
+
+// Unlock releases the mutex
+func (im *InstrumentedMutex) Unlock() {
+	im.mu.Unlock()
+}
+
+// observeLockTime records the total time to acquire the lock
+func (im *InstrumentedMutex) observeLockTime(start time.Time, hadContention bool) {
+	duration := time.Since(start).Seconds()
+
+	// Record wait time
+	im.lockWaitTime.WithLabelValues(string(im.lockType), im.path).Observe(duration)
+
+	// If this was a contention scenario, also record it
+	if hadContention {
+		im.contention.WithLabelValues(string(im.lockType), im.path).Inc()
+	}
+}
+
+// observeContention records a contention event
+func (im *InstrumentedMutex) observeContention() {
+	im.contention.WithLabelValues(string(im.lockType), im.path).Inc()
+}
+
+// InstrumentedRWMutex wraps sync.RWMutex with contention tracking
+type InstrumentedRWMutex struct {
+	mu           sync.RWMutex
+	path         string
+	lockWaitTime *prometheus.HistogramVec
+	contention   *prometheus.CounterVec
+}
+
+// NewInstrumentedRWMutex creates a new instrumented RWMutex
+func NewInstrumentedRWMutex(path string, lockWaitTime *prometheus.HistogramVec, contention *prometheus.CounterVec) *InstrumentedRWMutex {
+	return &InstrumentedRWMutex{
+		path:         path,
+		lockWaitTime: lockWaitTime,
+		contention:   contention,
+	}
+}
+
+// RLock acquires read lock with contention tracking
+func (irm *InstrumentedRWMutex) RLock() {
+	start := time.Now()
+
+	if irm.mu.TryRLock() {
+		// No contention
+		irm.lockWaitTime.WithLabelValues(string(LockTypeRead), irm.path).Observe(time.Since(start).Seconds())
+		return
+	}
+
+	// Contention occurred
+	irm.contention.WithLabelValues(string(LockTypeRead), irm.path).Inc()
+	irm.mu.RLock()
+	irm.lockWaitTime.WithLabelValues(string(LockTypeRead), irm.path).Observe(time.Since(start).Seconds())
+}
+
+// TryRLock attempts to acquire read lock without blocking
+func (irm *InstrumentedRWMutex) TryRLock() bool {
+	start := time.Now()
+	acquired := irm.mu.TryRLock()
+	if acquired {
+		irm.lockWaitTime.WithLabelValues(string(LockTypeRead), 
irm.path).Observe(time.Since(start).Seconds()) + } else { + irm.contention.WithLabelValues(string(LockTypeRead), irm.path).Inc() + } + return acquired +} + +// RUnlock releases read lock +func (irm *InstrumentedRWMutex) RUnlock() { + irm.mu.RUnlock() +} + +// Lock acquires write lock with contention tracking +func (irm *InstrumentedRWMutex) Lock() { + start := time.Now() + + if irm.mu.TryLock() { + // No contention + irm.lockWaitTime.WithLabelValues(string(LockTypeWrite), irm.path).Observe(time.Since(start).Seconds()) + return + } + + // Contention occurred + irm.contention.WithLabelValues(string(LockTypeWrite), irm.path).Inc() + irm.mu.Lock() + irm.lockWaitTime.WithLabelValues(string(LockTypeWrite), irm.path).Observe(time.Since(start).Seconds()) +} + +// TryLock attempts to acquire write lock without blocking +func (irm *InstrumentedRWMutex) TryLock() bool { + start := time.Now() + acquired := irm.mu.TryLock() + if acquired { + irm.lockWaitTime.WithLabelValues(string(LockTypeWrite), irm.path).Observe(time.Since(start).Seconds()) + } else { + irm.contention.WithLabelValues(string(LockTypeWrite), irm.path).Inc() + } + return acquired +} + +// Unlock releases write lock +func (irm *InstrumentedRWMutex) Unlock() { + irm.mu.Unlock() +} diff --git a/diskcache/lock_contention_test.go b/diskcache/lock_contention_test.go new file mode 100644 index 00000000..7143e510 --- /dev/null +++ b/diskcache/lock_contention_test.go @@ -0,0 +1,146 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +package diskcache + +import ( + "sync" + "testing" + "time" +) + +func TestLockContention(t *testing.T) { + // Create instrumented mutex + mu := NewInstrumentedMutex(LockTypeWrite, "/test/path", lockWaitTimeVec, lockContentionVec) + + // Test immediate lock (no contention) + start := time.Now() + mu.Lock() + duration1 := time.Since(start) + mu.Unlock() + + // Should be very fast (no contention) + if duration1 > time.Millisecond { + t.Errorf("Immediate lock took too long: %v", duration1) + } + + // Test contention + var wg sync.WaitGroup + contentionStarted := make(chan bool, 1) + + // First goroutine holds lock + wg.Add(1) + go func() { + defer wg.Done() + mu.Lock() + defer mu.Unlock() + + // Signal that first lock is acquired + contentionStarted <- true + + // Hold lock for a bit + time.Sleep(50 * time.Millisecond) + }() + + // Wait for first lock to be acquired + <-contentionStarted + + // Second goroutine tries to lock (will contend) + wg.Add(1) + go func() { + defer wg.Done() + start := time.Now() + mu.Lock() // This should wait due to contention + duration := time.Since(start) + mu.Unlock() + + // Should wait at least some time due to contention + if duration < 10*time.Millisecond { + t.Errorf("Contented lock didn't wait enough: %v", duration) + } + }() + + wg.Wait() + + // Check that contention was recorded + t.Logf("Lock contention test completed") +} + +func TestInstrumentedMutex(t *testing.T) { + mu := NewInstrumentedMutex(LockTypeRead, "/test/path", lockWaitTimeVec, lockContentionVec) + + // Test TryLock success + if !mu.TryLock() { + t.Error("TryLock should succeed initially") + } + + // Test TryLock failure (already locked) + if mu.TryLock() { + t.Error("TryLock should fail when already locked") + } + + mu.Unlock() + + // Test TryLock after unlock + if !mu.TryLock() { + t.Error("TryLock should succeed after unlock") + 
} + + mu.Unlock() +} + +func TestInstrumentedRWMutex(t *testing.T) { + rwmu := NewInstrumentedRWMutex("/test/path", lockWaitTimeVec, lockContentionVec) + + // Test read lock + rwmu.RLock() + if !rwmu.TryRLock() { + t.Error("Multiple read locks should be allowed") + } + rwmu.RUnlock() + rwmu.RUnlock() + + // Test write lock exclusion + rwmu.Lock() + if rwmu.TryLock() { + t.Error("Write lock should exclude other writes") + } + if rwmu.TryRLock() { + t.Error("Write lock should exclude reads") + } + rwmu.Unlock() + + // Test read-write contention + var wg sync.WaitGroup + readStarted := make(chan bool, 1) + + // Start read lock holder + wg.Add(1) + go func() { + defer wg.Done() + rwmu.RLock() + defer rwmu.RUnlock() + readStarted <- true + time.Sleep(30 * time.Millisecond) + }() + + <-readStarted + + // Try to get write lock while read is held + wg.Add(1) + go func() { + defer wg.Done() + start := time.Now() + rwmu.Lock() // Should wait for read to complete + duration := time.Since(start) + rwmu.Unlock() + + if duration < 10*time.Millisecond { + t.Errorf("Write lock during read didn't wait enough: %v", duration) + } + }() + + wg.Wait() +} diff --git a/diskcache/metric.go b/diskcache/metric.go index e13a192b..cb6630da 100644 --- a/diskcache/metric.go +++ b/diskcache/metric.go @@ -31,6 +31,10 @@ var ( getLatencyVec, putLatencyVec *prometheus.SummaryVec + // Lock contention metrics + lockWaitTimeVec *prometheus.HistogramVec + lockContentionVec *prometheus.CounterVec + ns = "diskcache" ) @@ -220,6 +224,26 @@ func setupMetrics() { []string{"path"}, ) + // Lock contention metrics + lockWaitTimeVec = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: ns, + Name: "lock_wait_seconds", + Help: "Time spent waiting for locks by lock type", + Buckets: []float64{0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5}, + }, + []string{"lock_type", "path"}, + ) + + lockContentionVec = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: ns, + Name: "lock_contention_total", + Help: "Number of lock contention events", + }, + []string{"lock_type", "path"}, + ) + metrics.MustRegister(Metrics()...) 
} @@ -239,6 +263,10 @@ func ResetMetrics() { putLatencyVec.Reset() putBytesVec.Reset() getBytesVec.Reset() + + // Lock contention metrics + lockWaitTimeVec.Reset() + lockContentionVec.Reset() } func Metrics() []prometheus.Collector { @@ -262,6 +290,10 @@ func Metrics() []prometheus.Collector { putLatencyVec, getBytesVec, putBytesVec, + + // Lock contention metrics + lockWaitTimeVec, + lockContentionVec, } } diff --git a/diskcache/metric_test.go b/diskcache/metric_test.go index 2b622f61..792d2667 100644 --- a/diskcache/metric_test.go +++ b/diskcache/metric_test.go @@ -126,52 +126,53 @@ func TestMetric(t *T.T) { assert.NoError(t, c.Rotate()) mfs, err := reg.Gather() + fullMetrics := metrics.MetricFamily2Text(mfs) assert.NoError(t, err) m := metrics.GetMetricOnLabels(mfs, "diskcache_put_bytes", c.path) - require.NotNilf(t, m, "metrics:\n%s", c.path, metrics.MetricFamily2Text(mfs)) - assert.Equal(t, uint64(1), m.GetSummary().GetSampleCount()) - assert.Equal(t, float64(108), // 100 + size(4B) + eof(4B) - m.GetSummary().GetSampleSum()) + require.NotNilf(t, m, fullMetrics) + + assert.Equalf(t, uint64(1), m.GetSummary().GetSampleCount(), fullMetrics) + assert.Equalf(t, float64(108), // 100 + size(4B) + eof(4B) + m.GetSummary().GetSampleSum(), fullMetrics) m = metrics.GetMetricOnLabels(mfs, "diskcache_size", c.path) - require.NotNil(t, m) - assert.Equal(t, float64(108), - m.GetGauge().GetValue()) + require.NotNilf(t, m, fullMetrics) + assert.Equalf(t, float64(108), m.GetGauge().GetValue(), fullMetrics) // these fileds all zero m = metrics.GetMetricOnLabels(mfs, "diskcache_dropped_batch", c.path) - require.Nil(t, m) + require.Nilf(t, m, fullMetrics) m = metrics.GetMetricOnLabels(mfs, "diskcache_get", c.path) - require.Nil(t, m) + require.Nilf(t, m, fullMetrics) m = metrics.GetMetricOnLabels(mfs, "diskcache_get_bytes_total", c.path) - require.Nil(t, m) + require.Nilf(t, m, fullMetrics) m = metrics.GetMetricOnLabels(mfs, "diskcache_get_latency", c.path) - require.Nil(t, m) + require.Nilf(t, m, fullMetrics) m = metrics.GetMetricOnLabels(mfs, "diskcache_rotate", c.path) - require.Nil(t, m) + require.Nilf(t, m, fullMetrics) assert.NoError(t, c.Get(nil)) assert.Error(t, c.Get(nil)) // error: no data, trigger switch, and update get metrics - mfs, err = reg.Gather() + mfs, err = reg.Gather() // get updated metrics assert.NoError(t, err) - t.Logf("metrics:\n%s", metrics.MetricFamily2Text(mfs)) + fullMetrics = metrics.MetricFamily2Text(mfs) m = metrics.GetMetricOnLabels(mfs, "diskcache_get_bytes", c.path) - require.NotNil(t, m) - assert.Equal(t, uint64(1), m.GetSummary().GetSampleCount()) - assert.Equal(t, float64(108), m.GetSummary().GetSampleSum()) + require.NotNilf(t, m, fullMetrics) + assert.Equalf(t, uint64(1), m.GetSummary().GetSampleCount(), fullMetrics) + assert.Equalf(t, float64(108), m.GetSummary().GetSampleSum(), fullMetrics) m = metrics.GetMetricOnLabels(mfs, "diskcache_size", c.path) - require.NotNil(t, m) - assert.Equal(t, 0.0, m.GetGauge().GetValue()) + require.NotNilf(t, m, fullMetrics) + assert.Equalf(t, 0.0, m.GetGauge().GetValue(), fullMetrics) assert.NoError(t, c.Close()) diff --git a/diskcache/open.go b/diskcache/open.go index e903dae8..38c22ad7 100644 --- a/diskcache/open.go +++ b/diskcache/open.go @@ -11,7 +11,6 @@ import ( "path/filepath" "sort" "strconv" - "sync" "time" "github.com/GuanceCloud/cliutils/logger" @@ -64,9 +63,9 @@ func defaultInstance() *DiskCache { batchSize: 20 * 1024 * 1024, maxDataSize: 0, // not set - wlock: &sync.Mutex{}, - rlock: &sync.Mutex{}, - rwlock: 
&sync.Mutex{}, + wlock: nil, // Will be initialized in doOpen() when path is known + rlock: nil, // Will be initialized in doOpen() when path is known + rwlock: nil, // Will be initialized in doOpen() when path is known wakeup: time.Second * 3, dirPerms: 0o750, @@ -128,6 +127,17 @@ func (c *DiskCache) doOpen() error { maxDataVec.WithLabelValues(c.path).Set(float64(c.maxDataSize)) batchSizeVec.WithLabelValues(c.path).Set(float64(c.batchSize)) + // Initialize instrumented locks now that we have the path + if c.wlock == nil { + c.wlock = NewInstrumentedMutex(LockTypeWrite, c.path, lockWaitTimeVec, lockContentionVec) + } + if c.rlock == nil { + c.rlock = NewInstrumentedMutex(LockTypeRead, c.path, lockWaitTimeVec, lockContentionVec) + } + if c.rwlock == nil { + c.rwlock = NewInstrumentedMutex(LockTypeRW, c.path, lockWaitTimeVec, lockContentionVec) + } + // write append fd, always write to the same-name file if err := c.openWriteFile(); err != nil { return NewCacheError(OpOpen, err, "failed_to_open_write_file"). diff --git a/diskcache/put_test.go b/diskcache/put_test.go index c901586f..5740fbc6 100644 --- a/diskcache/put_test.go +++ b/diskcache/put_test.go @@ -207,14 +207,15 @@ func TestConcurrentPutGet(t *T.T) { mfs, err := reg.Gather() require.NoError(t, err) - t.Logf("got metrics:\n%s", metrics.MetricFamily2Text(mfs)) + + fullMetrics := metrics.MetricFamily2Text(mfs) m := metrics.GetMetricOnLabels(mfs, "diskcache_size", c.path) - require.NotNil(t, m) - assert.InDelta(t, 0, m.GetGauge().GetValue(), 0.1) + require.NotNilf(t, m, fullMetrics) + assert.InDeltaf(t, 0, m.GetGauge().GetValue(), 0.1, fullMetrics) t.Cleanup(func() { ResetMetrics() From b656709624adb6de2226845f387cb3d1849dc06d Mon Sep 17 00:00:00 2001 From: coanor Date: Mon, 12 Jan 2026 15:40:25 +0800 Subject: [PATCH 7/8] fix lint --- diskcache/errors.go | 42 ++++++++++++++++++------------------ diskcache/lock_contention.go | 32 +++++++++++++-------------- diskcache/metric.go | 2 +- 3 files changed, 38 insertions(+), 38 deletions(-) diff --git a/diskcache/errors.go b/diskcache/errors.go index ea8f98d7..522ced81 100644 --- a/diskcache/errors.go +++ b/diskcache/errors.go @@ -11,7 +11,7 @@ import ( "strings" ) -// Operation type for error context +// Operation type for error context. type Operation string const ( @@ -36,7 +36,7 @@ const ( OpStat Operation = "Stat" ) -// CacheError represents an enhanced error with operation context and details +// CacheError represents an enhanced error with operation context and details. type CacheError struct { Operation Operation Path string @@ -46,7 +46,7 @@ type CacheError struct { Caller string } -// Error implements the error interface +// Error implements the error interface. func (e *CacheError) Error() string { var parts []string @@ -73,12 +73,12 @@ func (e *CacheError) Error() string { return base } -// Unwrap returns the underlying error for compatibility with errors.Is/As +// Unwrap returns the underlying error for compatibility with errors.Is/As. func (e *CacheError) Unwrap() error { return e.Err } -// NewCacheError creates a new CacheError with enhanced context +// NewCacheError creates a new CacheError with enhanced context. func NewCacheError(op Operation, err error, details string) *CacheError { return &CacheError{ Operation: op, @@ -88,19 +88,19 @@ func NewCacheError(op Operation, err error, details string) *CacheError { } } -// WithPath adds path context to the error +// WithPath adds path context to the error. 
func (e *CacheError) WithPath(path string) *CacheError { e.Path = path return e } -// WithFile adds file context to the error +// WithFile adds file context to the error. func (e *CacheError) WithFile(file string) *CacheError { e.File = file return e } -// WithDetails adds additional details to the error +// WithDetails adds additional details to the error. func (e *CacheError) WithDetails(details string) *CacheError { if e.Details != "" { e.Details = fmt.Sprintf("%s: %s", e.Details, details) @@ -110,7 +110,7 @@ func (e *CacheError) WithDetails(details string) *CacheError { return e } -// getCaller returns the calling function name for debugging +// getCaller returns the calling function name for debugging. func getCaller() string { _, file, line, ok := runtime.Caller(2) if !ok { @@ -128,48 +128,48 @@ func getCaller() string { // Helper functions for creating specific error types -// WrapPutError wraps errors from Put operations +// WrapPutError wraps errors from Put operations. func WrapPutError(err error, path string, dataSize int) *CacheError { return NewCacheError(OpPut, err, fmt.Sprintf("data_size=%d", dataSize)).WithPath(path) } -// WrapGetError wraps errors from Get operations +// WrapGetError wraps errors from Get operations. func WrapGetError(err error, path string, file string) *CacheError { return NewCacheError(OpGet, err, "").WithPath(path).WithFile(file) } -// WrapRotateError wraps errors from Rotate operations +// WrapRotateError wraps errors from Rotate operations. func WrapRotateError(err error, path string, oldFile, newFile string) *CacheError { details := fmt.Sprintf("old=%s -> new=%s", oldFile, newFile) return NewCacheError(OpRotate, err, details).WithPath(path) } -// WrapOpenError wraps errors from Open operations +// WrapOpenError wraps errors from Open operations. func WrapOpenError(err error, path string) *CacheError { return NewCacheError(OpOpen, err, "").WithPath(path) } -// WrapCloseError wraps errors from Close operations +// WrapCloseError wraps errors from Close operations. func WrapCloseError(err error, path string, fdType string) *CacheError { return NewCacheError(OpClose, err, fmt.Sprintf("fd_type=%s", fdType)).WithPath(path) } -// WrapLockError wraps errors from locking operations +// WrapLockError wraps errors from locking operations. func WrapLockError(err error, path string, pid int) *CacheError { return NewCacheError(OpLock, err, fmt.Sprintf("pid=%d", pid)).WithPath(path) } -// WrapPosError wraps errors from position operations +// WrapPosError wraps errors from position operations. func WrapPosError(err error, path string, seek int64) *CacheError { return NewCacheError(OpPos, err, fmt.Sprintf("seek=%d", seek)).WithPath(path) } -// WrapFileOperationError wraps errors from generic file operations +// WrapFileOperationError wraps errors from generic file operations. func WrapFileOperationError(op Operation, err error, path, file string) *CacheError { return NewCacheError(op, err, "").WithPath(path).WithFile(file) } -// IsRetryable checks if an error is retryable based on its type and context +// IsRetryable checks if an error is retryable based on its type and context. func IsRetryable(err error) bool { if err == nil { return false @@ -192,7 +192,7 @@ func IsRetryable(err error) bool { } } -// isCacheError checks if error is of type CacheError +// isCacheError checks if error is of type CacheError. 
func isCacheError(err error, target **CacheError) bool { // Use type assertion instead of errors.As for direct check if ce, ok := err.(*CacheError); ok { @@ -202,7 +202,7 @@ func isCacheError(err error, target **CacheError) bool { return false } -// isTemporaryError checks if an underlying error is temporary/retryable +// isTemporaryError checks if an underlying error is temporary/retryable. func isTemporaryError(err error) bool { errStr := err.Error() temporaryPatterns := []string{ @@ -222,7 +222,7 @@ func isTemporaryError(err error) bool { return false } -// GetErrorContext extracts useful context information from errors +// GetErrorContext extracts useful context information from errors. func GetErrorContext(err error) map[string]interface{} { context := make(map[string]interface{}) diff --git a/diskcache/lock_contention.go b/diskcache/lock_contention.go index 096b83d0..e9cee51f 100644 --- a/diskcache/lock_contention.go +++ b/diskcache/lock_contention.go @@ -12,7 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -// LockType represents different types of locks in diskcache +// LockType represents different types of locks in diskcache. type LockType string const ( @@ -21,7 +21,7 @@ const ( LockTypeRW LockType = "rw" ) -// InstrumentedMutex wraps sync.Mutex with contention tracking +// InstrumentedMutex wraps sync.Mutex with contention tracking. type InstrumentedMutex struct { mu sync.Mutex lockType LockType @@ -30,7 +30,7 @@ type InstrumentedMutex struct { contention *prometheus.CounterVec } -// NewInstrumentedMutex creates a new instrumented mutex +// NewInstrumentedMutex creates a new instrumented mutex. func NewInstrumentedMutex(lockType LockType, path string, lockWaitTime *prometheus.HistogramVec, contention *prometheus.CounterVec) *InstrumentedMutex { return &InstrumentedMutex{ lockType: lockType, @@ -40,7 +40,7 @@ func NewInstrumentedMutex(lockType LockType, path string, lockWaitTime *promethe } } -// Lock acquires the mutex with contention tracking +// Lock acquires the mutex with contention tracking. func (im *InstrumentedMutex) Lock() { start := time.Now() @@ -57,7 +57,7 @@ func (im *InstrumentedMutex) Lock() { im.observeLockTime(start, true) } -// TryLock attempts to acquire mutex without blocking +// TryLock attempts to acquire mutex without blocking. func (im *InstrumentedMutex) TryLock() bool { start := time.Now() acquired := im.mu.TryLock() @@ -69,12 +69,12 @@ func (im *InstrumentedMutex) TryLock() bool { return acquired } -// Unlock releases the mutex +// Unlock releases the mutex. func (im *InstrumentedMutex) Unlock() { im.mu.Unlock() } -// observeLockTime records the total time to acquire the lock +// observeLockTime records the total time to acquire the lock. func (im *InstrumentedMutex) observeLockTime(start time.Time, hadContention bool) { duration := time.Since(start).Seconds() @@ -87,12 +87,12 @@ func (im *InstrumentedMutex) observeLockTime(start time.Time, hadContention bool } } -// observeContention records a contention event +// observeContention records a contention event. func (im *InstrumentedMutex) observeContention() { im.contention.WithLabelValues(string(im.lockType), im.path).Inc() } -// InstrumentedRWMutex wraps sync.RWMutex with contention tracking +// InstrumentedRWMutex wraps sync.RWMutex with contention tracking. 
type InstrumentedRWMutex struct { mu sync.RWMutex path string @@ -100,7 +100,7 @@ type InstrumentedRWMutex struct { contention *prometheus.CounterVec } -// NewInstrumentedRWMutex creates a new instrumented RWMutex +// NewInstrumentedRWMutex creates a new instrumented RWMutex. func NewInstrumentedRWMutex(path string, lockWaitTime *prometheus.HistogramVec, contention *prometheus.CounterVec) *InstrumentedRWMutex { return &InstrumentedRWMutex{ path: path, @@ -109,7 +109,7 @@ func NewInstrumentedRWMutex(path string, lockWaitTime *prometheus.HistogramVec, } } -// RLock acquires read lock with contention tracking +// RLock acquires read lock with contention tracking. func (irm *InstrumentedRWMutex) RLock() { start := time.Now() @@ -125,7 +125,7 @@ func (irm *InstrumentedRWMutex) RLock() { irm.lockWaitTime.WithLabelValues(string(LockTypeRead), irm.path).Observe(time.Since(start).Seconds()) } -// TryRLock attempts to acquire read lock without blocking +// TryRLock attempts to acquire read lock without blocking. func (irm *InstrumentedRWMutex) TryRLock() bool { start := time.Now() acquired := irm.mu.TryRLock() @@ -137,12 +137,12 @@ func (irm *InstrumentedRWMutex) TryRLock() bool { return acquired } -// RUnlock releases read lock +// RUnlock releases read lock. func (irm *InstrumentedRWMutex) RUnlock() { irm.mu.RUnlock() } -// Lock acquires write lock with contention tracking +// Lock acquires write lock with contention tracking. func (irm *InstrumentedRWMutex) Lock() { start := time.Now() @@ -158,7 +158,7 @@ func (irm *InstrumentedRWMutex) Lock() { irm.lockWaitTime.WithLabelValues(string(LockTypeWrite), irm.path).Observe(time.Since(start).Seconds()) } -// TryLock attempts to acquire write lock without blocking +// TryLock attempts to acquire write lock without blocking. func (irm *InstrumentedRWMutex) TryLock() bool { start := time.Now() acquired := irm.mu.TryLock() @@ -170,7 +170,7 @@ func (irm *InstrumentedRWMutex) TryLock() bool { return acquired } -// Unlock releases write lock +// Unlock releases write lock. func (irm *InstrumentedRWMutex) Unlock() { irm.mu.Unlock() } diff --git a/diskcache/metric.go b/diskcache/metric.go index cb6630da..56b0ff32 100644 --- a/diskcache/metric.go +++ b/diskcache/metric.go @@ -31,7 +31,7 @@ var ( getLatencyVec, putLatencyVec *prometheus.SummaryVec - // Lock contention metrics + // Lock contention metrics. lockWaitTimeVec *prometheus.HistogramVec lockContentionVec *prometheus.CounterVec From c9815b87e93b934792a011b6ca2c2163e847e5ed Mon Sep 17 00:00:00 2001 From: coanor Date: Mon, 12 Jan 2026 18:17:02 +0800 Subject: [PATCH 8/8] fix lint --- diskcache/errors.go | 4 ++-- diskcache/errors_test.go | 2 +- diskcache/lock_contention.go | 8 ++++++-- diskcache/lock_contention_test.go | 2 +- diskcache/put_test.go | 3 --- 5 files changed, 10 insertions(+), 9 deletions(-) diff --git a/diskcache/errors.go b/diskcache/errors.go index 522ced81..df47db4a 100644 --- a/diskcache/errors.go +++ b/diskcache/errors.go @@ -181,7 +181,7 @@ func IsRetryable(err error) bool { return isTemporaryError(err) } - switch cacheErr.Operation { + switch cacheErr.Operation { // nolint:exhaustive case OpWrite, OpRead, OpSync, OpSeek: return isTemporaryError(cacheErr.Err) case OpLock: @@ -195,7 +195,7 @@ func IsRetryable(err error) bool { // isCacheError checks if error is of type CacheError. 
func isCacheError(err error, target **CacheError) bool { // Use type assertion instead of errors.As for direct check - if ce, ok := err.(*CacheError); ok { + if ce, ok := err.(*CacheError); ok { // nolint:errorlint *target = ce return true } diff --git a/diskcache/errors_test.go b/diskcache/errors_test.go index f322a34b..6d2ed83e 100644 --- a/diskcache/errors_test.go +++ b/diskcache/errors_test.go @@ -80,7 +80,7 @@ func TestNewCacheError(t *testing.T) { if cacheErr.Operation != OpGet { t.Errorf("Expected operation %v, got %v", OpGet, cacheErr.Operation) } - if cacheErr.Err != err { + if cacheErr.Err != err { // nolint:errorlint t.Errorf("Expected error %v, got %v", err, cacheErr.Err) } if cacheErr.Details != "test details" { diff --git a/diskcache/lock_contention.go b/diskcache/lock_contention.go index e9cee51f..7b949872 100644 --- a/diskcache/lock_contention.go +++ b/diskcache/lock_contention.go @@ -31,7 +31,11 @@ type InstrumentedMutex struct { } // NewInstrumentedMutex creates a new instrumented mutex. -func NewInstrumentedMutex(lockType LockType, path string, lockWaitTime *prometheus.HistogramVec, contention *prometheus.CounterVec) *InstrumentedMutex { +func NewInstrumentedMutex(lockType LockType, + path string, + lockWaitTime *prometheus.HistogramVec, + contention *prometheus.CounterVec, +) *InstrumentedMutex { return &InstrumentedMutex{ lockType: lockType, path: path, @@ -59,7 +63,7 @@ func (im *InstrumentedMutex) Lock() { // TryLock attempts to acquire mutex without blocking. func (im *InstrumentedMutex) TryLock() bool { - start := time.Now() + start := time.Now() // nolint:ifshort acquired := im.mu.TryLock() if acquired { im.observeLockTime(start, false) diff --git a/diskcache/lock_contention_test.go b/diskcache/lock_contention_test.go index 7143e510..5f381b24 100644 --- a/diskcache/lock_contention_test.go +++ b/diskcache/lock_contention_test.go @@ -18,7 +18,7 @@ func TestLockContention(t *testing.T) { // Test immediate lock (no contention) start := time.Now() mu.Lock() - duration1 := time.Since(start) + duration1 := time.Since(start) // nolint:ifshort mu.Unlock() // Should be very fast (no contention) diff --git a/diskcache/put_test.go b/diskcache/put_test.go index 5740fbc6..5c3ac126 100644 --- a/diskcache/put_test.go +++ b/diskcache/put_test.go @@ -244,8 +244,6 @@ func TestConcurrentPutGet(t *T.T) { mfs, err := reg.Gather() require.NoError(t, err) - t.Logf("got metrics:\n%s", metrics.MetricFamily2Text(mfs)) - mSize := metrics.GetMetricOnLabels(mfs, "diskcache_size", c.path) require.NotNil(t, mSize) @@ -452,7 +450,6 @@ func TestPutOnCapacityReached(t *T.T) { t.Cleanup(func() { require.NoError(t, c.Close()) ResetMetrics() - t.Logf("metrics:\n%s", metrics.MetricFamily2Text(mfs)) }) }) }
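
A note on consuming the error types introduced in this series: since every
failure path now returns a `*CacheError` wrapper with an `Unwrap` method,
callers can branch on structure instead of matching error strings. A minimal
consumer-side sketch (the retry count and backoff are illustrative, not part
of the series):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/GuanceCloud/cliutils/diskcache"
)

// putWithRetry retries transient failures as classified by diskcache.IsRetryable,
// and unwraps *diskcache.CacheError for structured context on permanent ones.
func putWithRetry(c *diskcache.DiskCache, data []byte) error {
	for attempt := 0; attempt < 3; attempt++ {
		err := c.Put(data)
		if err == nil {
			return nil
		}

		if errors.Is(err, diskcache.ErrCacheFull) {
			return err // permanent until a consumer drains the cache
		}

		var ce *diskcache.CacheError
		if errors.As(err, &ce) {
			fmt.Printf("put failed: op=%s path=%s details=%s\n",
				ce.Operation, ce.Path, ce.Details)
		}

		if !diskcache.IsRetryable(err) {
			return err
		}
		time.Sleep(100 * time.Millisecond) // backoff policy is illustrative
	}

	return errors.New("put: retries exhausted")
}
```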