diff --git a/diskcache/LOCK_CONTENTION_METRICS.md b/diskcache/LOCK_CONTENTION_METRICS.md
new file mode 100644
index 00000000..a041e341
--- /dev/null
+++ b/diskcache/LOCK_CONTENTION_METRICS.md
@@ -0,0 +1,181 @@
+# Lock Contention Metrics Implementation
+
+This document explains how lock contention metrics are measured and used in the diskcache module.
+
+## Overview
+
+The enhanced diskcache now tracks lock contention across three critical lock types:
+
+- **write lock** (`wlock`): Excludes concurrent Put operations
+- **read lock** (`rlock`): Excludes concurrent Get operations
+- **rw lock** (`rwlock`): Excludes structural operations (rotate, switch, drop, close)
+
+## Instrumentation Strategy
+
+### 1. Instrumented Locks
+
+Instead of using `sync.Mutex` and `sync.RWMutex` directly, we now use:
+
+```go
+type InstrumentedMutex struct {
+    mu           sync.Mutex
+    lockType     LockType
+    path         string
+    lockWaitTime *prometheus.HistogramVec
+    contention   *prometheus.CounterVec
+}
+
+type InstrumentedRWMutex struct {
+    mu           sync.RWMutex
+    path         string
+    lockWaitTime *prometheus.HistogramVec
+    contention   *prometheus.CounterVec
+}
+```
+
+### 2. Contention Detection
+
+Contention is detected using `TryLock()`: if the mutex can be acquired
+immediately, there was no contention; otherwise we fall back to a blocking
+`Lock()` and record one contention event for the acquisition:
+
+```go
+func (im *InstrumentedMutex) Lock() {
+    start := time.Now()
+
+    // Check if the mutex is already locked (contention).
+    if im.mu.TryLock() {
+        // No contention - immediate acquisition.
+        im.observeLockTime(start, false)
+        return
+    }
+
+    // Contention occurred - block until acquired; observeLockTime
+    // records the wait time and the contention event.
+    im.mu.Lock()
+    im.observeLockTime(start, true)
+}
+```
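+
+Once the cache path is known, `doOpen()` replaces the plain mutexes with the
+instrumented ones, all reporting into the same two metric vectors (abridged
+from `open.go`; the nil checks are omitted here):
+
+```go
+// Initialize instrumented locks now that we have the path.
+c.wlock = NewInstrumentedMutex(LockTypeWrite, c.path, lockWaitTimeVec, lockContentionVec)
+c.rlock = NewInstrumentedMutex(LockTypeRead, c.path, lockWaitTimeVec, lockContentionVec)
+c.rwlock = NewInstrumentedMutex(LockTypeRW, c.path, lockWaitTimeVec, lockContentionVec)
+```
+
+Callers are unchanged: the wrappers keep the `Lock`/`Unlock`/`TryLock` surface
+of `sync.Mutex`, so `Put`/`Get` and friends need no modification.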
+
+## Metrics Exposed
+
+### 1. Lock Wait Time Histogram
+
+Exposed as a standard Prometheus histogram (`_bucket`/`_sum`/`_count` series), e.g.:
+
+```
+diskcache_lock_wait_seconds_bucket{lock_type="write",path="/cache/path",le="0.001"} 40
+diskcache_lock_wait_seconds_sum{lock_type="write",path="/cache/path"} 0.05
+diskcache_lock_wait_seconds_count{lock_type="write",path="/cache/path"} 42
+```
+
+**Buckets**: `[0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5]` seconds
+
+### 2. Lock Contention Counter
+```
+diskcache_lock_contention_total{lock_type="write",path="/cache/path"} 15
+diskcache_lock_contention_total{lock_type="read",path="/cache/path"} 3
+diskcache_lock_contention_total{lock_type="rw",path="/cache/path"} 1
+```
+
+## Usage Examples
+
+### 1. Monitoring Lock Contention Rate
+
+```promql
+# Contention rate per second by lock type
+sum by (lock_type, path) (rate(diskcache_lock_contention_total[5m]))
+
+# 95th percentile of lock wait times
+histogram_quantile(0.95, sum by (lock_type, path, le) (rate(diskcache_lock_wait_seconds_bucket[5m])))
+```
+
+### 2. Alerting on High Contention
+
+```promql
+# Alert when the p95 lock wait time exceeds the threshold
+histogram_quantile(0.95, sum by (lock_type, le) (rate(diskcache_lock_wait_seconds_bucket[5m]))) > 0.1
+
+# Alert when the contention rate is high
+sum by (lock_type, path) (rate(diskcache_lock_contention_total[5m])) > 10
+```
+
+### 3. Performance Analysis
+
+```promql
+# Lock wait time distribution
+histogram_quantile(0.50, sum by (lock_type, le) (rate(diskcache_lock_wait_seconds_bucket[5m])))
+histogram_quantile(0.95, sum by (lock_type, le) (rate(diskcache_lock_wait_seconds_bucket[5m])))
+histogram_quantile(0.99, sum by (lock_type, le) (rate(diskcache_lock_wait_seconds_bucket[5m])))
+
+# Correlate contention with operations
+sum by (path) (rate(diskcache_lock_contention_total[5m]))
+  / (sum by (path) (rate(diskcache_put_total[5m])) + sum by (path) (rate(diskcache_get_total[5m])))
+```
+
+## Implementation Benefits
+
+### 1. **Early Detection of Performance Issues**
+- Identifies lock contention before it becomes a bottleneck
+- Shows which lock types are under stress
+- Helps optimize lock granularity and usage patterns
+
+### 2. **Correlation with System Load**
+- Can correlate contention spikes with:
+  - High put/get rates
+  - Disk I/O delays
+  - Memory pressure
+  - File system issues
+
+### 3. **Capacity Planning**
+- Understanding contention patterns helps with:
+  - Sizing thread pools
+  - Configuring batch sizes
+  - Optimizing cache operations
+  - Planning resource allocation
+
+### 4. **Troubleshooting**
+- Quickly identify whether performance issues are due to:
+  - Write contention (many puts)
+  - Read contention (many gets)
+  - Structural operations (rotations, switches)
+
+## Integration with Existing Metrics
+
+The lock contention metrics integrate seamlessly with the existing diskcache metrics:
+
+```
+# Overall view of cache performance
+diskcache_put_latency_seconds{path="/cache"} 0.001
+diskcache_get_latency_seconds{path="/cache"} 0.0005
+diskcache_lock_wait_seconds{lock_type="write",path="/cache"} 0.0002
+diskcache_lock_wait_seconds{lock_type="read",path="/cache"} 0.0001
+
+# Identify whether locks are the bottleneck (mean lock wait vs. mean put latency)
+rate(diskcache_lock_wait_seconds_sum[5m]) / rate(diskcache_put_latency_seconds_sum[5m])
+```
+
+## Performance Overhead
+
+The instrumentation adds minimal overhead:
+- **Contention detection**: one `TryLock()` call before the actual lock
+- **Timing**: one `time.Since()` call per lock acquisition
+- **Metrics update**: one histogram and counter observation per lock
+
+The overhead is negligible compared to typical I/O operations and provides valuable observability.
+
+## Production Deployment
+
+### 1. **Monitoring Dashboard**
+Create Grafana panels showing:
+- Lock wait time percentiles by type
+- Contention rates over time
+- Lock wait time vs. operation latency
+- Contention heat maps by cache path
+
+### 2. **Alerting Rules**
+Set up alerts for:
+- High p95/p99 lock wait times (>100ms)
+- Elevated contention rates (>1/sec sustained)
+- Sudden spikes in wait times
+
+### 3. **Capacity Analysis**
+Use metrics to:
+- Identify optimal thread counts
+- Tune batch sizes
+- Plan for scale increases
+- Optimize lock granularity
\ No newline at end of file
diff --git a/diskcache/diskcache.go b/diskcache/diskcache.go
index 181cde2b..38e93a5c 100644
--- a/diskcache/diskcache.go
+++ b/diskcache/diskcache.go
@@ -24,6 +24,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/GuanceCloud/cliutils/logger"
 )
 
 const (
@@ -57,6 +59,9 @@ var (
 
 	// Invalid file header.
 	ErrBadHeader = errors.New("bad header")
+
+	l    = logger.DefaultSLogger("diskcache")
+	once sync.Once
 )
 
 // DiskCache is the representation of a disk cache.
@@ -81,15 +86,16 @@ type DiskCache struct {
 	// how long to wakeup a sleeping write-file
 	wakeup time.Duration
 
-	wlock, // write-lock: used to exclude concurrent Put to the header file.
-	rlock *sync.Mutex // read-lock: used to exclude concurrent Get on the tail file.
-	rwlock *sync.Mutex // used to exclude switch/rotate/drop/Close on current disk cache instance.
+	wlock  *InstrumentedMutex // write-lock: used to exclude concurrent Put to the header file.
+	rlock  *InstrumentedMutex // read-lock: used to exclude concurrent Get on the tail file.
+	rwlock *InstrumentedMutex // used to exclude switch/rotate/drop/Close on current disk cache instance.
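+
+	// The three instrumented locks above report wait times and contention
+	// counts to the diskcache_lock_wait_seconds and
+	// diskcache_lock_contention_total metrics; see LOCK_CONTENTION_METRICS.md.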
 	flock *flock // disabled multi-Open on same path
 
 	pos *pos // current read fd position info
 
 	// specs of current diskcache
-	size atomic.Int64 // current byte size
+	size atomic.Int64 // current byte size
+
 	curBatchSize, // current writing file's size
 	curReadSize, // current reading file's size
 	batchSize, // current batch size(static)
@@ -117,8 +123,10 @@
 }
 
 func (c *DiskCache) String() string {
-	c.rwlock.Lock()
-	defer c.rwlock.Unlock()
+	if c.rwlock != nil {
+		c.rwlock.Lock()
+		defer c.rwlock.Unlock()
+	}
 
 	// nolint: lll
 	// if there too many files(>10), only print file count
@@ -135,8 +143,10 @@
 }
 
 func (c *DiskCache) Pretty() string {
-	c.rwlock.Lock()
-	defer c.rwlock.Unlock()
+	if c.rwlock != nil {
+		c.rwlock.Lock()
+		defer c.rwlock.Unlock()
+	}
 
 	arr := []string{}
diff --git a/diskcache/drop.go b/diskcache/drop.go
index 95a4a4b3..b3659fb6 100644
--- a/diskcache/drop.go
+++ b/diskcache/drop.go
@@ -28,7 +28,8 @@ func (c *DiskCache) dropBatch() error {
 
 	if c.rfd != nil && c.curReadfile == fname {
 		if err := c.rfd.Close(); err != nil {
-			return err
+			return WrapFileOperationError(OpClose, err, c.path, fname).
+				WithDetails("failed_to_close_read_file_during_drop")
 		}
 
 		c.rfd = nil
@@ -36,7 +37,8 @@
 
 	if fi, err := os.Stat(fname); err == nil {
 		if err := os.Remove(fname); err != nil {
-			return err
+			return WrapFileOperationError(OpRemove, err, c.path, fname).
+				WithDetails("failed_to_remove_file_during_drop")
 		}
 
 		c.size.Add(-fi.Size())
@@ -45,6 +47,9 @@
 		droppedDataVec.WithLabelValues(c.path, reasonExceedCapacity).Observe(float64(fi.Size()))
 		datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles)))
 		sizeVec.WithLabelValues(c.path).Sub(float64(fi.Size()))
+	} else {
+		return WrapFileOperationError(OpStat, err, c.path, fname).
+			WithDetails("failed_to_stat_file_during_drop")
 	}
 
 	return nil
diff --git a/diskcache/envs.go b/diskcache/envs.go
index e6356e42..a175df30 100644
--- a/diskcache/envs.go
+++ b/diskcache/envs.go
@@ -8,6 +8,7 @@ package diskcache
 import (
 	"os"
 	"strconv"
+	"time"
 )
 
 func (c *DiskCache) syncEnv() {
@@ -37,6 +38,18 @@ func (c *DiskCache) syncEnv() {
 		c.noPos = true
 	}
 
+	if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_INTERVAL"); ok && v != "" {
+		if du, err := time.ParseDuration(v); err == nil && du > 0 {
+			c.pos.dumpInterval = du
+		}
+	}
+
+	if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_AT"); ok && v != "" {
+		if n, err := strconv.ParseInt(v, 10, 64); err == nil && n > 0 {
+			c.pos.dumpCount = int(n)
+		}
+	}
+
 	if v, ok := os.LookupEnv("ENV_DISKCACHE_NO_LOCK"); ok && v != "" {
 		c.noLock = true
 	}
diff --git a/diskcache/errors.go b/diskcache/errors.go
new file mode 100644
index 00000000..df47db4a
--- /dev/null
+++ b/diskcache/errors.go
@@ -0,0 +1,248 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the MIT License.
+// This product includes software developed at Guance Cloud (https://www.guance.com/).
+// Copyright 2021-present Guance, Inc.
+
+package diskcache
+
+import (
+	"fmt"
+	"runtime"
+	"strings"
+)
+
+// Operation type for error context.
+type Operation string
+
+const (
+	OpOpen      Operation = "Open"
+	OpClose     Operation = "Close"
+	OpPut       Operation = "Put"
+	OpStreamPut Operation = "StreamPut"
+	OpGet       Operation = "Get"
+	OpRotate    Operation = "Rotate"
+	OpSwitch    Operation = "Switch"
+	OpDrop      Operation = "Drop"
+	OpLock      Operation = "Lock"
+	OpUnlock    Operation = "Unlock"
+	OpPos       Operation = "Pos"
+	OpSeek      Operation = "Seek"
+	OpWrite     Operation = "Write"
+	OpRead      Operation = "Read"
+	OpSync      Operation = "Sync"
+	OpCreate    Operation = "Create"
+	OpRemove    Operation = "Remove"
+	OpRename    Operation = "Rename"
+	OpStat      Operation = "Stat"
+)
+
+// CacheError represents an enhanced error with operation context and details.
+type CacheError struct {
+	Operation Operation
+	Path      string
+	File      string
+	Details   string
+	Err       error
+	Caller    string
+}
+
+// Error implements the error interface.
+func (e *CacheError) Error() string {
+	var parts []string
+
+	parts = append(parts, string(e.Operation))
+
+	if e.Path != "" {
+		parts = append(parts, fmt.Sprintf("path=%s", e.Path))
+	}
+
+	if e.File != "" {
+		parts = append(parts, fmt.Sprintf("file=%s", e.File))
+	}
+
+	if e.Details != "" {
+		parts = append(parts, e.Details)
+	}
+
+	base := fmt.Sprintf("diskcache %s: %s", strings.Join(parts, " "), e.Err)
+
+	if e.Caller != "" {
+		return fmt.Sprintf("%s (caller: %s)", base, e.Caller)
+	}
+
+	return base
+}
+
+// Unwrap returns the underlying error for compatibility with errors.Is/As.
+func (e *CacheError) Unwrap() error {
+	return e.Err
+}
+
+// NewCacheError creates a new CacheError with enhanced context.
+func NewCacheError(op Operation, err error, details string) *CacheError {
+	return &CacheError{
+		Operation: op,
+		Err:       err,
+		Caller:    getCaller(),
+		Details:   details,
+	}
+}
+
+// WithPath adds path context to the error.
+func (e *CacheError) WithPath(path string) *CacheError {
+	e.Path = path
+	return e
+}
+
+// WithFile adds file context to the error.
+func (e *CacheError) WithFile(file string) *CacheError {
+	e.File = file
+	return e
+}
+
+// WithDetails adds additional details to the error.
+func (e *CacheError) WithDetails(details string) *CacheError {
+	if e.Details != "" {
+		e.Details = fmt.Sprintf("%s: %s", e.Details, details)
+	} else {
+		e.Details = details
+	}
+	return e
+}
+
+// getCaller returns the caller's file:line for debugging.
+func getCaller() string {
+	_, file, line, ok := runtime.Caller(2)
+	if !ok {
+		return ""
+	}
+
+	// Keep only the file name from the full path.
+	parts := strings.Split(file, "/")
+	if len(parts) > 0 {
+		file = parts[len(parts)-1]
+	}
+
+	return fmt.Sprintf("%s:%d", file, line)
+}
+
+// Helper functions for creating specific error types
+
+// WrapPutError wraps errors from Put operations.
+func WrapPutError(err error, path string, dataSize int) *CacheError {
+	return NewCacheError(OpPut, err, fmt.Sprintf("data_size=%d", dataSize)).WithPath(path)
+}
+
+// WrapGetError wraps errors from Get operations.
+func WrapGetError(err error, path string, file string) *CacheError {
+	return NewCacheError(OpGet, err, "").WithPath(path).WithFile(file)
+}
+
+// WrapRotateError wraps errors from Rotate operations.
+func WrapRotateError(err error, path string, oldFile, newFile string) *CacheError {
+	details := fmt.Sprintf("old=%s -> new=%s", oldFile, newFile)
+	return NewCacheError(OpRotate, err, details).WithPath(path)
+}
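+
+// A minimal caller-side sketch (hypothetical usage, not part of this file's API):
+//
+//	if err := c.Put(data); err != nil {
+//		var ce *CacheError
+//		if errors.As(err, &ce) && IsRetryable(err) {
+//			// back off, then retry the Put
+//		}
+//	}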
+
+// WrapOpenError wraps errors from Open operations.
+func WrapOpenError(err error, path string) *CacheError {
+	return NewCacheError(OpOpen, err, "").WithPath(path)
+}
+
+// WrapCloseError wraps errors from Close operations.
+func WrapCloseError(err error, path string, fdType string) *CacheError {
+	return NewCacheError(OpClose, err, fmt.Sprintf("fd_type=%s", fdType)).WithPath(path)
+}
+
+// WrapLockError wraps errors from locking operations.
+func WrapLockError(err error, path string, pid int) *CacheError {
+	return NewCacheError(OpLock, err, fmt.Sprintf("pid=%d", pid)).WithPath(path)
+}
+
+// WrapPosError wraps errors from position operations.
+func WrapPosError(err error, path string, seek int64) *CacheError {
+	return NewCacheError(OpPos, err, fmt.Sprintf("seek=%d", seek)).WithPath(path)
+}
+
+// WrapFileOperationError wraps errors from generic file operations.
+func WrapFileOperationError(op Operation, err error, path, file string) *CacheError {
+	return NewCacheError(op, err, "").WithPath(path).WithFile(file)
+}
+
+// IsRetryable checks if an error is retryable based on its type and context.
+func IsRetryable(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	var cacheErr *CacheError
+	if !isCacheError(err, &cacheErr) {
+		// For non-CacheError types, check known retryable patterns.
+		return isTemporaryError(err)
+	}
+
+	switch cacheErr.Operation { // nolint:exhaustive
+	case OpWrite, OpRead, OpSync, OpSeek:
+		return isTemporaryError(cacheErr.Err)
+	case OpLock:
+		// Lock errors might be retryable if they're "locked by another process".
+		return strings.Contains(cacheErr.Err.Error(), "locked by")
+	default:
+		return false
+	}
+}
+
+// isCacheError checks if error is of type CacheError.
+func isCacheError(err error, target **CacheError) bool {
+	// Use type assertion instead of errors.As for direct check.
+	if ce, ok := err.(*CacheError); ok { // nolint:errorlint
+		*target = ce
+		return true
+	}
+	return false
+}
+
+// isTemporaryError checks if an underlying error is temporary/retryable.
+func isTemporaryError(err error) bool {
+	errStr := err.Error()
+	temporaryPatterns := []string{
+		"resource temporarily unavailable",
+		"connection refused",
+		"timeout",
+		"network is unreachable",
+		"no space left on device", // This might be temporary if space gets freed.
+	}
+
+	for _, pattern := range temporaryPatterns {
+		if strings.Contains(strings.ToLower(errStr), pattern) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// GetErrorContext extracts useful context information from errors.
+func GetErrorContext(err error) map[string]interface{} {
+	context := make(map[string]interface{})
+
+	if err == nil {
+		return context
+	}
+
+	var cacheErr *CacheError
+	if isCacheError(err, &cacheErr) {
+		context["operation"] = string(cacheErr.Operation)
+		context["path"] = cacheErr.Path
+		context["file"] = cacheErr.File
+		context["details"] = cacheErr.Details
+		context["caller"] = cacheErr.Caller
+		context["original_error"] = cacheErr.Err.Error()
+		context["retryable"] = IsRetryable(err)
+	} else {
+		context["original_error"] = err.Error()
+		context["retryable"] = IsRetryable(err)
+	}
+
+	return context
+}
diff --git a/diskcache/errors_test.go b/diskcache/errors_test.go
new file mode 100644
index 00000000..6d2ed83e
--- /dev/null
+++ b/diskcache/errors_test.go
@@ -0,0 +1,318 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the MIT License.
+// This product includes software developed at Guance Cloud (https://www.guance.com/).
+// Copyright 2021-present Guance, Inc.
+
+package diskcache
+
+import (
+	"errors"
+	"strings"
+	"testing"
+)
+
+func TestCacheError_Error(t *testing.T) {
+	tests := []struct {
+		name     string
+		err      *CacheError
+		contains []string
+	}{
+		{
+			name: "basic error",
+			err: &CacheError{
+				Operation: OpPut,
+				Path:      "/tmp/cache",
+				Err:       errors.New("disk full"),
+			},
+			contains: []string{"Put", "path=/tmp/cache", "disk full"},
+		},
+		{
+			name: "error with file and details",
+			err: &CacheError{
+				Operation: OpRead,
+				Path:      "/tmp/cache",
+				File:      "data.0001",
+				Details:   "header corrupted",
+				Err:       errors.New("bad header"),
+			},
+			contains: []string{"Read", "path=/tmp/cache", "file=data.0001", "header corrupted", "bad header"},
+		},
+		{
+			name: "error with caller",
+			err: &CacheError{
+				Operation: OpRotate,
+				Path:      "/tmp/cache",
+				Err:       errors.New("permission denied"),
+				Caller:    "rotate.go:123",
+			},
+			contains: []string{"Rotate", "path=/tmp/cache", "permission denied", "caller: rotate.go:123"},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			errStr := tt.err.Error()
+			for _, contain := range tt.contains {
+				if !strings.Contains(errStr, contain) {
+					t.Errorf("Error string should contain %q, got: %s", contain, errStr)
+				}
+			}
+		})
+	}
+}
+
+func TestCacheError_Unwrap(t *testing.T) {
+	originalErr := errors.New("original error")
+	cacheErr := &CacheError{
+		Operation: OpPut,
+		Err:       originalErr,
+	}
+
+	if !errors.Is(cacheErr, originalErr) {
+		t.Error("CacheError should unwrap to original error")
+	}
+}
+
+func TestNewCacheError(t *testing.T) {
+	err := errors.New("test error")
+	cacheErr := NewCacheError(OpGet, err, "test details")
+
+	if cacheErr.Operation != OpGet {
+		t.Errorf("Expected operation %v, got %v", OpGet, cacheErr.Operation)
+	}
+	if cacheErr.Err != err { // nolint:errorlint
+		t.Errorf("Expected error %v, got %v", err, cacheErr.Err)
+	}
+	if cacheErr.Details != "test details" {
+		t.Errorf("Expected details %q, got %q", "test details", cacheErr.Details)
+	}
+	if cacheErr.Caller == "" {
+		t.Error("Caller should be set automatically")
+	}
+}
+
+func TestCacheError_WithMethods(t *testing.T) {
+	err := errors.New("test error")
+	cacheErr := NewCacheError(OpPut, err, "")
+
+	// Test WithPath
+	cacheErr = cacheErr.WithPath("/test/path")
+	if cacheErr.Path != "/test/path" {
+		t.Errorf("Expected path %q, got %q", "/test/path", cacheErr.Path)
+	}
+
+	// Test WithFile
+	cacheErr = cacheErr.WithFile("test.data")
+	if cacheErr.File != "test.data" {
+		t.Errorf("Expected file %q, got %q", "test.data", cacheErr.File)
+	}
+
+	// Test WithDetails
+	cacheErr = cacheErr.WithDetails("additional info")
+	if cacheErr.Details != "additional info" {
+		t.Errorf("Expected details %q, got %q", "additional info", cacheErr.Details)
+	}
+
+	// Test WithDetails with existing details
+	cacheErr = cacheErr.WithDetails("more info")
+	expected := "additional info: more info"
+	if cacheErr.Details != expected {
+		t.Errorf("Expected details %q, got %q", expected, cacheErr.Details)
+	}
+}
+
+func TestWrapFunctions(t *testing.T) {
+	baseErr := errors.New("base error")
+
+	t.Run("WrapPutError", func(t *testing.T) {
+		err := WrapPutError(baseErr, "/path", 1024)
+		if err.Operation != OpPut {
+			t.Error("Operation should be Put")
+		}
+		if err.Path != "/path" {
+			t.Error("Path should be set")
+		}
+		if !strings.Contains(err.Details, "1024") {
+			t.Error("Details should contain data size")
+		}
+	})
+
+	t.Run("WrapGetError", func(t *testing.T) {
+		err := WrapGetError(baseErr, "/path", "file.dat")
+		if err.Operation != OpGet {
+			t.Error("Operation should be Get")
+		}
+		if err.File != "file.dat" {
+			t.Error("File should be set")
+		}
+	})
+
+	t.Run("WrapRotateError", func(t *testing.T) {
+		err := WrapRotateError(baseErr, "/path", "old", "new")
+		if err.Operation != OpRotate {
+			t.Error("Operation should be Rotate")
+		}
+		if !strings.Contains(err.Details, "old=old") {
+			t.Error("Details should contain old file")
+		}
+	})
+}
+
+func TestIsRetryable(t *testing.T) {
+	tests := []struct {
+		name     string
+		err      error
+		expected bool
+	}{
+		{"nil error", nil, false},
+		{"temporary error", &CacheError{Operation: OpWrite, Err: errors.New("resource temporarily unavailable")}, true},
+		{"permission denied", &CacheError{Operation: OpWrite, Err: errors.New("permission denied")}, false},
+		{"lock error", &CacheError{Operation: OpLock, Err: errors.New("locked by alive 1234")}, true},
+		{"non-cache error temporary", errors.New("connection refused"), true},
+		{"non-cache error permanent", errors.New("permission denied"), false},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := IsRetryable(tt.err); got != tt.expected {
+				t.Errorf("IsRetryable(%v) = %v, want %v", tt.err, got, tt.expected)
+			}
+		})
+	}
+}
+
+func TestGetErrorContext(t *testing.T) {
+	t.Run("cache error", func(t *testing.T) {
+		originalErr := errors.New("test error")
+		cacheErr := NewCacheError(OpPut, originalErr, "details").
+			WithPath("/test").
+			WithFile("test.dat")
+
+		context := GetErrorContext(cacheErr)
+
+		if context["operation"] != string(OpPut) {
+			t.Error("Operation should be in context")
+		}
+		if context["path"] != "/test" {
+			t.Error("Path should be in context")
+		}
+		if context["file"] != "test.dat" {
+			t.Error("File should be in context")
+		}
+		if context["details"] != "details" {
+			t.Error("Details should be in context")
+		}
+		if context["original_error"] != "test error" {
+			t.Error("Original error should be in context")
+		}
+		if context["retryable"] != false {
+			t.Error("Retryable should be false for test error")
+		}
+	})
+
+	t.Run("regular error", func(t *testing.T) {
+		err := errors.New("connection refused")
+		context := GetErrorContext(err)
+
+		if context["original_error"] != "connection refused" {
+			t.Error("Original error should be in context")
+		}
+		if context["retryable"] != true {
+			t.Error("Connection refused should be retryable")
+		}
+	})
+
+	t.Run("nil error", func(t *testing.T) {
+		context := GetErrorContext(nil)
+		if len(context) != 0 {
+			t.Error("Context should be empty for nil error")
+		}
+	})
+}
+
+func TestErrorIntegration(t *testing.T) {
+	// This test demonstrates how the enhanced errors work in practice.
+	t.Run("error chaining example", func(t *testing.T) {
+		// Simulate a disk full error during Put with no drop policy.
+		cache, err := Open(WithPath(t.TempDir()), WithCapacity(100), WithNoDrop(true)) // Very small capacity, no drop
+		if err != nil {
+			t.Fatalf("Failed to open cache: %v", err)
+		}
+		defer cache.Close()
+
+		// Try to put data that exceeds capacity.
+		data := make([]byte, 200) // Larger than capacity
+		err = cache.Put(data)
+
+		// Check that we get a wrapped error.
+		if err == nil {
+			t.Fatal("Expected error when exceeding capacity with no-drop policy")
+		}
+
+		// Check error structure.
+		var cacheErr *CacheError
+		if !isCacheError(err, &cacheErr) {
+			t.Fatal("Error should be of type CacheError")
+		}
+
+		if cacheErr.Operation != OpPut {
+			t.Errorf("Expected operation Put, got %v", cacheErr.Operation)
+		}
+
+		// Test error unwrapping.
+		if !errors.Is(err, ErrCacheFull) {
+			t.Error("Error should unwrap to ErrCacheFull")
ErrCacheFull") + } + + // Test error context + context := GetErrorContext(err) + if context["operation"] != string(OpPut) { + t.Error("Context should contain operation") + } + }) + + t.Run("error details extraction", func(t *testing.T) { + err := WrapPutError(ErrTooLargeData, "/test/path", 2048). + WithDetails("max_size=1024") + + errStr := err.Error() + expectedParts := []string{ + "Put", + "path=/test/path", + "data_size=2048", + "max_size=1024", + "too large data", + } + + for _, part := range expectedParts { + if !strings.Contains(errStr, part) { + t.Errorf("Error string should contain %q, got: %s", part, errStr) + } + } + }) +} + +// Benchmark for error creation overhead +func BenchmarkCacheError_Creation(b *testing.B) { + baseErr := errors.New("test error") + + b.Run("NewCacheError", func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = NewCacheError(OpPut, baseErr, "test details") + } + }) + + b.Run("WrapPutError", func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = WrapPutError(baseErr, "/test/path", 1024) + } + }) + + b.Run("ErrorString", func(b *testing.B) { + err := WrapPutError(baseErr, "/test/path", 1024) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = err.Error() + } + }) +} diff --git a/diskcache/get.go b/diskcache/get.go index e14ac60e..61b8d01e 100644 --- a/diskcache/get.go +++ b/diskcache/get.go @@ -7,6 +7,7 @@ package diskcache import ( "encoding/binary" + "errors" "fmt" "io" "time" @@ -18,7 +19,8 @@ type Fn func([]byte) error func (c *DiskCache) switchNextFile() error { if c.curReadfile != "" { if err := c.removeCurrentReadingFile(); err != nil { - return fmt.Errorf("removeCurrentReadingFile: %w", err) + return NewCacheError(OpSwitch, err, "failed_to_remove_current_reading_file"). + WithPath(c.path).WithFile(c.curReadfile) } } @@ -31,7 +33,14 @@ func (c *DiskCache) skipBadFile() error { droppedDataVec.WithLabelValues(c.path, reasonBadDataFile).Observe(float64(c.curReadSize)) }() - return c.switchNextFile() + l.Warnf("skip bad file %s with size %d bytes", c.curReadfile, c.curReadSize) + + if err := c.switchNextFile(); err != nil { + return NewCacheError(OpGet, err, "failed_to_skip_bad_file"). + WithPath(c.path).WithFile(c.curReadfile). + WithDetails(fmt.Sprintf("file_size=%d", c.curReadSize)) + } + return nil } // Get fetch new data from disk cache, then passing to fn @@ -80,15 +89,19 @@ func (c *DiskCache) doGet(buf []byte, fn Fn, bfn BufFunc) error { if err = func() error { c.wlock.Lock() defer c.wlock.Unlock() + return c.rotate() }(); err != nil { - return err + return NewCacheError(OpGet, err, "failed_to_wakeup_sleeping_write_file"). + WithPath(c.path). + WithDetails(fmt.Sprintf("idle_time=%v, batch_size=%d", + time.Since(c.wfdLastWrite), c.curBatchSize)) } } if c.rfd == nil { // no file reading, reading on the first file if err = c.switchNextFile(); err != nil { - return err + return WrapGetError(err, c.path, "") } } @@ -98,6 +111,17 @@ retry: } if n, err = c.rfd.Read(c.batchHeader); err != nil || n != dataHeaderLen { + if err != nil && !errors.Is(err, io.EOF) { + l.Errorf("read %d bytes header error: %s", dataHeaderLen, err.Error()) + err = WrapFileOperationError(OpRead, err, c.path, c.rfd.Name()). + WithDetails(fmt.Sprintf("header_read: expected=%d, actual=%d", dataHeaderLen, n)) + } else if n > 0 && n != dataHeaderLen { + l.Errorf("invalid header length: %d, expect %d", n, dataHeaderLen) + err = NewCacheError(OpRead, ErrUnexpectedReadSize, + fmt.Sprintf("header_size_mismatch: expected=%d, actual=%d", dataHeaderLen, n)). 
+				WithPath(c.path).WithFile(c.rfd.Name())
+		}
+
 		// On bad datafile, just ignore and delete the file.
 		if err = c.skipBadFile(); err != nil {
 			return err
@@ -111,7 +135,8 @@
 
 	if uint32(nbytes) == EOFHint { // EOF
 		if err := c.switchNextFile(); err != nil {
-			return fmt.Errorf("switchNextFile: %w", err)
+			return WrapGetError(err, c.path, c.rfd.Name()).
+				WithDetails("eof_encountered_during_get")
 		}
 
 		goto retry // read next new file to save another Get() calling.
@@ -130,18 +155,25 @@
 
 	if len(readbuf) < nbytes {
 		// seek to next read position
-		if _, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil {
-			return fmt.Errorf("rfd.Seek(%d): %w", nbytes, err)
+		if x, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil {
+			return WrapFileOperationError(OpSeek, err, c.path, c.rfd.Name()).
+				WithDetails(fmt.Sprintf("failed_to_seek_past_data: data_size=%d", nbytes))
+		} else {
+			l.Warnf("got %d bytes to buffer with len %d, seek to new read position %d, drop %d bytes within file %s",
+				nbytes, len(readbuf), x, nbytes, c.curReadfile)
+
+			droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes))
+			return WrapGetError(ErrTooSmallReadBuf, c.path, c.rfd.Name()).
+				WithDetails(fmt.Sprintf("buffer_too_small: required=%d, provided=%d", nbytes, len(readbuf)))
 		}
-
-		droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes))
-		return ErrTooSmallReadBuf
 	}
 
 	if n, err := c.rfd.Read(readbuf[:nbytes]); err != nil {
-		return fmt.Errorf("rfd.Read(%d buf): %w", len(readbuf[:nbytes]), err)
+		return WrapFileOperationError(OpRead, err, c.path, c.rfd.Name()).
+			WithDetails(fmt.Sprintf("data_read: expected=%d, actual=%d", nbytes, n))
 	} else if n != nbytes {
-		return ErrUnexpectedReadSize
+		return WrapGetError(ErrUnexpectedReadSize, c.path, c.rfd.Name()).
+			WithDetails(fmt.Sprintf("partial_read: expected=%d, actual=%d", nbytes, n))
 	}
 
 	if fn == nil {
@@ -152,7 +184,8 @@
 		// seek back
 		if !c.noFallbackOnError {
 			if _, serr := c.rfd.Seek(-int64(dataHeaderLen+nbytes), io.SeekCurrent); serr != nil {
-				return fmt.Errorf("c.rfd.Seek(%d) on FallbackOnError: %w", -int64(dataHeaderLen+nbytes), serr)
+				return WrapFileOperationError(OpSeek, serr, c.path, c.rfd.Name()).
+ WithDetails(fmt.Sprintf("fallback_seek_failed: offset=%d", -int64(dataHeaderLen+nbytes))) } seekBackVec.WithLabelValues(c.path).Inc() @@ -164,11 +197,11 @@ __updatePos: // update seek position if !c.noPos && nbytes > 0 { c.pos.Seek += int64(dataHeaderLen + nbytes) - if derr := c.pos.dumpFile(); derr != nil { - return derr + if do, derr := c.pos.dumpFile(); derr != nil { + return WrapPosError(derr, c.path, c.pos.Seek).WithDetails("failed_to_update_position_after_get") + } else if do { + posUpdatedVec.WithLabelValues("get", c.path).Inc() } - - posUpdatedVec.WithLabelValues("get", c.path).Inc() } __end: diff --git a/diskcache/get_test.go b/diskcache/get_test.go index c046f157..99ac1552 100644 --- a/diskcache/get_test.go +++ b/diskcache/get_test.go @@ -463,7 +463,10 @@ func TestPutGet(t *T.T) { testData := []byte("0123456789") ndata := 10 - c, err := Open(WithPath(p)) + c, err := Open( + WithPath(p), + WithPosUpdate(0, 0), + ) assert.NoError(t, err) @@ -487,6 +490,7 @@ func TestPutGet(t *T.T) { // reopen the cache c2, err := Open(WithPath(p), + WithPosUpdate(0, 0), WithCapacity(int64(len(testData)*10)), WithBatchSize(int64(len(testData)*2))) require.NoError(t, err, "get error: %s", err) @@ -510,7 +514,8 @@ func TestPutGet(t *T.T) { mfs, err := reg.Gather() require.NoError(t, err) - t.Logf("got metrics\n%s", metrics.MetricFamily2Text(mfs)) + fullMetrics := metrics.MetricFamily2Text(mfs) + _ = fullMetrics t.Cleanup(func() { c2.Close() @@ -518,3 +523,160 @@ func TestPutGet(t *T.T) { }) }) } + +func TestDelayPosDump(t *T.T) { + t.Run("pos-sync-at", func(t *T.T) { + ResetMetrics() + + p := t.TempDir() + c, err := Open(WithPath(p), WithPosUpdate(3, 0)) + assert.NoError(t, err) + + testData := []byte("0123456789") + ndata := 10 + + for i := 0; i < ndata; i++ { // write 10 data + require.NoError(t, c.Put(testData), "cache: %s", c) + } + + // make data file readable. + require.NoError(t, c.rotate()) + + // create n read pos + for i := 0; i < 6; i++ { + assert.NoError(t, c.Get(func(data []byte) error { + assert.Len(t, data, len(testData)) + return nil + })) + } + + assert.Equal(t, 6*int64(len(testData)+dataHeaderLen), c.pos.Seek) + assert.NoError(t, c.Close()) + + _, err = os.Stat(c.pos.fname) + require.NoError(t, err) + + reg := prometheus.NewRegistry() + reg.MustRegister(Metrics()...) + mfs, err := reg.Gather() + require.NoError(t, err) + + assert.Equalf(t, float64(2), + metrics.GetMetricOnLabels(mfs, + "diskcache_pos_updated_total", + "get", + c.path, + ).GetCounter().GetValue(), + "got metrics\n%s", metrics.MetricFamily2Text(mfs), + ) + + t.Cleanup(func() { + c.Close() + os.RemoveAll(p) + }) + }) + + t.Run("pos-sync-on-interval", func(t *T.T) { + ResetMetrics() + + p := t.TempDir() + c, err := Open(WithPath(p), WithPosUpdate(-1, time.Millisecond*100)) + assert.NoError(t, err) + + testData := []byte("0123456789") + ndata := 10 + + for i := 0; i < ndata; i++ { // write 10 data + require.NoError(t, c.Put(testData), "cache: %s", c) + } + + // make data file readable. + require.NoError(t, c.rotate()) + + // create n read pos + for i := 0; i < 3; i++ { + assert.NoError(t, c.Get(func(data []byte) error { + assert.Len(t, data, len(testData)) + return nil + })) + + time.Sleep(100 * time.Millisecond) + } + + assert.Equal(t, 3*int64(len(testData)+dataHeaderLen), c.pos.Seek) + assert.NoError(t, c.Close()) + + _, err = os.Stat(c.pos.fname) + require.NoError(t, err) + + reg := prometheus.NewRegistry() + reg.MustRegister(Metrics()...) 
+		mfs, err := reg.Gather()
+		require.NoError(t, err)
+
+		assert.Equalf(t, float64(3),
+			metrics.GetMetricOnLabels(mfs,
+				"diskcache_pos_updated_total",
+				"get",
+				c.path,
+			).GetCounter().GetValue(),
+			"got metrics\n%s", metrics.MetricFamily2Text(mfs),
+		)
+
+		t.Cleanup(func() {
+			c.Close()
+			os.RemoveAll(p)
+		})
+	})
+
+	t.Run("pos-sync-force", func(t *T.T) {
+		ResetMetrics()
+
+		p := t.TempDir()
+		c, err := Open(WithPath(p), WithPosUpdate(0, time.Millisecond*100))
+		assert.NoError(t, err)
+
+		testData := []byte("0123456789")
+		ndata := 10
+
+		for i := 0; i < ndata; i++ { // write 10 data
+			require.NoError(t, c.Put(testData), "cache: %s", c)
+		}
+
+		// make data file readable.
+		require.NoError(t, c.rotate())
+
+		// create n read pos
+		for i := 0; i < 3; i++ {
+			assert.NoError(t, c.Get(func(data []byte) error {
+				assert.Len(t, data, len(testData))
+				return nil
+			}))
+		}
+
+		assert.Equal(t, 3*int64(len(testData)+dataHeaderLen), c.pos.Seek)
+		assert.NoError(t, c.Close())
+
+		_, err = os.Stat(c.pos.fname)
+		require.NoError(t, err)
+
+		reg := prometheus.NewRegistry()
+		reg.MustRegister(Metrics()...)
+		mfs, err := reg.Gather()
+		require.NoError(t, err)
+
+		assert.Equalf(t, float64(3),
+			metrics.GetMetricOnLabels(mfs,
+				"diskcache_pos_updated_total",
+				"get",
+				c.path,
+			).GetCounter().GetValue(),
+			"got metrics\n%s", metrics.MetricFamily2Text(mfs),
+		)
+
+		t.Cleanup(func() {
+			c.Close()
+			os.RemoveAll(p)
+		})
+	})
+}
diff --git a/diskcache/lock.go b/diskcache/lock.go
index 77512164..162920d8 100644
--- a/diskcache/lock.go
+++ b/diskcache/lock.go
@@ -38,7 +38,8 @@ func (l *flock) lock() error {
 	} else {
 		x, err := os.ReadFile(l.file)
 		if err != nil {
-			return err
+			return WrapFileOperationError(OpRead, err, "", l.file).
+				WithDetails("failed_to_read_lock_file")
 		}
 
 		if len(x) == 0 {
@@ -47,30 +48,42 @@
 
 		pidInFile, err := strconv.Atoi(string(x))
 		if err != nil {
-			return err
+			return NewCacheError(OpLock, err,
+				fmt.Sprintf("failed_to_parse_pid_from_lock_file: content=%q", string(x))).
+				WithFile(l.file)
 		} else {
 			switch pidInFile {
 			case -1: // unlocked
 				goto write
 			case curPid:
-				return fmt.Errorf("lock failed(locked by pid %d)", curPid)
+				return NewCacheError(OpLock, fmt.Errorf("already_locked_by_current_process"), "").
+					WithFile(l.file).WithDetails(fmt.Sprintf("current_pid=%d", curPid))
 			default: // other pid, may terminated
 				if pidAlive(pidInFile) {
					// keep "locked by" in the message so IsRetryable treats it as retryable
-					return fmt.Errorf("lock failed(locked by alive %d)", pidInFile)
+					return WrapLockError(fmt.Errorf("locked by alive process"), "", pidInFile).
+						WithFile(l.file)
 				}
 			}
 		}
 	}
 
 write:
-	return os.WriteFile(l.file, []byte(strconv.Itoa(curPid)), 0o600)
+	if err := os.WriteFile(l.file, []byte(strconv.Itoa(curPid)), 0o600); err != nil {
+		return WrapFileOperationError(OpWrite, err, "", l.file).
+			WithDetails(fmt.Sprintf("failed_to_write_pid_to_lock_file: pid=%d", curPid))
+	}
+	return nil
 }
 
 func (l *flock) unlock() error {
 	l.mtx.Lock()
 	defer l.mtx.Unlock()
 
-	return os.WriteFile(l.file, []byte(strconv.Itoa(-1)), 0o600)
+	if err := os.WriteFile(l.file, []byte(strconv.Itoa(-1)), 0o600); err != nil {
+		return WrapFileOperationError(OpWrite, err, "", l.file).
+ WithDetails("failed_to_write_unlock_marker") + } + return nil } func pidAlive(pid int) bool { diff --git a/diskcache/lock_contention.go b/diskcache/lock_contention.go new file mode 100644 index 00000000..7b949872 --- /dev/null +++ b/diskcache/lock_contention.go @@ -0,0 +1,180 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +package diskcache + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +// LockType represents different types of locks in diskcache. +type LockType string + +const ( + LockTypeWrite LockType = "write" + LockTypeRead LockType = "read" + LockTypeRW LockType = "rw" +) + +// InstrumentedMutex wraps sync.Mutex with contention tracking. +type InstrumentedMutex struct { + mu sync.Mutex + lockType LockType + path string + lockWaitTime *prometheus.HistogramVec + contention *prometheus.CounterVec +} + +// NewInstrumentedMutex creates a new instrumented mutex. +func NewInstrumentedMutex(lockType LockType, + path string, + lockWaitTime *prometheus.HistogramVec, + contention *prometheus.CounterVec, +) *InstrumentedMutex { + return &InstrumentedMutex{ + lockType: lockType, + path: path, + lockWaitTime: lockWaitTime, + contention: contention, + } +} + +// Lock acquires the mutex with contention tracking. +func (im *InstrumentedMutex) Lock() { + start := time.Now() + + // Check if mutex is already locked (contention) + if im.mu.TryLock() { + // No contention - immediate acquisition + im.observeLockTime(start, false) + return + } + + // Contention occurred - wait for lock + im.observeContention() + im.mu.Lock() + im.observeLockTime(start, true) +} + +// TryLock attempts to acquire mutex without blocking. +func (im *InstrumentedMutex) TryLock() bool { + start := time.Now() // nolint:ifshort + acquired := im.mu.TryLock() + if acquired { + im.observeLockTime(start, false) + } else { + im.observeContention() + } + return acquired +} + +// Unlock releases the mutex. +func (im *InstrumentedMutex) Unlock() { + im.mu.Unlock() +} + +// observeLockTime records the total time to acquire the lock. +func (im *InstrumentedMutex) observeLockTime(start time.Time, hadContention bool) { + duration := time.Since(start).Seconds() + + // Record wait time + im.lockWaitTime.WithLabelValues(string(im.lockType), im.path).Observe(duration) + + // If this was a contention scenario, also record it + if hadContention { + im.contention.WithLabelValues(string(im.lockType), im.path).Inc() + } +} + +// observeContention records a contention event. +func (im *InstrumentedMutex) observeContention() { + im.contention.WithLabelValues(string(im.lockType), im.path).Inc() +} + +// InstrumentedRWMutex wraps sync.RWMutex with contention tracking. +type InstrumentedRWMutex struct { + mu sync.RWMutex + path string + lockWaitTime *prometheus.HistogramVec + contention *prometheus.CounterVec +} + +// NewInstrumentedRWMutex creates a new instrumented RWMutex. +func NewInstrumentedRWMutex(path string, lockWaitTime *prometheus.HistogramVec, contention *prometheus.CounterVec) *InstrumentedRWMutex { + return &InstrumentedRWMutex{ + path: path, + lockWaitTime: lockWaitTime, + contention: contention, + } +} + +// RLock acquires read lock with contention tracking. 
+func (irm *InstrumentedRWMutex) RLock() {
+	start := time.Now()
+
+	if irm.mu.TryRLock() {
+		// No contention.
+		irm.lockWaitTime.WithLabelValues(string(LockTypeRead), irm.path).Observe(time.Since(start).Seconds())
+		return
+	}
+
+	// Contention occurred.
+	irm.contention.WithLabelValues(string(LockTypeRead), irm.path).Inc()
+	irm.mu.RLock()
+	irm.lockWaitTime.WithLabelValues(string(LockTypeRead), irm.path).Observe(time.Since(start).Seconds())
+}
+
+// TryRLock attempts to acquire a read lock without blocking.
+func (irm *InstrumentedRWMutex) TryRLock() bool {
+	start := time.Now()
+	acquired := irm.mu.TryRLock()
+	if acquired {
+		irm.lockWaitTime.WithLabelValues(string(LockTypeRead), irm.path).Observe(time.Since(start).Seconds())
+	} else {
+		irm.contention.WithLabelValues(string(LockTypeRead), irm.path).Inc()
+	}
+	return acquired
+}
+
+// RUnlock releases the read lock.
+func (irm *InstrumentedRWMutex) RUnlock() {
+	irm.mu.RUnlock()
+}
+
+// Lock acquires the write lock with contention tracking.
+func (irm *InstrumentedRWMutex) Lock() {
+	start := time.Now()
+
+	if irm.mu.TryLock() {
+		// No contention.
+		irm.lockWaitTime.WithLabelValues(string(LockTypeWrite), irm.path).Observe(time.Since(start).Seconds())
+		return
+	}
+
+	// Contention occurred.
+	irm.contention.WithLabelValues(string(LockTypeWrite), irm.path).Inc()
+	irm.mu.Lock()
+	irm.lockWaitTime.WithLabelValues(string(LockTypeWrite), irm.path).Observe(time.Since(start).Seconds())
+}
+
+// TryLock attempts to acquire the write lock without blocking.
+func (irm *InstrumentedRWMutex) TryLock() bool {
+	start := time.Now()
+	acquired := irm.mu.TryLock()
+	if acquired {
+		irm.lockWaitTime.WithLabelValues(string(LockTypeWrite), irm.path).Observe(time.Since(start).Seconds())
+	} else {
+		irm.contention.WithLabelValues(string(LockTypeWrite), irm.path).Inc()
+	}
+	return acquired
+}
+
+// Unlock releases the write lock.
+func (irm *InstrumentedRWMutex) Unlock() {
+	irm.mu.Unlock()
+}
diff --git a/diskcache/lock_contention_test.go b/diskcache/lock_contention_test.go
new file mode 100644
index 00000000..5f381b24
--- /dev/null
+++ b/diskcache/lock_contention_test.go
@@ -0,0 +1,146 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the MIT License.
+// This product includes software developed at Guance Cloud (https://www.guance.com/).
+// Copyright 2021-present Guance, Inc.
+
+package diskcache
+
+import (
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestLockContention(t *testing.T) {
+	// Create instrumented mutex.
+	mu := NewInstrumentedMutex(LockTypeWrite, "/test/path", lockWaitTimeVec, lockContentionVec)
+
+	// Test immediate lock (no contention).
+	start := time.Now()
+	mu.Lock()
+	duration1 := time.Since(start) // nolint:ifshort
+	mu.Unlock()
+
+	// Should be very fast (no contention).
+	if duration1 > time.Millisecond {
+		t.Errorf("Immediate lock took too long: %v", duration1)
+	}
+
+	// Test contention.
+	var wg sync.WaitGroup
+	contentionStarted := make(chan bool, 1)
+
+	// First goroutine holds the lock.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		mu.Lock()
+		defer mu.Unlock()
+
+		// Signal that the first lock is acquired.
+		contentionStarted <- true
+
+		// Hold the lock for a bit.
+		time.Sleep(50 * time.Millisecond)
+	}()
+
+	// Wait for the first lock to be acquired.
+	<-contentionStarted
+
+	// Second goroutine tries to lock (will contend).
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		start := time.Now()
+		mu.Lock() // This should wait due to contention.
+		duration := time.Since(start)
+		mu.Unlock()
+
+		// Should wait at least some time due to contention.
+		if duration < 10*time.Millisecond {
+			t.Errorf("Contented lock didn't wait enough: %v", duration)
+		}
+	}()
+
+	wg.Wait()
+
+	// Check that contention was recorded.
+	t.Logf("Lock contention test completed")
+}
+
+func TestInstrumentedMutex(t *testing.T) {
+	mu := NewInstrumentedMutex(LockTypeRead, "/test/path", lockWaitTimeVec, lockContentionVec)
+
+	// Test TryLock success.
+	if !mu.TryLock() {
+		t.Error("TryLock should succeed initially")
+	}
+
+	// Test TryLock failure (already locked).
+	if mu.TryLock() {
+		t.Error("TryLock should fail when already locked")
+	}
+
+	mu.Unlock()
+
+	// Test TryLock after unlock.
+	if !mu.TryLock() {
+		t.Error("TryLock should succeed after unlock")
+	}
+
+	mu.Unlock()
+}
+
+func TestInstrumentedRWMutex(t *testing.T) {
+	rwmu := NewInstrumentedRWMutex("/test/path", lockWaitTimeVec, lockContentionVec)
+
+	// Test read lock.
+	rwmu.RLock()
+	if !rwmu.TryRLock() {
+		t.Error("Multiple read locks should be allowed")
+	}
+	rwmu.RUnlock()
+	rwmu.RUnlock()
+
+	// Test write lock exclusion.
+	rwmu.Lock()
+	if rwmu.TryLock() {
+		t.Error("Write lock should exclude other writes")
+	}
+	if rwmu.TryRLock() {
+		t.Error("Write lock should exclude reads")
+	}
+	rwmu.Unlock()
+
+	// Test read-write contention.
+	var wg sync.WaitGroup
+	readStarted := make(chan bool, 1)
+
+	// Start the read-lock holder.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		rwmu.RLock()
+		defer rwmu.RUnlock()
+		readStarted <- true
+		time.Sleep(30 * time.Millisecond)
+	}()
+
+	<-readStarted
+
+	// Try to get the write lock while the read lock is held.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		start := time.Now()
+		rwmu.Lock() // Should wait for the read to complete.
+		duration := time.Since(start)
+		rwmu.Unlock()
+
+		if duration < 10*time.Millisecond {
+			t.Errorf("Write lock during read didn't wait enough: %v", duration)
+		}
+	}()
+
+	wg.Wait()
+}
diff --git a/diskcache/metric.go b/diskcache/metric.go
index 2d7688b6..56b0ff32 100644
--- a/diskcache/metric.go
+++ b/diskcache/metric.go
@@ -31,6 +31,10 @@ var (
 	getLatencyVec,
 	putLatencyVec *prometheus.SummaryVec
 
+	// Lock contention metrics.
+	lockWaitTimeVec   *prometheus.HistogramVec
+	lockContentionVec *prometheus.CounterVec
+
 	ns = "diskcache"
 )
 
@@ -181,7 +185,7 @@ func setupMetrics() {
 		prometheus.GaugeOpts{
 			Namespace: ns,
 			Name:      "size",
-			Help:      "Current cache size that waiting to be consumed(get)",
+			Help:      "Current cache size waiting to be consumed(get). The size includes the header bytes",
 		},
 		[]string{"path"},
 	)
@@ -220,6 +224,26 @@ func setupMetrics() {
 		[]string{"path"},
 	)
 
+	// Lock contention metrics
+	lockWaitTimeVec = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: ns,
+			Name:      "lock_wait_seconds",
+			Help:      "Time spent waiting for locks by lock type",
+			Buckets:   []float64{0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5},
+		},
+		[]string{"lock_type", "path"},
+	)
+
+	lockContentionVec = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: ns,
+			Name:      "lock_contention_total",
+			Help:      "Number of lock contention events",
+		},
+		[]string{"lock_type", "path"},
+	)
+
 	metrics.MustRegister(Metrics()...)
 }
 
@@ -239,6 +263,10 @@ func ResetMetrics() {
 	putLatencyVec.Reset()
 	putBytesVec.Reset()
 	getBytesVec.Reset()
+
+	// Lock contention metrics
+	lockWaitTimeVec.Reset()
+	lockContentionVec.Reset()
 }
 
 func Metrics() []prometheus.Collector {
@@ -262,6 +290,10 @@ func Metrics() []prometheus.Collector {
 		putLatencyVec,
 		getBytesVec,
 		putBytesVec,
+
+		// Lock contention metrics
+		lockWaitTimeVec,
+		lockContentionVec,
 	}
 }
diff --git a/diskcache/metric_test.go b/diskcache/metric_test.go
index 2b622f61..792d2667 100644
--- a/diskcache/metric_test.go
+++ b/diskcache/metric_test.go
@@ -126,52 +126,53 @@ func TestMetric(t *T.T) {
 		assert.NoError(t, c.Rotate())
 
 		mfs, err := reg.Gather()
+		fullMetrics := metrics.MetricFamily2Text(mfs)
 		assert.NoError(t, err)
 
 		m := metrics.GetMetricOnLabels(mfs, "diskcache_put_bytes", c.path)
-		require.NotNilf(t, m, "metrics:\n%s", c.path, metrics.MetricFamily2Text(mfs))
-		assert.Equal(t, uint64(1), m.GetSummary().GetSampleCount())
-		assert.Equal(t, float64(108), // 100 + size(4B) + eof(4B)
-			m.GetSummary().GetSampleSum())
+		require.NotNilf(t, m, fullMetrics)
+
+		assert.Equalf(t, uint64(1), m.GetSummary().GetSampleCount(), fullMetrics)
+		assert.Equalf(t, float64(108), // 100 + size(4B) + eof(4B)
+			m.GetSummary().GetSampleSum(), fullMetrics)
 
 		m = metrics.GetMetricOnLabels(mfs, "diskcache_size", c.path)
-		require.NotNil(t, m)
-		assert.Equal(t, float64(108),
-			m.GetGauge().GetValue())
+		require.NotNilf(t, m, fullMetrics)
+		assert.Equalf(t, float64(108), m.GetGauge().GetValue(), fullMetrics)
 
 		// these fileds all zero
 		m = metrics.GetMetricOnLabels(mfs, "diskcache_dropped_batch", c.path)
-		require.Nil(t, m)
+		require.Nilf(t, m, fullMetrics)
 
 		m = metrics.GetMetricOnLabels(mfs, "diskcache_get", c.path)
-		require.Nil(t, m)
+		require.Nilf(t, m, fullMetrics)
 
 		m = metrics.GetMetricOnLabels(mfs, "diskcache_get_bytes_total", c.path)
-		require.Nil(t, m)
+		require.Nilf(t, m, fullMetrics)
 
 		m = metrics.GetMetricOnLabels(mfs, "diskcache_get_latency", c.path)
-		require.Nil(t, m)
+		require.Nilf(t, m, fullMetrics)
 
 		m = metrics.GetMetricOnLabels(mfs, "diskcache_rotate", c.path)
-		require.Nil(t, m)
+		require.Nilf(t, m, fullMetrics)
 
 		assert.NoError(t, c.Get(nil))
 		assert.Error(t, c.Get(nil)) // error: no data, trigger switch, and update get metrics
 
-		mfs, err = reg.Gather()
+		mfs, err = reg.Gather() // get updated metrics
 		assert.NoError(t, err)
 
-		t.Logf("metrics:\n%s", metrics.MetricFamily2Text(mfs))
+		fullMetrics = metrics.MetricFamily2Text(mfs)
 
 		m = metrics.GetMetricOnLabels(mfs, "diskcache_get_bytes", c.path)
-		require.NotNil(t, m)
-		assert.Equal(t, uint64(1), m.GetSummary().GetSampleCount())
-		assert.Equal(t, float64(108), m.GetSummary().GetSampleSum())
+		require.NotNilf(t, m, fullMetrics)
+		assert.Equalf(t, uint64(1), m.GetSummary().GetSampleCount(), fullMetrics)
+		assert.Equalf(t, float64(108), m.GetSummary().GetSampleSum(), fullMetrics)
 
 		m = metrics.GetMetricOnLabels(mfs, "diskcache_size", c.path)
-		require.NotNil(t, m)
-		assert.Equal(t, 0.0, m.GetGauge().GetValue())
+		require.NotNilf(t, m, fullMetrics)
+		assert.Equalf(t, 0.0, m.GetGauge().GetValue(), fullMetrics)
 
 		assert.NoError(t, c.Close())
diff --git a/diskcache/open.go b/diskcache/open.go
index 06c6fd5c..38c22ad7 100644
--- a/diskcache/open.go
+++ b/diskcache/open.go
@@ -11,12 +11,21 @@ import (
 	"path/filepath"
 	"sort"
 	"strconv"
-	"sync"
 	"time"
+
+	"github.com/GuanceCloud/cliutils/logger"
 )
 
+func setupLogger() {
+	once.Do(func() {
+		l = logger.SLogger("diskcache")
+	})
+}
+
 // Open init and create a new disk cache. We can set other options with various options.
 func Open(opts ...CacheOption) (*DiskCache, error) {
+	setupLogger()
+
 	c := defaultInstance()
 
 	// apply extra options
@@ -27,7 +36,7 @@ func Open(opts ...CacheOption) (*DiskCache, error) {
 	}
 
 	if err := c.doOpen(); err != nil {
-		return nil, err
+		return nil, WrapOpenError(err, c.path).WithDetails("failed_to_open_diskcache")
 	}
 
 	defer func() {
@@ -54,9 +63,9 @@ func defaultInstance() *DiskCache {
 		batchSize:   20 * 1024 * 1024,
 		maxDataSize: 0, // not set
 
-		wlock:  &sync.Mutex{},
-		rlock:  &sync.Mutex{},
-		rwlock: &sync.Mutex{},
+		wlock:  nil, // Will be initialized in doOpen() when path is known
+		rlock:  nil, // Will be initialized in doOpen() when path is known
+		rwlock: nil, // Will be initialized in doOpen() when path is known
 
 		wakeup:   time.Second * 3,
 		dirPerms: 0o750,
@@ -64,6 +73,10 @@ func defaultInstance() *DiskCache {
 		pos: &pos{
 			Seek: 0,
 			Name: nil,
+
+			// dump position every 100ms or every 100 updates
+			dumpInterval: time.Millisecond * 100,
+			dumpCount:    100,
 		},
 	}
 }
@@ -87,14 +100,15 @@ func (c *DiskCache) doOpen() error {
 	}
 
 	if err := os.MkdirAll(c.path, c.dirPerms); err != nil {
-		return err
+		return NewCacheError(OpCreate, err, fmt.Sprintf("failed_to_create_directory: perms=%o", c.dirPerms)).
+			WithPath(c.path)
 	}
 
 	// disable open multiple times
 	if !c.noLock {
 		fl := newFlock(c.path)
 		if err := fl.lock(); err != nil {
-			return fmt.Errorf("lock: %w", err)
+			return WrapLockError(err, c.path, 0).WithDetails("failed_to_acquire_directory_lock")
 		} else {
 			c.flock = fl
 		}
@@ -113,16 +127,29 @@
 	maxDataVec.WithLabelValues(c.path).Set(float64(c.maxDataSize))
 	batchSizeVec.WithLabelValues(c.path).Set(float64(c.batchSize))
 
+	// Initialize instrumented locks now that we have the path
+	if c.wlock == nil {
+		c.wlock = NewInstrumentedMutex(LockTypeWrite, c.path, lockWaitTimeVec, lockContentionVec)
+	}
+	if c.rlock == nil {
+		c.rlock = NewInstrumentedMutex(LockTypeRead, c.path, lockWaitTimeVec, lockContentionVec)
+	}
+	if c.rwlock == nil {
+		c.rwlock = NewInstrumentedMutex(LockTypeRW, c.path, lockWaitTimeVec, lockContentionVec)
+	}
+
 	// write append fd, always write to the same-name file
 	if err := c.openWriteFile(); err != nil {
-		return err
+		return NewCacheError(OpOpen, err, "failed_to_open_write_file").
+			WithPath(c.path).WithFile(c.curWriteFile)
 	}
 
 	// list files under @path
 	if err := filepath.Walk(c.path, func(path string, fi os.FileInfo, err error) error {
 		if err != nil {
-			return err
+			return NewCacheError(OpOpen, err, "failed_to_walk_directory").
+				WithPath(c.path).WithFile(path)
 		}
 
 		if fi.IsDir() {
@@ -132,10 +159,6 @@
 
 		switch filepath.Base(path) {
 		case ".lock", ".pos": // ignore them
 		case "data": // not rotated writing file, do not count on sizeVec.
-			c.size.Add(fi.Size())
-			// NOTE: c.size not always equal to sizeVec. c.size used to limit
-			// total bytes used for Put(), but sizeVec used to count size that
-			// waiting to be Get().
 		default:
 			c.size.Add(fi.Size())
 			sizeVec.WithLabelValues(c.path).Add(float64(fi.Size()))
@@ -148,11 +171,14 @@
 	}
 
 	sort.Strings(c.dataFiles) // make file-name sorted for FIFO Get()
+	l.Infof("on open loaded %d files", len(c.dataFiles))
+
 	datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles)))
 
 	// first get, try load .pos
 	if !c.noPos {
 		if err := c.loadUnfinishedFile(); err != nil {
-			return err
+			return NewCacheError(OpOpen, err, "failed_to_load_position_file").
+				WithPath(c.path)
 		}
 	}
 
@@ -172,7 +198,7 @@ func (c *DiskCache) Close() error {
 
 	if c.rfd != nil {
 		if err := c.rfd.Close(); err != nil {
-			return err
+			return WrapCloseError(err, c.path, "read_fd")
 		}
 		c.rfd = nil
 	}
@@ -180,21 +206,21 @@
 	if !c.noLock {
 		if c.flock != nil {
 			if err := c.flock.unlock(); err != nil {
-				return err
+				return WrapLockError(err, c.path, 0).WithDetails("failed_to_release_directory_lock")
 			}
 		}
 	}
 
 	if c.wfd != nil {
 		if err := c.wfd.Close(); err != nil {
-			return err
+			return WrapCloseError(err, c.path, "write_fd")
 		}
 		c.wfd = nil
 	}
 
 	if c.pos != nil {
 		if err := c.pos.close(); err != nil {
-			return err
+			return WrapPosError(err, c.path, c.pos.Seek).WithDetails("failed_to_close_position_file")
 		}
 	}
 
diff --git a/diskcache/options.go b/diskcache/options.go
index fb922f7c..ba7d2c42 100644
--- a/diskcache/options.go
+++ b/diskcache/options.go
@@ -67,6 +67,24 @@ func WithNoPos(on bool) CacheOption {
 	}
 }
 
+// WithPosUpdate sets the .pos update policy.
+//
+// cnt specifies how many .pos updates trigger a real disk write.
+// Set cnt = 0 to force a .pos write on every Get action.
+//
+// du specifies how often to trigger a real disk write of the .pos file.
+func WithPosUpdate(cnt int, du time.Duration) CacheOption {
+	return func(c *DiskCache) {
+		if cnt >= 0 {
+			c.pos.dumpCount = cnt
+		}
+
+		if du >= 0 {
+			c.pos.dumpInterval = du
+		}
+	}
+}
+
 // WithWakeup set duration on wakeup(default 3s), this wakeup time
 // used to shift current-writing-file to ready-to-reading-file.
 //
diff --git a/diskcache/pos.go b/diskcache/pos.go
index a2bf768a..891fa568 100644
--- a/diskcache/pos.go
+++ b/diskcache/pos.go
@@ -12,12 +12,18 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"time"
 )
 
 type pos struct {
 	Seek int64  `json:"seek"`
 	Name []byte `json:"name"`
 
+	cnt,
+	dumpCount int
+	dumpInterval time.Duration
+	lastDump     time.Time
+
 	fd    *os.File
 	fname string        // where to dump the binary data
 	buf   *bytes.Buffer // reused buffer to build the binary data
@@ -26,7 +32,8 @@ func (p *pos) close() error {
 	if p.fd != nil {
 		if err := p.fd.Close(); err != nil {
-			return err
+			return WrapFileOperationError(OpClose, err, "", p.fname).
+				WithDetails("failed_to_close_position_fd")
 		}
 
 		p.fd = nil
@@ -45,7 +52,8 @@ func (p *pos) String() string {
 func posFromFile(fname string) (*pos, error) {
 	bin, err := os.ReadFile(filepath.Clean(fname))
 	if err != nil {
-		return nil, err
+		return nil, WrapFileOperationError(OpRead, err, "", fname).
+ WithDetails("failed_to_read_position_file") } if len(bin) <= 8 { @@ -54,7 +62,9 @@ func posFromFile(fname string) (*pos, error) { var p pos if err := p.UnmarshalBinary(bin); err != nil { - return nil, err + return nil, NewCacheError(OpPos, err, + fmt.Sprintf("failed_to_unmarshal_position_data: data_len=%d", len(bin))). + WithFile(fname) } return &p, nil } @@ -91,7 +101,8 @@ func (p *pos) UnmarshalBinary(bin []byte) error { func (p *pos) reset() error { if p.fd == nil { if fd, err := os.OpenFile(p.fname, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600); err != nil { - return fmt.Errorf("open pos file(%q) failed: %w", p.fname, err) + return WrapFileOperationError(OpCreate, err, "", p.fname). + WithDetails("failed_to_create_position_file_for_reset") } else { p.fd = fd } @@ -108,37 +119,63 @@ func (p *pos) reset() error { p.Seek = -1 p.Name = nil - return p.dumpFile() + return p.doDumpFile() } -func (p *pos) dumpFile() error { +func (p *pos) doDumpFile() error { if p.fd == nil { if fd, err := os.OpenFile(p.fname, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600); err != nil { - return fmt.Errorf("open pos file(%q) failed: %w", p.fname, err) + return WrapFileOperationError(OpCreate, err, "", p.fname). + WithDetails("failed_to_open_position_file_for_dump") } else { p.fd = fd } } if data, err := p.MarshalBinary(); err != nil { - return err + return NewCacheError(OpPos, err, "failed_to_marshal_position_data"). + WithFile(p.fname) } else { if err := p.fd.Truncate(0); err != nil { - return fmt.Errorf("fd.Truncate: %w", err) + return WrapFileOperationError(OpWrite, err, "", p.fname). + WithDetails("failed_to_truncate_position_file") } if _, err := p.fd.Seek(0, 0); err != nil { - return fmt.Errorf("fd.Seek: %w", err) + return WrapFileOperationError(OpSeek, err, "", p.fname). + WithDetails("failed_to_seek_to_start_of_position_file") } if _, err := p.fd.Write(data); err != nil { - return fmt.Errorf("dumpFile(%q): %w", p.fname, err) + return WrapFileOperationError(OpWrite, err, "", p.fname). + WithDetails("failed_to_write_position_data") } return nil } } +func (p *pos) dumpFile() (bool, error) { + if p.dumpCount == 0 { // force dump .pos on every Get action. + return true, p.doDumpFile() + } + + p.cnt++ + if p.cnt%p.dumpCount == 0 { + p.lastDump = time.Now() + return true, p.doDumpFile() + } + + if p.dumpInterval > 0 { + if time.Since(p.lastDump) >= p.dumpInterval { + p.lastDump = time.Now() + return true, p.doDumpFile() + } + } + + return false, nil +} + // for benchmark. 
diff --git a/diskcache/pos_test.go b/diskcache/pos_test.go
index 3ac62f7d..74236a57 100644
--- a/diskcache/pos_test.go
+++ b/diskcache/pos_test.go
@@ -8,6 +8,7 @@ package diskcache
 import (
 	"fmt"
 	T "testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -16,7 +17,7 @@ func TestDump(t *T.T) {
 	t.Run(`dump-undump`, func(t *T.T) {
 		p := &pos{
 			Seek: 1024 * 1024 * 1024,
-			Name: []byte(fmt.Sprintf("data.%032d", 1234)),
+			Name: fmt.Appendf(nil, "data.%032d", 1234),
 		}
 
 		data, err := p.MarshalBinary()
@@ -29,7 +30,7 @@ func TestDump(t *T.T) {
 		assert.NoError(t, p2.UnmarshalBinary(data))
 
 		assert.Equal(t, int64(1024*1024*1024), p2.Seek)
-		assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name)
+		assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name)
 
 		t.Logf("pos: %s", p)
 	})
@@ -37,7 +38,7 @@ func TestDump(t *T.T) {
 	t.Run(`seek--1`, func(t *T.T) {
 		p := &pos{
 			Seek: -1,
-			Name: []byte(fmt.Sprintf("data.%032d", 1234)),
+			Name: fmt.Appendf(nil, "data.%032d", 1234),
 		}
 
 		data, err := p.MarshalBinary()
@@ -48,15 +49,15 @@ func TestDump(t *T.T) {
 		var p2 pos
 		assert.NoError(t, p2.UnmarshalBinary(data))
 		assert.Equal(t, int64(-1), p2.Seek)
-		assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name)
+		assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name)
 
 		t.Logf("pos: %s", p)
 	})
 
 	t.Run(`seek-0`, func(t *T.T) {
 		p := &pos{
+			Name: fmt.Appendf(nil, "data.%032d", 1234),
 			Seek: 0,
-			Name: []byte(fmt.Sprintf("data.%032d", 1234)),
 		}
 
 		data, err := p.MarshalBinary()
@@ -68,7 +69,7 @@ func TestDump(t *T.T) {
 		assert.NoError(t, p2.UnmarshalBinary(data))
 
 		assert.Equal(t, int64(0), p2.Seek)
-		assert.Equal(t, []byte(fmt.Sprintf("data.%032d", 1234)), p2.Name)
+		assert.Equal(t, fmt.Appendf(nil, "data.%032d", 1234), p2.Name)
 
 		t.Logf("pos: %s", p)
 	})
@@ -77,7 +78,7 @@ func TestDump(t *T.T) {
 func BenchmarkPosDump(b *T.B) {
 	p := pos{
 		Seek: 1024 * 1024 * 1024,
-		Name: []byte(fmt.Sprintf("data.%032d", 1234)),
+		Name: fmt.Appendf(nil, "data.%032d", 1234),
 	}
 
 	b.Run("binary", func(b *T.B) {
@@ -91,4 +92,29 @@ func BenchmarkPosDump(b *T.B) {
 			p.dumpJSON()
 		}
 	})
+
+	b.Run("force-dump", func(b *T.B) {
+		p := pos{
+			Seek:      1024 * 1024 * 1024,
+			Name:      fmt.Appendf(nil, "data.%032d", 1234),
+			dumpCount: 0,
+		}
+
+		for i := 0; i < b.N; i++ {
+			p.dumpFile()
+		}
+	})
+
+	b.Run("interval-dump", func(b *T.B) {
+		p := pos{
+			Seek:         1024 * 1024 * 1024,
+			Name:         fmt.Appendf(nil, "data.%032d", 1234),
+			dumpCount:    100,
+			dumpInterval: 100 * time.Millisecond,
+		}
+
+		for i := 0; i < b.N; i++ {
+			p.dumpFile()
+		}
+	})
 }
diff --git a/diskcache/put.go b/diskcache/put.go
index 84d11320..4c5d370e 100644
--- a/diskcache/put.go
+++ b/diskcache/put.go
@@ -33,37 +33,41 @@ func (c *DiskCache) Put(data []byte) error {
 	if c.IsFull(data) {
 		if c.noDrop {
-			return ErrCacheFull
+			return WrapPutError(ErrCacheFull, c.path, len(data)).WithDetails("no_drop_enabled")
 		}
 
 		if c.filoDrop { // do not accept new data
 			droppedDataVec.WithLabelValues(c.path, reasonExceedCapacity).Observe(float64(len(data)))
-			return ErrCacheFull
+			return WrapPutError(ErrCacheFull, c.path, len(data)).WithDetails("filo_drop_policy")
 		}
 
 		if err := c.dropBatch(); err != nil {
-			return err
+			return WrapPutError(err, c.path, len(data)).WithDetails("failed_to_drop_batch")
 		}
 	}
 
 	if c.maxDataSize > 0 && int32(len(data)) > c.maxDataSize {
-		return ErrTooLargeData
+		return WrapPutError(ErrTooLargeData, c.path, len(data)).WithDetails(
+			fmt.Sprintf("max_size=%d, actual_size=%d", c.maxDataSize, len(data)))
 	}
 
 	hdr := make([]byte, dataHeaderLen)
 	binary.LittleEndian.PutUint32(hdr, uint32(len(data)))
 	if _, err := c.wfd.Write(hdr); err != nil {
-		return err
+		return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()).
+			WithDetails("failed_to_write_header")
 	}
 
 	if _, err := c.wfd.Write(data); err != nil {
-		return err
+		return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()).
+			WithDetails("failed_to_write_data")
 	}
 
 	if !c.noSync {
 		if err := c.wfd.Sync(); err != nil {
-			return err
+			return WrapFileOperationError(OpSync, err, c.path, c.wfd.Name()).
+				WithDetails("failed_to_sync_write")
 		}
 	}
 
@@ -73,7 +77,7 @@ func (c *DiskCache) Put(data []byte) error {
 	// rotate new file
 	if c.curBatchSize >= c.batchSize {
 		if err := c.rotate(); err != nil {
-			return err
+			return WrapPutError(err, c.path, len(data)).WithDetails("failed_to_rotate_batch")
 		}
 	}
 
@@ -94,28 +98,34 @@ func (c *DiskCache) StreamPut(r io.Reader, size int) error {
 	)
 
 	if size <= 0 {
-		return ErrInvalidStreamSize
+		return NewCacheError(OpStreamPut, ErrInvalidStreamSize,
+			fmt.Sprintf("invalid_size=%d", size)).WithPath(c.path)
 	}
 
 	c.wlock.Lock()
 	defer c.wlock.Unlock()
 
 	if c.capacity > 0 && c.size.Load()+int64(size) > c.capacity {
-		return ErrCacheFull
+		return NewCacheError(OpStreamPut, ErrCacheFull,
+			fmt.Sprintf("capacity_exceeded: current=%d, new=%d, max=%d",
+				c.size.Load(), size, c.capacity)).WithPath(c.path)
 	}
 
 	if c.maxDataSize > 0 && size > int(c.maxDataSize) {
-		return ErrTooLargeData
+		return NewCacheError(OpStreamPut, ErrTooLargeData,
+			fmt.Sprintf("size_exceeded: max=%d, actual=%d", c.maxDataSize, size)).WithPath(c.path)
 	}
 
 	if startOffset, err = c.wfd.Seek(0, io.SeekCurrent); err != nil {
-		return fmt.Errorf("Seek(0, SEEK_CUR): %w", err)
+		return WrapFileOperationError(OpSeek, err, c.path, c.wfd.Name()).
+			WithDetails("failed_to_get_current_position")
 	}
 
 	defer func() {
 		if total > 0 && err != nil { // fallback to origin position
 			if _, serr := c.wfd.Seek(startOffset, io.SeekStart); serr != nil {
-				c.LastErr = serr
+				c.LastErr = WrapFileOperationError(OpSeek, serr, c.path, c.wfd.Name()).
+					WithDetails(fmt.Sprintf("failed_to_fallback_to_position_%d", startOffset))
 			}
 		}
 
@@ -125,20 +135,24 @@ func (c *DiskCache) StreamPut(r io.Reader, size int) error {
 	if size > 0 {
 		binary.LittleEndian.PutUint32(c.batchHeader, uint32(size))
 		if _, err := c.wfd.Write(c.batchHeader); err != nil {
-			return err
+			return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()).
+				WithDetails("failed_to_write_stream_header")
 		}
 	}
 
 	total, err = io.CopyN(c.wfd, r, int64(size))
 	if err != nil && !errors.Is(err, io.EOF) {
-		return err
+		return NewCacheError(OpStreamPut, err,
+			fmt.Sprintf("failed_to_copy_stream_data: expected=%d, copied=%d", size, total)).
+			WithPath(c.path)
 	}
 
 	c.curBatchSize += (total + dataHeaderLen)
 
 	if c.curBatchSize >= c.batchSize {
 		if err := c.rotate(); err != nil {
-			return err
+			return NewCacheError(OpStreamPut, err, "failed_to_rotate_after_stream_put").
+				WithPath(c.path)
 		}
 	}
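`StreamPut` needs the payload size up front so it can write the length header first and roll the write fd back on a partial copy. A minimal caller sketch, assuming the `Open`/`WithPath` constructor used elsewhere in these notes:

```go
package main

import (
	"log"
	"os"

	"github.com/GuanceCloud/cliutils/diskcache"
)

// streamFile copies a file of known size into the cache without
// buffering the whole payload in memory.
func streamFile(c *diskcache.DiskCache, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return err
	}

	// On failure after a partial copy, StreamPut seeks the write fd
	// back to its starting offset (see the deferred fallback above).
	return c.StreamPut(f, int(fi.Size()))
}

func main() {
	c, err := diskcache.Open(diskcache.WithPath("/tmp/cache-demo")) // assumed constructor
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	if err := streamFile(c, "/tmp/payload.bin"); err != nil {
		log.Printf("stream put: %v", err)
	}
}
```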
diff --git a/diskcache/put_test.go b/diskcache/put_test.go
index c901586f..5c3ac126 100644
--- a/diskcache/put_test.go
+++ b/diskcache/put_test.go
@@ -207,14 +207,15 @@ func TestConcurrentPutGet(t *T.T) {
 		mfs, err := reg.Gather()
 		require.NoError(t, err)
-		t.Logf("got metrics:\n%s", metrics.MetricFamily2Text(mfs))
+
+		fullMetrics := metrics.MetricFamily2Text(mfs)
 
 		m := metrics.GetMetricOnLabels(mfs, "diskcache_size", c.path)
-		require.NotNil(t, m)
-		assert.InDelta(t, 0, m.GetGauge().GetValue(), 0.1)
+		require.NotNilf(t, m, fullMetrics)
+		assert.InDeltaf(t, 0, m.GetGauge().GetValue(), 0.1, fullMetrics)
 
 		t.Cleanup(func() {
 			ResetMetrics()
@@ -243,8 +244,6 @@ func TestConcurrentPutGet(t *T.T) {
 		mfs, err := reg.Gather()
 		require.NoError(t, err)
 
-		t.Logf("got metrics:\n%s", metrics.MetricFamily2Text(mfs))
-
 		mSize := metrics.GetMetricOnLabels(mfs, "diskcache_size", c.path)
 		require.NotNil(t, mSize)
@@ -451,7 +450,6 @@ func TestPutOnCapacityReached(t *T.T) {
 		t.Cleanup(func() {
 			require.NoError(t, c.Close())
 			ResetMetrics()
-			t.Logf("metrics:\n%s", metrics.MetricFamily2Text(mfs))
 		})
 	})
 }
diff --git a/diskcache/rotate.go b/diskcache/rotate.go
index 75d1968d..6637e46a 100644
--- a/diskcache/rotate.go
+++ b/diskcache/rotate.go
@@ -37,7 +37,8 @@ func (c *DiskCache) rotate() error {
 	eof := make([]byte, dataHeaderLen)
 	binary.LittleEndian.PutUint32(eof, EOFHint)
 	if _, err := c.wfd.Write(eof); err != nil { // append EOF to file end
-		return fmt.Errorf("rotate on write EOF: %w", err)
+		return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()).
+			WithDetails("failed_to_write_eof_marker_during_rotate")
 	}
 
 	// NOTE: EOF bytes do not count to size
@@ -51,11 +52,15 @@ func (c *DiskCache) rotate() error {
 	last := c.dataFiles[len(c.dataFiles)-1]
 	arr := strings.Split(filepath.Base(last), ".")
 	if len(arr) != 2 {
-		return ErrInvalidDataFileName
+		return NewCacheError(OpRotate, ErrInvalidDataFileName,
+			fmt.Sprintf("invalid_filename_format: %s", last)).
+			WithPath(c.path).WithFile(last)
 	}
 	x, err := strconv.ParseInt(arr[1], 10, 64)
 	if err != nil {
-		return ErrInvalidDataFileNameSuffix
+		return NewCacheError(OpRotate, ErrInvalidDataFileNameSuffix,
+			fmt.Sprintf("failed_to_parse_sequence_from_filename: %s, error: %v", arr[1], err)).
+			WithPath(c.path).WithFile(last)
 	}
 
 	// data.0003 -> data.0004
@@ -64,22 +69,27 @@ func (c *DiskCache) rotate() error {
 
 	// close current writing file
 	if err := c.wfd.Close(); err != nil {
-		return fmt.Errorf("rotate on close wfd: %w", err)
+		return WrapFileOperationError(OpClose, err, c.path, c.wfd.Name()).
+			WithDetails("failed_to_close_write_file_during_rotate")
 	}
 	c.wfd = nil
 
 	// rename data -> data.0004
 	if err := os.Rename(c.curWriteFile, newfile); err != nil {
-		return fmt.Errorf("rotate on Rename(%q, %q): %w", c.curWriteFile, newfile, err)
+		return WrapRotateError(err, c.path, c.curWriteFile, newfile).
+			WithDetails("failed_to_rename_file_during_rotate")
 	}
 
-	// new file added, plus it's size to cache size
+	// new file added, add its size to cache size
 	if fi, err := os.Stat(newfile); err == nil {
 		if fi.Size() > dataHeaderLen {
 			c.size.Add(fi.Size())
 			sizeVec.WithLabelValues(c.path).Add(float64(fi.Size()))
 			putBytesVec.WithLabelValues(c.path).Observe(float64(fi.Size()))
 		}
+	} else {
+		// Non-critical error: log but don't fail rotation
+		l.Warnf("failed to stat rotated file %s: %v", newfile, err)
 	}
 
 	c.dataFiles = append(c.dataFiles, newfile)
@@ -87,7 +97,8 @@ func (c *DiskCache) rotate() error {
 
 	// reopen new write file
 	if err := c.openWriteFile(); err != nil {
-		return err
+		return NewCacheError(OpRotate, err, "failed_to_open_new_write_file").
+			WithPath(c.path)
 	}
 
 	return nil
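The suffix arithmetic in `rotate()` is compact; here is the same derivation in isolation. The `data.%032d` width is inferred from the names used in `pos_test.go`; the package's actual format string is not shown in this diff:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
)

func main() {
	// The newest rotated file's numeric suffix is parsed and
	// incremented: data.0003 -> data.0004 (zero-padded in practice).
	last := filepath.Join("/tmp/cache-demo", fmt.Sprintf("data.%032d", 3))

	arr := strings.Split(filepath.Base(last), ".")
	if len(arr) != 2 {
		panic("invalid data file name")
	}

	x, err := strconv.ParseInt(arr[1], 10, 64) // leading zeros parse fine
	if err != nil {
		panic(err)
	}

	newfile := filepath.Join(filepath.Dir(last), fmt.Sprintf("data.%032d", x+1))
	fmt.Println(newfile) // .../data.00000000000000000000000000000004
}
```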
@@ -105,7 +116,8 @@ func (c *DiskCache) removeCurrentReadingFile() error {
 
 	if c.rfd != nil {
 		if err := c.rfd.Close(); err != nil {
-			return err
+			return WrapFileOperationError(OpClose, err, c.path, c.rfd.Name()).
+				WithDetails("failed_to_close_read_file_during_removal")
 		}
 		c.rfd = nil
 	}
@@ -119,7 +131,8 @@ func (c *DiskCache) removeCurrentReadingFile() error {
 		getBytesVec.WithLabelValues(c.path).Observe(float64(fi.Size()))
 
 		if err := os.Remove(c.curReadfile); err != nil {
-			return fmt.Errorf("removeCurrentReadingFile: %q: %w", c.curReadfile, err)
+			return WrapFileOperationError(OpRemove, err, c.path, c.curReadfile).
+				WithDetails("failed_to_remove_consumed_file")
 		}
 	}
diff --git a/diskcache/switch.go b/diskcache/switch.go
index 3bf12347..b2fcb6cc 100644
--- a/diskcache/switch.go
+++ b/diskcache/switch.go
@@ -21,7 +21,8 @@ func (c *DiskCache) loadUnfinishedFile() error {
 
 	pos, err := posFromFile(c.pos.fname)
 	if err != nil {
-		return fmt.Errorf("posFromFile: %w", err)
+		return NewCacheError(OpPos, err, "failed_to_load_position_file").
+			WithPath(c.path).WithFile(c.pos.fname)
 	}
 
 	if pos == nil {
@@ -31,7 +32,8 @@ func (c *DiskCache) loadUnfinishedFile() error {
 	// check file's healty
 	if _, err := os.Stat(string(pos.Name)); err != nil { // not exist
 		if err := c.pos.reset(); err != nil {
-			return err
+			return NewCacheError(OpPos, err, "failed_to_reset_position_after_missing_file").
+				WithPath(c.path).WithFile(c.pos.fname)
 		}
 
 		return nil
@@ -44,11 +46,13 @@ func (c *DiskCache) loadUnfinishedFile() error {
 
 	fd, err := os.OpenFile(string(pos.Name), os.O_RDONLY, c.filePerms)
 	if err != nil {
-		return fmt.Errorf("OpenFile: %w", err)
+		return WrapFileOperationError(OpOpen, err, c.path, string(pos.Name)).
+			WithDetails(fmt.Sprintf("failed_to_open_position_file: seek=%d", pos.Seek))
 	}
 
 	if _, err := fd.Seek(pos.Seek, io.SeekStart); err != nil {
-		return fmt.Errorf("Seek(%q: %d, 0): %w", pos.Name, pos.Seek, err)
+		return WrapFileOperationError(OpSeek, err, c.path, string(pos.Name)).
+			WithDetails(fmt.Sprintf("failed_to_seek_to_position: seek=%d", pos.Seek))
 	}
 
 	c.rfd = fd
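To make the recovery path concrete: `.pos` only records the file name and seek offset to resume from. A small package-internal debugging helper (a sketch reusing `posFromFile` and the `pos` type shown above, so it would live in the `diskcache` package itself) could print where a cache would resume:

```go
package diskcache

import "fmt"

// debugPos prints where a cache would resume reading after a restart.
// Sketch only: posFromFile and pos are the package internals above.
func debugPos(posFname string) error {
	p, err := posFromFile(posFname)
	if err != nil {
		return err
	}

	if p == nil || p.Seek < 0 { // reset() stores Seek = -1, Name = nil
		fmt.Println("no unfinished read position")
		return nil
	}

	fmt.Printf("resume reading %s at offset %d\n", string(p.Name), p.Seek)
	return nil
}
```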
@@ -67,7 +71,8 @@ func (c *DiskCache) doSwitchNextFile() error {
 	// clear .pos: prepare for new .pos for next new file.
 	if !c.noPos {
 		if err := c.pos.reset(); err != nil {
-			return err
+			return NewCacheError(OpSwitch, err, "failed_to_reset_position_for_switch").
+				WithPath(c.path)
 		}
 	}
 
@@ -79,13 +84,15 @@ func (c *DiskCache) doSwitchNextFile() error {
 
 	fd, err := os.OpenFile(c.curReadfile, os.O_RDONLY, c.filePerms)
 	if err != nil {
-		return fmt.Errorf("under switchNextFile, OpenFile: %w, datafile: %+#v, ", err, c.dataFiles)
+		return WrapFileOperationError(OpOpen, err, c.path, c.curReadfile).
+			WithDetails(fmt.Sprintf("failed_to_open_next_read_file: available_files=%v", c.dataFiles))
 	}
 
 	c.rfd = fd
 
 	if fi, err := c.rfd.Stat(); err != nil {
-		return fmt.Errorf("on rfd.Stat(): %w", err)
+		return WrapFileOperationError(OpStat, err, c.path, c.curReadfile).
+			WithDetails("failed_to_stat_read_file")
 	} else {
 		c.curReadSize = fi.Size()
 	}
@@ -93,8 +100,9 @@ func (c *DiskCache) doSwitchNextFile() error {
 	if !c.noPos {
 		c.pos.Name = []byte(c.curReadfile)
 		c.pos.Seek = 0
-		if err := c.pos.dumpFile(); err != nil {
-			return err
+		if err := c.pos.doDumpFile(); err != nil {
+			return NewCacheError(OpSwitch, err, "failed_to_dump_position_after_switch").
+				WithPath(c.path).WithFile(c.curReadfile)
 		}
 
 		posUpdatedVec.WithLabelValues("switch", c.path).Inc()
@@ -107,7 +115,8 @@ func (c *DiskCache) doSwitchNextFile() error {
 func (c *DiskCache) openWriteFile() error {
 	if fi, err := os.Stat(c.curWriteFile); err == nil { // file exists
 		if fi.IsDir() {
-			return errors.New("data file should not be dir")
+			return NewCacheError(OpCreate, errors.New("data file should not be dir"), "").
+				WithPath(c.path).WithFile(c.curWriteFile)
 		}
 
 		c.curBatchSize = fi.Size()
@@ -119,7 +128,8 @@ func (c *DiskCache) openWriteFile() error {
 	// write append fd, always write to the same-name file
 	wfd, err := os.OpenFile(c.curWriteFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, c.filePerms)
 	if err != nil {
-		return fmt.Errorf("under openWriteFile, OpenFile(%q): %w", c.curWriteFile, err)
+		return WrapFileOperationError(OpCreate, err, c.path, c.curWriteFile).
+			WithDetails("failed_to_open_write_file")
 	}
 
 	c.wfdLastWrite = time.Now()
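Since every failure path now returns a wrapped `*CacheError`, callers should still be able to branch on the sentinel errors with `errors.Is`, provided `CacheError` implements `Unwrap` — its definition is not part of this diff, so that is an assumption here:

```go
package main

import (
	"errors"
	"log"

	"github.com/GuanceCloud/cliutils/diskcache"
)

func main() {
	c, err := diskcache.Open(diskcache.WithPath("/tmp/cache-demo")) // assumed constructor
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	if err := c.Put(make([]byte, 1<<20)); err != nil {
		switch {
		case errors.Is(err, diskcache.ErrCacheFull):
			// Cache at capacity: drop, retry later, or raise capacity.
			log.Printf("cache full: %v", err)
		case errors.Is(err, diskcache.ErrTooLargeData):
			log.Printf("payload exceeds max data size: %v", err)
		default:
			log.Printf("put failed: %v", err)
		}
	}
}
```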