From 327e2ac46f9972d28f024ecce5ff93224be4a5a3 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Tue, 3 Feb 2026 12:44:20 +0300
Subject: [PATCH 1/3] engine: drop shardPools, make them a part of shardWrapper

Pool is naturally a part of shardWrapper; this patch simplifies locking
and the overall flow related to pools, with no real logic changes.

Signed-off-by: Roman Khimov
---
 pkg/local_object_storage/engine/control.go  | 13 +++----
 .../engine/control_test.go                  |  4 --
 pkg/local_object_storage/engine/engine.go   | 14 +++----
 .../engine/engine_test.go                   |  2 +-
 pkg/local_object_storage/engine/evacuate.go | 18 ++-------
 .../engine/evacuate_test.go                 |  1 -
 pkg/local_object_storage/engine/put.go      | 37 ++++---------------
 pkg/local_object_storage/engine/shards.go   |  9 +----
 .../engine/shards_test.go                   |  2 -
 9 files changed, 26 insertions(+), 74 deletions(-)

diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go
index e27791c13b..b26dff6039 100644
--- a/pkg/local_object_storage/engine/control.go
+++ b/pkg/local_object_storage/engine/control.go
@@ -81,13 +81,10 @@ func (e *StorageEngine) close(releasePools bool) error {
 	e.mtx.RLock()
 	defer e.mtx.RUnlock()
 
-	if releasePools {
-		for _, p := range e.shardPools {
-			p.Release()
-		}
-	}
-
 	for id, sh := range e.shards {
+		if releasePools {
+			sh.pool.Release()
+		}
 		if err := sh.Close(); err != nil {
 			e.log.Debug("could not close shard",
 				zap.String("id", id),
@@ -201,8 +198,8 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error {
 	}
 
 	e.mtx.RLock()
-	for _, pool := range e.shardPools {
-		pool.Tune(int(e.shardPoolSize))
+	for _, sh := range e.shards {
+		sh.pool.Tune(int(e.shardPoolSize))
 	}
 
 	var shardsToRemove []string // shards IDs
diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go
index 85c675df14..2fd2c69a5c 100644
--- a/pkg/local_object_storage/engine/control_test.go
+++ b/pkg/local_object_storage/engine/control_test.go
@@ -204,7 +204,6 @@ func TestReload(t *testing.T) {
 
 		// no new paths => no new shards
 		require.Equal(t, shardNum, len(e.shards))
-		require.Equal(t, shardNum, len(e.shardPools))
 
 		newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum))
 
@@ -218,7 +217,6 @@ func TestReload(t *testing.T) {
 		require.NoError(t, e.Reload(rcfg))
 
 		require.Equal(t, shardNum+1, len(e.shards))
-		require.Equal(t, shardNum+1, len(e.shardPools))
 	})
 
 	t.Run("remove shards", func(t *testing.T) {
@@ -236,7 +234,6 @@ func TestReload(t *testing.T) {
 
 		// removed one
 		require.Equal(t, shardNum-1, len(e.shards))
-		require.Equal(t, shardNum-1, len(e.shardPools))
 	})
 }
 
@@ -264,7 +261,6 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
 	}
 
 	require.Equal(t, num, len(e.shards))
-	require.Equal(t, num, len(e.shardPools))
 
 	require.NoError(t, e.Open())
 	require.NoError(t, e.Init())
diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go
index c9db679e6b..90c4796760 100644
--- a/pkg/local_object_storage/engine/engine.go
+++ b/pkg/local_object_storage/engine/engine.go
@@ -30,8 +30,6 @@ type StorageEngine struct {
 
 	shards map[string]shardWrapper
 
-	shardPools map[string]util.WorkerPool
-
 	closeCh   chan struct{}
 	setModeCh chan setModeRequest
 	wg        sync.WaitGroup
@@ -55,6 +53,7 @@ type shardInterface interface {
 
 type shardWrapper struct {
 	errorCount *atomic.Uint32
+	pool       util.WorkerPool
 	*shard.Shard
 	shardIface shardInterface // TODO: make Shard a shardInterface
 }
@@ -242,12 +241,11 @@ func New(opts ...Option) *StorageEngine {
 	}
 
 	return &StorageEngine{
-		cfg:        c,
-		mtx:        new(sync.RWMutex),
-		shards:     make(map[string]shardWrapper),
-		shardPools: make(map[string]util.WorkerPool),
-		closeCh:    make(chan struct{}),
-		setModeCh:  make(chan setModeRequest),
+		cfg:       c,
+		mtx:       new(sync.RWMutex),
+		shards:    make(map[string]shardWrapper),
+		closeCh:   make(chan struct{}),
+		setModeCh: make(chan setModeRequest),
 
 		sortShardsFn: (*StorageEngine).sortedShards,
 	}
diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go
index 309ceeadef..932887ff07 100644
--- a/pkg/local_object_storage/engine/engine_test.go
+++ b/pkg/local_object_storage/engine/engine_test.go
@@ -91,9 +91,9 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine {
 
 		engine.shards[s.ID().String()] = shardWrapper{
 			errorCount: new(atomic.Uint32),
+			pool:       pool,
 			Shard:      s,
 		}
-		engine.shardPools[s.ID().String()] = pool
 	}
 
 	return engine
diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go
index 491a0ed6ed..a40dc8c9f1 100644
--- a/pkg/local_object_storage/engine/evacuate.go
+++ b/pkg/local_object_storage/engine/evacuate.go
@@ -3,11 +3,12 @@ package engine
 import (
 	"errors"
 	"fmt"
+	"maps"
+	"slices"
 
 	"github.com/nspcc-dev/hrw/v2"
 	meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
-	"github.com/nspcc-dev/neofs-node/pkg/util"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"go.uber.org/zap"
 )
@@ -15,11 +16,6 @@ import (
 
 const defaultEvacuateBatchSize = 100
 
-type pooledShard struct {
-	shardWrapper
-	pool util.WorkerPool
-}
-
 var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
 
 // Evacuate moves data from a set of given shards to other shards available to
@@ -60,13 +56,7 @@ func (e *StorageEngine) Evacuate(shardIDs []*shard.ID, ignoreErrors bool, faultH
 	// We must have all shards, to have correct information about their
 	// indexes in a sorted slice and set appropriate marks in the metabase.
 	// Evacuated shard is skipped during put.
-	shards := make([]pooledShard, 0, len(e.shards))
-	for id := range e.shards {
-		shards = append(shards, pooledShard{
-			shardWrapper: e.shards[id],
-			pool:         e.shardPools[id],
-		})
-	}
+	var shards = slices.Collect(maps.Values(e.shards))
 	e.mtx.RUnlock()
 
 	shardMap := make(map[string]*shard.Shard)
@@ -115,7 +105,7 @@ mainLoop:
 			if _, ok := shardMap[shards[j].ID().String()]; ok {
 				continue
 			}
-			err = e.putToShard(shards[j].shardWrapper, shards[j].pool, addr, obj, nil)
+			err = e.putToShard(shards[j], addr, obj, nil)
 			if err == nil || errors.Is(err, errExists) {
 				if !errors.Is(err, errExists) {
 					e.log.Debug("object is moved to another shard",
diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go
index 0a98d850fd..23d71d0c5c 100644
--- a/pkg/local_object_storage/engine/evacuate_test.go
+++ b/pkg/local_object_storage/engine/evacuate_test.go
@@ -104,7 +104,6 @@ func TestEvacuateShard(t *testing.T) {
 
 	e.mtx.Lock()
 	delete(e.shards, evacuateShardID)
-	delete(e.shardPools, evacuateShardID)
 	e.mtx.Unlock()
 
 	checkHasObjects(t)
diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go
index 500b821708..2d07794021 100644
--- a/pkg/local_object_storage/engine/put.go
+++ b/pkg/local_object_storage/engine/put.go
@@ -8,7 +8,6 @@ import (
 	iec "github.com/nspcc-dev/neofs-node/internal/ec"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
-	"github.com/nspcc-dev/neofs-node/pkg/util"
 	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@@ -67,7 +66,6 @@ func (e *StorageEngine) Put(obj *object.Object, objBin []byte) error {
 	}
 
 	var (
-		bestPool   util.WorkerPool
 		bestShard  shardWrapper
 		overloaded bool
 		shs        []shardWrapper
@@ -80,19 +78,11 @@ func (e *StorageEngine) Put(obj *object.Object, objBin []byte) error {
 	}
 
 	for _, sh := range shs {
-		e.mtx.RLock()
-		pool, ok := e.shardPools[sh.ID().String()]
-		if ok && bestPool == nil {
+		if bestShard.pool == nil {
 			bestShard = sh
-			bestPool = pool
-		}
-		e.mtx.RUnlock()
-		if !ok {
-			// Shard was concurrently removed, skip.
-			continue
 		}
 
-		err = e.putToShard(sh, pool, addr, obj, objBin)
+		err = e.putToShard(sh, addr, obj, objBin)
 		if err == nil || errors.Is(err, errExists) {
 			return nil
 		}
@@ -105,7 +95,7 @@ func (e *StorageEngine) Put(obj *object.Object, objBin []byte) error {
 		zap.Stringer("addr", addr), zap.Stringer("best shard", bestShard.ID()))
 
 	if e.objectPutTimeout > 0 {
-		success, over := e.putToShardWithDeadLine(bestShard, bestPool, addr, obj, objBin)
+		success, over := e.putToShardWithDeadLine(bestShard, addr, obj, objBin)
 		if success {
 			return nil
 		}
@@ -126,7 +116,7 @@ func (e *StorageEngine) Put(obj *object.Object, objBin []byte) error {
 // putToShard puts object to sh.
 // Returns error from shard put or errOverloaded (when shard pool can't accept
 // the task) or errExists (if object is already stored there).
-func (e *StorageEngine) putToShard(sh shardWrapper, pool util.WorkerPool, addr oid.Address, obj *object.Object, objBin []byte) error {
+func (e *StorageEngine) putToShard(sh shardWrapper, addr oid.Address, obj *object.Object, objBin []byte) error {
 	var (
 		alreadyExists bool
 		err           error
@@ -135,7 +125,7 @@ func (e *StorageEngine) putToShard(sh shardWrapper, pool util.WorkerPool, addr o
 		putError      error
 	)
 
-	err = pool.Submit(func() {
+	err = sh.pool.Submit(func() {
 		defer close(exitCh)
 
 		exists, err := sh.Exists(addr, false)
@@ -192,7 +182,7 @@ func (e *StorageEngine) putToShard(sh shardWrapper, pool util.WorkerPool, addr o
 	return putError
 }
 
-func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, pool util.WorkerPool, addr oid.Address, obj *object.Object, objBin []byte) (bool, bool) {
+func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, addr oid.Address, obj *object.Object, objBin []byte) (bool, bool) {
 	const putCooldown = 100 * time.Millisecond
 	var (
 		overloaded bool
@@ -206,7 +196,7 @@ func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, pool util.Worker
 			e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", e.objectPutTimeout))
 			return false, overloaded
 		case <-ticker.C:
-			err := e.putToShard(sh, pool, addr, obj, objBin)
+			err := e.putToShard(sh, addr, obj, objBin)
 			if errors.Is(err, errOverloaded) {
 				overloaded = true
 				ticker.Reset(putCooldown)
@@ -221,8 +211,6 @@ func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, pool util.Worker
 // broadcastObject stores object on ALL shards to ensure it's available everywhere.
 func (e *StorageEngine) broadcastObject(obj *object.Object, objBin []byte) error {
 	var (
-		pool       util.WorkerPool
-		ok         bool
 		allShards  = e.unsortedShards()
 		addr       = obj.Address()
 		goodShards = make([]shardWrapper, 0, len(allShards))
@@ -237,16 +225,7 @@ func (e *StorageEngine) broadcastObject(obj *object.Object, objBin []byte) error
 		zap.Int("shard_count", len(allShards)))
 
 	for _, sh := range allShards {
-		e.mtx.RLock()
-		pool, ok = e.shardPools[sh.ID().String()]
-		e.mtx.RUnlock()
-
-		if !ok {
-			// Shard was concurrently removed, skip.
-			continue
-		}
-
-		err := e.putToShard(sh, pool, addr, obj, objBin)
+		err := e.putToShard(sh, addr, obj, objBin)
 		if err == nil || errors.Is(err, errExists) {
 			goodShards = append(goodShards, sh)
 			if errors.Is(err, errExists) {
diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go
index 5687ad588a..b983695209 100644
--- a/pkg/local_object_storage/engine/shards.go
+++ b/pkg/local_object_storage/engine/shards.go
@@ -126,11 +126,10 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
 
 	e.shards[strID] = shardWrapper{
 		errorCount: new(atomic.Uint32),
+		pool:       pool,
 		Shard:      sh,
 	}
 
-	e.shardPools[strID] = pool
-
 	return nil
 }
 
@@ -153,11 +152,7 @@ func (e *StorageEngine) removeShards(ids ...string) {
 		ss = append(ss, sh)
 		delete(e.shards, id)
 
-		pool, ok := e.shardPools[id]
-		if ok {
-			pool.Release()
-			delete(e.shardPools, id)
-		}
+		sh.pool.Release()
 
 		e.log.Info("shard has been removed",
 			zap.String("id", id))
diff --git a/pkg/local_object_storage/engine/shards_test.go b/pkg/local_object_storage/engine/shards_test.go
index ffb027c9b3..d2eb0a9e8b 100644
--- a/pkg/local_object_storage/engine/shards_test.go
+++ b/pkg/local_object_storage/engine/shards_test.go
@@ -14,7 +14,6 @@ func TestRemoveShard(t *testing.T) {
 		e.Close()
 	})
 
-	require.Equal(t, numOfShards, len(e.shardPools))
 	require.Equal(t, numOfShards, len(e.shards))
 
 	removedNum := numOfShards / 2
@@ -34,7 +33,6 @@ func TestRemoveShard(t *testing.T) {
 		}
 	}
 
-	require.Equal(t, numOfShards-removedNum, len(e.shardPools))
 	require.Equal(t, numOfShards-removedNum, len(e.shards))
 
 	for id, removed := range mSh {

From eecc2d269ae76fe030cc6c62443f5b9e7773abdd Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Tue, 3 Feb 2026 12:49:40 +0300
Subject: [PATCH 2/3] engine: simplify unsortedShards()

Pure stylistics.

Signed-off-by: Roman Khimov
---
 pkg/local_object_storage/engine/shards.go | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go
index b983695209..3e72fb76a7 100644
--- a/pkg/local_object_storage/engine/shards.go
+++ b/pkg/local_object_storage/engine/shards.go
@@ -3,6 +3,8 @@ package engine
 import (
 	"encoding/binary"
 	"fmt"
+	"maps"
+	"slices"
 	"sync/atomic"
 
 	"github.com/google/uuid"
@@ -200,13 +202,7 @@ func (e *StorageEngine) unsortedShards() []shardWrapper {
 	e.mtx.RLock()
 	defer e.mtx.RUnlock()
 
-	shards := make([]shardWrapper, 0, len(e.shards))
-
-	for _, sh := range e.shards {
-		shards = append(shards, sh)
-	}
-
-	return shards
+	return slices.Collect(maps.Values(e.shards))
 }
 
 func (e *StorageEngine) getShard(id string) shardWrapper {

From 280c6316da4b75a99ae63a9b5e762fb0c87f78ac Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Tue, 3 Feb 2026 12:54:17 +0300
Subject: [PATCH 3/3] engine: use exists() result to shortcut put

This check was introduced in 21708d54086aa5960a9b20ce76c7874af9812a2a
as a replacement for the "already removed" check, but putToShard() then
performs the same check again and returns errExists if the object is
already stored, which is treated as a successful put. That means
ignoring the result here only wastes resources to reach the same
outcome. Stop doing that and return success immediately if we know we
already have the object.
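The shortcut assumes that e.exists() only reports exists == true
together with a nil error (the "already removed" case is expected to
surface as an error instead); under that assumption the early return
below is a success report. A minimal sketch of the resulting flow in
Put():

	exists, err := e.exists(addr)
	if err != nil || exists {
		// err is nil when the object is already stored, so this
		// reports success without submitting work to a shard pool.
		return err
	}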
Signed-off-by: Roman Khimov
---
 pkg/local_object_storage/engine/put.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go
index 2d07794021..0a1fa947a9 100644
--- a/pkg/local_object_storage/engine/put.go
+++ b/pkg/local_object_storage/engine/put.go
@@ -52,8 +52,8 @@ func (e *StorageEngine) Put(obj *object.Object, objBin []byte) error {
 
 	// In #1146 this check was parallelized, however, it became
 	// much slower on fast machines for 4 shards.
-	_, err := e.exists(addr)
-	if err != nil {
+	exists, err := e.exists(addr)
+	if err != nil || exists {
 		return err
 	}
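With the pool folded into shardWrapper, a snapshot of e.shards taken
under a single RLock is self-sufficient: no follow-up e.shardPools
lookup (and no second lock) is needed. A minimal usage sketch of the
resulting pattern — not part of the patches themselves, names taken
from the engine code above:

	e.mtx.RLock()
	shards := slices.Collect(maps.Values(e.shards))
	e.mtx.RUnlock()

	for _, sh := range shards {
		// Each wrapper now carries its own worker pool.
		_ = sh.pool.Submit(func() {
			// per-shard put/evacuate work goes here
		})
	}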