diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index e27791c13b..b26dff6039 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -81,13 +81,10 @@ func (e *StorageEngine) close(releasePools bool) error { e.mtx.RLock() defer e.mtx.RUnlock() - if releasePools { - for _, p := range e.shardPools { - p.Release() - } - } - for id, sh := range e.shards { + if releasePools { + sh.pool.Release() + } if err := sh.Close(); err != nil { e.log.Debug("could not close shard", zap.String("id", id), @@ -201,8 +198,8 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error { } e.mtx.RLock() - for _, pool := range e.shardPools { - pool.Tune(int(e.shardPoolSize)) + for _, sh := range e.shards { + sh.pool.Tune(int(e.shardPoolSize)) } var shardsToRemove []string // shards IDs diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 85c675df14..2fd2c69a5c 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -204,7 +204,6 @@ func TestReload(t *testing.T) { // no new paths => no new shards require.Equal(t, shardNum, len(e.shards)) - require.Equal(t, shardNum, len(e.shardPools)) newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum)) @@ -218,7 +217,6 @@ func TestReload(t *testing.T) { require.NoError(t, e.Reload(rcfg)) require.Equal(t, shardNum+1, len(e.shards)) - require.Equal(t, shardNum+1, len(e.shardPools)) }) t.Run("remove shards", func(t *testing.T) { @@ -236,7 +234,6 @@ func TestReload(t *testing.T) { // removed one require.Equal(t, shardNum-1, len(e.shards)) - require.Equal(t, shardNum-1, len(e.shardPools)) }) } @@ -264,7 +261,6 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str } require.Equal(t, num, len(e.shards)) - require.Equal(t, num, len(e.shardPools)) require.NoError(t, e.Open()) require.NoError(t, e.Init()) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index c9db679e6b..90c4796760 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -30,8 +30,6 @@ type StorageEngine struct { shards map[string]shardWrapper - shardPools map[string]util.WorkerPool - closeCh chan struct{} setModeCh chan setModeRequest wg sync.WaitGroup @@ -55,6 +53,7 @@ type shardInterface interface { type shardWrapper struct { errorCount *atomic.Uint32 + pool util.WorkerPool *shard.Shard shardIface shardInterface // TODO: make Shard a shardInterface } @@ -242,12 +241,11 @@ func New(opts ...Option) *StorageEngine { } return &StorageEngine{ - cfg: c, - mtx: new(sync.RWMutex), - shards: make(map[string]shardWrapper), - shardPools: make(map[string]util.WorkerPool), - closeCh: make(chan struct{}), - setModeCh: make(chan setModeRequest), + cfg: c, + mtx: new(sync.RWMutex), + shards: make(map[string]shardWrapper), + closeCh: make(chan struct{}), + setModeCh: make(chan setModeRequest), sortShardsFn: (*StorageEngine).sortedShards, } diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 309ceeadef..932887ff07 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -91,9 +91,9 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine { engine.shards[s.ID().String()] = shardWrapper{ errorCount: new(atomic.Uint32), + pool: pool, Shard: s, } - engine.shardPools[s.ID().String()] = pool } return engine diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 491a0ed6ed..a40dc8c9f1 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -3,11 +3,12 @@ package engine import ( "errors" "fmt" + "maps" + "slices" "github.com/nspcc-dev/hrw/v2" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" - "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" @@ -15,11 +16,6 @@ import ( const defaultEvacuateBatchSize = 100 -type pooledShard struct { - shardWrapper - pool util.WorkerPool -} - var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // Evacuate moves data from a set of given shards to other shards available to @@ -60,13 +56,7 @@ func (e *StorageEngine) Evacuate(shardIDs []*shard.ID, ignoreErrors bool, faultH // We must have all shards, to have correct information about their // indexes in a sorted slice and set appropriate marks in the metabase. // Evacuated shard is skipped during put. - shards := make([]pooledShard, 0, len(e.shards)) - for id := range e.shards { - shards = append(shards, pooledShard{ - shardWrapper: e.shards[id], - pool: e.shardPools[id], - }) - } + var shards = slices.Collect(maps.Values(e.shards)) e.mtx.RUnlock() shardMap := make(map[string]*shard.Shard) @@ -115,7 +105,7 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - err = e.putToShard(shards[j].shardWrapper, shards[j].pool, addr, obj, nil) + err = e.putToShard(shards[j], addr, obj, nil) if err == nil || errors.Is(err, errExists) { if !errors.Is(err, errExists) { e.log.Debug("object is moved to another shard", diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 0a98d850fd..23d71d0c5c 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -104,7 +104,6 @@ func TestEvacuateShard(t *testing.T) { e.mtx.Lock() delete(e.shards, evacuateShardID) - delete(e.shardPools, evacuateShardID) e.mtx.Unlock() checkHasObjects(t) diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 500b821708..0a1fa947a9 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -8,7 +8,6 @@ import ( iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" - "github.com/nspcc-dev/neofs-node/pkg/util" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -53,8 +52,8 @@ func (e *StorageEngine) Put(obj *object.Object, objBin []byte) error { // In #1146 this check was parallelized, however, it became // much slower on fast machines for 4 shards. - _, err := e.exists(addr) - if err != nil { + exists, err := e.exists(addr) + if err != nil || exists { return err } @@ -67,7 +66,6 @@ func (e *StorageEngine) Put(obj *object.Object, objBin []byte) error { } var ( - bestPool util.WorkerPool bestShard shardWrapper overloaded bool shs []shardWrapper @@ -80,19 +78,11 @@ func (e *StorageEngine) Put(obj *object.Object, objBin []byte) error { } for _, sh := range shs { - e.mtx.RLock() - pool, ok := e.shardPools[sh.ID().String()] - if ok && bestPool == nil { + if bestShard.pool == nil { bestShard = sh - bestPool = pool - } - e.mtx.RUnlock() - if !ok { - // Shard was concurrently removed, skip. - continue } - err = e.putToShard(sh, pool, addr, obj, objBin) + err = e.putToShard(sh, addr, obj, objBin) if err == nil || errors.Is(err, errExists) { return nil } @@ -105,7 +95,7 @@ func (e *StorageEngine) Put(obj *object.Object, objBin []byte) error { zap.Stringer("addr", addr), zap.Stringer("best shard", bestShard.ID())) if e.objectPutTimeout > 0 { - success, over := e.putToShardWithDeadLine(bestShard, bestPool, addr, obj, objBin) + success, over := e.putToShardWithDeadLine(bestShard, addr, obj, objBin) if success { return nil } @@ -126,7 +116,7 @@ func (e *StorageEngine) Put(obj *object.Object, objBin []byte) error { // putToShard puts object to sh. // Returns error from shard put or errOverloaded (when shard pool can't accept // the task) or errExists (if object is already stored there). -func (e *StorageEngine) putToShard(sh shardWrapper, pool util.WorkerPool, addr oid.Address, obj *object.Object, objBin []byte) error { +func (e *StorageEngine) putToShard(sh shardWrapper, addr oid.Address, obj *object.Object, objBin []byte) error { var ( alreadyExists bool err error @@ -135,7 +125,7 @@ func (e *StorageEngine) putToShard(sh shardWrapper, pool util.WorkerPool, addr o putError error ) - err = pool.Submit(func() { + err = sh.pool.Submit(func() { defer close(exitCh) exists, err := sh.Exists(addr, false) @@ -192,7 +182,7 @@ func (e *StorageEngine) putToShard(sh shardWrapper, pool util.WorkerPool, addr o return putError } -func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, pool util.WorkerPool, addr oid.Address, obj *object.Object, objBin []byte) (bool, bool) { +func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, addr oid.Address, obj *object.Object, objBin []byte) (bool, bool) { const putCooldown = 100 * time.Millisecond var ( overloaded bool @@ -206,7 +196,7 @@ func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, pool util.Worker e.log.Error("could not put object", zap.Stringer("addr", addr), zap.Duration("deadline", e.objectPutTimeout)) return false, overloaded case <-ticker.C: - err := e.putToShard(sh, pool, addr, obj, objBin) + err := e.putToShard(sh, addr, obj, objBin) if errors.Is(err, errOverloaded) { overloaded = true ticker.Reset(putCooldown) @@ -221,8 +211,6 @@ func (e *StorageEngine) putToShardWithDeadLine(sh shardWrapper, pool util.Worker // broadcastObject stores object on ALL shards to ensure it's available everywhere. func (e *StorageEngine) broadcastObject(obj *object.Object, objBin []byte) error { var ( - pool util.WorkerPool - ok bool allShards = e.unsortedShards() addr = obj.Address() goodShards = make([]shardWrapper, 0, len(allShards)) @@ -237,16 +225,7 @@ func (e *StorageEngine) broadcastObject(obj *object.Object, objBin []byte) error zap.Int("shard_count", len(allShards))) for _, sh := range allShards { - e.mtx.RLock() - pool, ok = e.shardPools[sh.ID().String()] - e.mtx.RUnlock() - - if !ok { - // Shard was concurrently removed, skip. - continue - } - - err := e.putToShard(sh, pool, addr, obj, objBin) + err := e.putToShard(sh, addr, obj, objBin) if err == nil || errors.Is(err, errExists) { goodShards = append(goodShards, sh) if errors.Is(err, errExists) { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 5687ad588a..3e72fb76a7 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -3,6 +3,8 @@ package engine import ( "encoding/binary" "fmt" + "maps" + "slices" "sync/atomic" "github.com/google/uuid" @@ -126,11 +128,10 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error { e.shards[strID] = shardWrapper{ errorCount: new(atomic.Uint32), + pool: pool, Shard: sh, } - e.shardPools[strID] = pool - return nil } @@ -153,11 +154,7 @@ func (e *StorageEngine) removeShards(ids ...string) { ss = append(ss, sh) delete(e.shards, id) - pool, ok := e.shardPools[id] - if ok { - pool.Release() - delete(e.shardPools, id) - } + sh.pool.Release() e.log.Info("shard has been removed", zap.String("id", id)) @@ -205,13 +202,7 @@ func (e *StorageEngine) unsortedShards() []shardWrapper { e.mtx.RLock() defer e.mtx.RUnlock() - shards := make([]shardWrapper, 0, len(e.shards)) - - for _, sh := range e.shards { - shards = append(shards, sh) - } - - return shards + return slices.Collect(maps.Values(e.shards)) } func (e *StorageEngine) getShard(id string) shardWrapper { diff --git a/pkg/local_object_storage/engine/shards_test.go b/pkg/local_object_storage/engine/shards_test.go index ffb027c9b3..d2eb0a9e8b 100644 --- a/pkg/local_object_storage/engine/shards_test.go +++ b/pkg/local_object_storage/engine/shards_test.go @@ -14,7 +14,6 @@ func TestRemoveShard(t *testing.T) { e.Close() }) - require.Equal(t, numOfShards, len(e.shardPools)) require.Equal(t, numOfShards, len(e.shards)) removedNum := numOfShards / 2 @@ -34,7 +33,6 @@ func TestRemoveShard(t *testing.T) { } } - require.Equal(t, numOfShards-removedNum, len(e.shardPools)) require.Equal(t, numOfShards-removedNum, len(e.shards)) for id, removed := range mSh {