Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion diskcache/diskcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/GuanceCloud/cliutils/logger"
)

const (
Expand Down Expand Up @@ -57,6 +59,9 @@ var (

// Invalid file header.
ErrBadHeader = errors.New("bad header")

l = logger.DefaultSLogger("diskcache")
once sync.Once
)

// DiskCache is the representation of a disk cache.
Expand Down Expand Up @@ -89,7 +94,8 @@ type DiskCache struct {
pos *pos // current read fd position info

// specs of current diskcache
size atomic.Int64 // current byte size
size atomic.Int64 // current byte size

curBatchSize, // current writing file's size
curReadSize, // current reading file's size
batchSize, // current batch size(static)
Expand Down
13 changes: 13 additions & 0 deletions diskcache/envs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package diskcache
import (
"os"
"strconv"
"time"
)

func (c *DiskCache) syncEnv() {
Expand Down Expand Up @@ -37,6 +38,18 @@ func (c *DiskCache) syncEnv() {
c.noPos = true
}

if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_INTERVAL"); ok && v != "" {
if du, err := time.ParseDuration(v); err == nil && du > 0 {
c.pos.dumpInterval = du
}
}

if v, ok := os.LookupEnv("ENV_DISKCACHE_POS_DUMP_AT"); ok && v != "" {
if n, err := strconv.ParseInt(v, 10, 64); err == nil && n > 0 {
c.pos.dumpCount = int(n)
}
}

if v, ok := os.LookupEnv("ENV_DISKCACHE_NO_LOCK"); ok && v != "" {
c.noLock = true
}
Expand Down
31 changes: 23 additions & 8 deletions diskcache/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package diskcache

import (
"encoding/binary"
"errors"
"fmt"
"io"
"time"
Expand All @@ -31,6 +32,8 @@ func (c *DiskCache) skipBadFile() error {
droppedDataVec.WithLabelValues(c.path, reasonBadDataFile).Observe(float64(c.curReadSize))
}()

l.Warnf("skip bad file %s with size %d bytes", c.curReadfile, c.curReadSize)

return c.switchNextFile()
}

Expand Down Expand Up @@ -80,9 +83,10 @@ func (c *DiskCache) doGet(buf []byte, fn Fn, bfn BufFunc) error {
if err = func() error {
c.wlock.Lock()
defer c.wlock.Unlock()

return c.rotate()
}(); err != nil {
return err
return fmt.Errorf("wakeup error: %w", err)
}
}

Expand All @@ -98,6 +102,14 @@ retry:
}

if n, err = c.rfd.Read(c.batchHeader); err != nil || n != dataHeaderLen {
if err != nil && !errors.Is(err, io.EOF) {
l.Errorf("read %d bytes header error: %s", dataHeaderLen, err.Error())
}

if n > 0 && n != dataHeaderLen {
l.Errorf("invalid header length: %d, expect %d", n, dataHeaderLen)
}

// On bad datafile, just ignore and delete the file.
if err = c.skipBadFile(); err != nil {
return err
Expand Down Expand Up @@ -130,12 +142,15 @@ retry:

if len(readbuf) < nbytes {
// seek to next read position
if _, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil {
if x, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil {
return fmt.Errorf("rfd.Seek(%d): %w", nbytes, err)
}
} else {
l.Warnf("got %d bytes to buffer with len %d, seek to new read position %d, drop %d bytes within file %s",
nbytes, len(readbuf), x, nbytes, c.curReadfile)

droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes))
return ErrTooSmallReadBuf
droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes))
return ErrTooSmallReadBuf
}
}

if n, err := c.rfd.Read(readbuf[:nbytes]); err != nil {
Expand Down Expand Up @@ -164,11 +179,11 @@ __updatePos:
// update seek position
if !c.noPos && nbytes > 0 {
c.pos.Seek += int64(dataHeaderLen + nbytes)
if derr := c.pos.dumpFile(); derr != nil {
if do, derr := c.pos.dumpFile(); derr != nil {
return derr
} else if do {
posUpdatedVec.WithLabelValues("get", c.path).Inc()
}

posUpdatedVec.WithLabelValues("get", c.path).Inc()
}

__end:
Expand Down
163 changes: 162 additions & 1 deletion diskcache/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,10 @@ func TestPutGet(t *T.T) {
testData := []byte("0123456789")
ndata := 10

c, err := Open(WithPath(p))
c, err := Open(
WithPath(p),
WithPosUpdate(0, 0),
)

assert.NoError(t, err)

Expand All @@ -487,6 +490,7 @@ func TestPutGet(t *T.T) {

// reopen the cache
c2, err := Open(WithPath(p),
WithPosUpdate(0, 0),
WithCapacity(int64(len(testData)*10)),
WithBatchSize(int64(len(testData)*2)))
require.NoError(t, err, "get error: %s", err)
Expand Down Expand Up @@ -518,3 +522,160 @@ func TestPutGet(t *T.T) {
})
})
}

func TestDelayPosDump(t *T.T) {
	testData := []byte("0123456789")

	// runCase drives one Put/Get round trip against an already-opened cache:
	// it writes testData ndata times, rotates so the data file becomes
	// readable, performs nreads Get()s (sleeping readSleep between reads when
	// non-zero), closes the cache, then asserts the .pos file exists and the
	// diskcache_pos_updated_total{op="get"} counter equals wantUpdates.
	runCase := func(t *T.T, c *DiskCache, nreads int, readSleep time.Duration, wantUpdates float64) {
		t.Helper()

		const ndata = 10

		for i := 0; i < ndata; i++ { // write 10 data
			require.NoError(t, c.Put(testData), "cache: %s", c)
		}

		// make data file readable.
		require.NoError(t, c.rotate())

		// create n read pos
		for i := 0; i < nreads; i++ {
			assert.NoError(t, c.Get(func(data []byte) error {
				assert.Len(t, data, len(testData))
				return nil
			}))

			if readSleep > 0 {
				time.Sleep(readSleep)
			}
		}

		// each Get consumes one header plus one payload.
		assert.Equal(t, int64(nreads)*int64(len(testData)+dataHeaderLen), c.pos.Seek)
		assert.NoError(t, c.Close())

		// Close() must have dumped the position file.
		_, err := os.Stat(c.pos.fname)
		require.NoError(t, err)

		reg := prometheus.NewRegistry()
		reg.MustRegister(Metrics()...)
		mfs, err := reg.Gather()
		require.NoError(t, err)

		assert.Equalf(t, wantUpdates,
			metrics.GetMetricOnLabels(mfs,
				"diskcache_pos_updated_total",
				"get",
				c.path,
			).GetCounter().GetValue(),
			"got metrics\n%s", metrics.MetricFamily2Text(mfs),
		)
	}

	t.Run("pos-sync-at", func(t *T.T) {
		ResetMetrics()

		// dump every 3 updates: 6 reads => 2 dumps recorded.
		c, err := Open(WithPath(t.TempDir()), WithPosUpdate(3, 0))
		require.NoError(t, err)

		runCase(t, c, 6, 0, 2)
	})

	t.Run("pos-sync-on-interval", func(t *T.T) {
		ResetMetrics()

		// count disabled(-1); interval 100ms and we sleep 100ms per read,
		// so every read triggers a dump: 3 reads => 3 dumps.
		c, err := Open(WithPath(t.TempDir()), WithPosUpdate(-1, time.Millisecond*100))
		require.NoError(t, err)

		runCase(t, c, 3, 100*time.Millisecond, 3)
	})

	t.Run("pos-sync-force", func(t *T.T) {
		ResetMetrics()

		// count 0 forces a dump on every update: 3 reads => 3 dumps.
		c, err := Open(WithPath(t.TempDir()), WithPosUpdate(0, time.Millisecond*100))
		require.NoError(t, err)

		runCase(t, c, 3, 0, 3)
	})
}
2 changes: 1 addition & 1 deletion diskcache/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func setupMetrics() {
prometheus.GaugeOpts{
Namespace: ns,
Name: "size",
Help: "Current cache size that waiting to be consumed(get)",
Help: "Current cache size that waiting to be consumed(get). The size include header bytes",
},
[]string{"path"},
)
Expand Down
20 changes: 16 additions & 4 deletions diskcache/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,20 @@ import (
"strconv"
"sync"
"time"

"github.com/GuanceCloud/cliutils/logger"
)

// setupLogger installs the real package-level logger, guarded by a
// sync.Once so repeated calls (one per Open) are no-ops after the first.
func setupLogger() {
	once.Do(func() { l = logger.SLogger("diskcache") })
}

// Open init and create a new disk cache. We can set other options with various options.
func Open(opts ...CacheOption) (*DiskCache, error) {
setupLogger()

c := defaultInstance()

// apply extra options
Expand Down Expand Up @@ -64,6 +74,10 @@ func defaultInstance() *DiskCache {
pos: &pos{
Seek: 0,
Name: nil,

// dump position each 100ms or 100 update
dumpInterval: time.Millisecond * 100,
dumpCount: 100,
},
}
}
Expand Down Expand Up @@ -132,10 +146,6 @@ func (c *DiskCache) doOpen() error {
switch filepath.Base(path) {
case ".lock", ".pos": // ignore them
case "data": // not rotated writing file, do not count on sizeVec.
c.size.Add(fi.Size())
// NOTE: c.size not always equal to sizeVec. c.size used to limit
// total bytes used for Put(), but sizeVec used to count size that
// waiting to be Get().
default:
c.size.Add(fi.Size())
sizeVec.WithLabelValues(c.path).Add(float64(fi.Size()))
Expand All @@ -148,6 +158,8 @@ func (c *DiskCache) doOpen() error {
}

sort.Strings(c.dataFiles) // make file-name sorted for FIFO Get()
l.Infof("on open loaded %d files", len(c.dataFiles))
datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles)))

// first get, try load .pos
if !c.noPos {
Expand Down
Loading