From e936c6387516b85727e87bc454c8376e78c67558 Mon Sep 17 00:00:00 2001 From: Mihai Pancu Date: Wed, 7 May 2025 10:53:16 +0300 Subject: [PATCH 1/8] - Batch allocator - No quick ErrNoGameServer --- install/helm/agones/defaultfeaturegates.yaml | 1 + pkg/gameserverallocations/allocator.go | 9 +- pkg/gameserverallocations/allocator_test.go | 224 ++++++++++++++- pkg/gameserverallocations/batch_allocator.go | 266 ++++++++++++++++++ .../metrics_additional.go | 57 ++++ pkg/util/runtime/features.go | 7 +- 6 files changed, 555 insertions(+), 9 deletions(-) create mode 100644 pkg/gameserverallocations/batch_allocator.go create mode 100644 pkg/gameserverallocations/metrics_additional.go diff --git a/install/helm/agones/defaultfeaturegates.yaml b/install/helm/agones/defaultfeaturegates.yaml index b0a8cccd72..454f8c6068 100644 --- a/install/helm/agones/defaultfeaturegates.yaml +++ b/install/helm/agones/defaultfeaturegates.yaml @@ -37,6 +37,7 @@ SidecarContainers: false # Dev features ProcessorAllocator: false +AllocatorBatchesUpdates: false # Example feature Example: false diff --git a/pkg/gameserverallocations/allocator.go b/pkg/gameserverallocations/allocator.go index c63923f34d..ae4fa3fa72 100644 --- a/pkg/gameserverallocations/allocator.go +++ b/pkg/gameserverallocations/allocator.go @@ -174,7 +174,11 @@ func (c *Allocator) Run(ctx context.Context) error { } // workers and logic for batching allocations - go c.ListenAndAllocate(ctx, maxBatchQueue) + if runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesUpdates) { + go c.ListenAndBatchAllocate(ctx, maxBatchQueue) + } else { + go c.ListenAndAllocate(ctx, maxBatchQueue) + } return nil } @@ -693,11 +697,10 @@ func Retry(backoff wait.Backoff, fn func() error) error { } } + // No quick 400s, still do retries for ErrNoGameServer switch { case err == nil: return true, nil - case err == ErrNoGameServer: - return true, err case err == ErrTotalTimeoutExceeded: return true, err default: diff --git 
a/pkg/gameserverallocations/allocator_test.go b/pkg/gameserverallocations/allocator_test.go index 99347b94c3..3bdc8ba95a 100644 --- a/pkg/gameserverallocations/allocator_test.go +++ b/pkg/gameserverallocations/allocator_test.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "sync" "testing" "time" @@ -43,11 +44,34 @@ import ( ) func TestAllocatorAllocate(t *testing.T) { + testScenarios := map[string]struct { + features string + }{ + "Allocator batches updates": { + features: fmt.Sprintf("%s=true", runtime.FeatureAllocatorBatchesUpdates), + }, + "Allocator does not batches updates": { + features: fmt.Sprintf("%s=false", runtime.FeatureAllocatorBatchesUpdates), + }, + } + + for test, testScenario := range testScenarios { + t.Run(test, func(t *testing.T) { + testAllocatorAllocateImpl(t, testScenario.features) + }) + } + +} + +func testAllocatorAllocateImpl(t *testing.T, features string) { t.Parallel() // TODO: remove when `CountsAndLists` feature flag is moved to stable. runtime.FeatureTestMutex.Lock() defer runtime.FeatureTestMutex.Unlock() + if features != "" { + require.NoError(t, runtime.ParseFeatures(features)) + } f, gsList := defaultFixtures(4) a, m := newFakeAllocator() @@ -130,11 +154,34 @@ func TestAllocatorAllocate(t *testing.T) { } func TestAllocatorAllocatePriority(t *testing.T) { + testScenarios := map[string]struct { + features string + }{ + "Allocator batches updates": { + features: fmt.Sprintf("%s=true", runtime.FeatureAllocatorBatchesUpdates), + }, + "Allocator does not batches updates": { + features: fmt.Sprintf("%s=false", runtime.FeatureAllocatorBatchesUpdates), + }, + } + + for test, testScenario := range testScenarios { + t.Run(test, func(t *testing.T) { + testAllocatorAllocatePriorityImpl(t, testScenario.features) + }) + } + +} + +func testAllocatorAllocatePriorityImpl(t *testing.T, features string) { t.Parallel() // TODO: remove when `CountsAndLists` feature flag is moved to stable. 
runtime.FeatureTestMutex.Lock() defer runtime.FeatureTestMutex.Unlock() + if features != "" { + require.NoError(t, runtime.ParseFeatures(features)) + } run := func(t *testing.T, name string, test func(t *testing.T, a *Allocator, gas *allocationv1.GameServerAllocation)) { f, gsList := defaultFixtures(4) @@ -515,11 +562,33 @@ func TestAllocatorAllocateOnGameServerUpdateError(t *testing.T) { } func TestAllocatorRunLocalAllocations(t *testing.T) { + testScenarios := map[string]struct { + features string + }{ + "Allocator batches updates": { + features: fmt.Sprintf("%s=true", runtime.FeatureAllocatorBatchesUpdates), + }, + "Allocator does not batches updates": { + features: fmt.Sprintf("%s=false", runtime.FeatureAllocatorBatchesUpdates), + }, + } + + for test, testScenario := range testScenarios { + t.Run(test, func(t *testing.T) { + testAllocatorRunLocalAllocationsImpl(t, testScenario.features) + }) + } +} + +func testAllocatorRunLocalAllocationsImpl(t *testing.T, features string) { t.Parallel() // TODO: remove when `CountsAndLists` feature flag is moved to stable. 
runtime.FeatureTestMutex.Lock() defer runtime.FeatureTestMutex.Unlock() + if features != "" { + require.NoError(t, runtime.ParseFeatures(features)) + } t.Run("no problems", func(t *testing.T) { f, gsList := defaultFixtures(5) @@ -568,7 +637,11 @@ func TestAllocatorRunLocalAllocations(t *testing.T) { j3 := request{gsa: gsa.DeepCopy(), response: make(chan response)} a.pendingRequests <- j3 - go a.ListenAndAllocate(ctx, 3) + if runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesUpdates) { + go a.ListenAndBatchAllocate(ctx, 3) + } else { + go a.ListenAndAllocate(ctx, 3) + } res1 := <-j1.response assert.NoError(t, res1.err) @@ -591,7 +664,12 @@ func TestAllocatorRunLocalAllocations(t *testing.T) { assert.NotEqual(t, res1.gs.ObjectMeta.Name, res3.gs.ObjectMeta.Name) assert.NotEqual(t, res2.gs.ObjectMeta.Name, res3.gs.ObjectMeta.Name) - assert.Equal(t, 3, updateCount) + if runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesUpdates) { + // updates are batched into one + assert.Equal(t, 1, updateCount) + } else { + assert.Equal(t, 3, updateCount) + } }) t.Run("no gameservers", func(t *testing.T) { @@ -620,7 +698,11 @@ func TestAllocatorRunLocalAllocations(t *testing.T) { j1 := request{gsa: gsa.DeepCopy(), response: make(chan response)} a.pendingRequests <- j1 - go a.ListenAndAllocate(ctx, 3) + if runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesUpdates) { + go a.ListenAndBatchAllocate(ctx, 3) + } else { + go a.ListenAndAllocate(ctx, 3) + } res1 := <-j1.response assert.Nil(t, res1.gs) @@ -630,11 +712,30 @@ func TestAllocatorRunLocalAllocations(t *testing.T) { } func TestAllocatorRunLocalAllocationsCountsAndLists(t *testing.T) { + testScenarios := map[string]struct { + features string + }{ + "Allocator batches updates": { + features: fmt.Sprintf("%s=true&%s=true", runtime.FeatureAllocatorBatchesUpdates, runtime.FeatureCountsAndLists), + }, + "Allocator does not batches updates": { + features: fmt.Sprintf("%s=false&%s=true", runtime.FeatureAllocatorBatchesUpdates, 
runtime.FeatureCountsAndLists), + }, + } + + for test, testScenario := range testScenarios { + t.Run(test, func(t *testing.T) { + testAllocatorRunLocalAllocationsCountsAndLists(t, testScenario.features) + }) + } +} + +func testAllocatorRunLocalAllocationsCountsAndLists(t *testing.T, features string) { t.Parallel() runtime.FeatureTestMutex.Lock() defer runtime.FeatureTestMutex.Unlock() - assert.NoError(t, runtime.ParseFeatures(string(runtime.FeatureCountsAndLists)+"=true")) + assert.NoError(t, runtime.ParseFeatures(features)) a, m := newFakeAllocator() @@ -797,7 +898,11 @@ func TestAllocatorRunLocalAllocationsCountsAndLists(t *testing.T) { j6 := request{gsa: gsaListDistributed.DeepCopy(), response: make(chan response)} a.pendingRequests <- j6 - go a.ListenAndAllocate(ctx, 5) + if runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesUpdates) { + go a.ListenAndBatchAllocate(ctx, 5) + } else { + go a.ListenAndAllocate(ctx, 5) + } res1 := <-j1.response assert.NoError(t, res1.err) @@ -1100,3 +1205,112 @@ func newFakeAllocator() (*Allocator, agtesting.Mocks) { return a, m } + +// newFakeAllocator returns a fake allocator. 
+func newFakeAllocatorWithCustomBatchWaitTime(batchWaitTime time.Duration) (*Allocator, agtesting.Mocks) { + m := agtesting.NewMocks() + + counter := gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory) + a := NewAllocator( + m.AgonesInformerFactory.Multicluster().V1().GameServerAllocationPolicies(), + m.KubeInformerFactory.Core().V1().Secrets(), + m.AgonesClient.AgonesV1(), + m.KubeClient, + NewAllocationCache(m.AgonesInformerFactory.Agones().V1().GameServers(), counter, healthcheck.NewHandler()), + time.Second, + 5*time.Second, + batchWaitTime) + a.recorder = m.FakeRecorder + + return a, m +} + +// 1 gs and 2 gsa in the same batch: one gets the gs, +// the other needs to retry to get the gs +func TestAllocatorAllocateNoQuickNoGameServerError(t *testing.T) { + t.Parallel() + + // TODO: remove when `CountsAndLists` feature flag is moved to stable. + runtime.FeatureTestMutex.Lock() + defer runtime.FeatureTestMutex.Unlock() + require.NoError(t, runtime.ParseFeatures( + fmt.Sprintf("%s=false&%s=true", runtime.FeaturePlayerAllocationFilter, runtime.FeatureCountsAndLists))) + + a, m := newFakeAllocatorWithCustomBatchWaitTime(2 * time.Second) + + // 1 gs, 2 gsa + gs1 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, UID: "1"}, + Status: agonesv1.GameServerStatus{NodeName: "node1", State: agonesv1.GameServerStateAllocated, + Counters: map[string]agonesv1.CounterStatus{ + "capacity": { // Available Capacity == 999 + Count: 1, + Capacity: 1000, + }}}} + gsList := []agonesv1.GameServer{gs1} + gsLen := len(gsList) + m.AgonesClient.AddReactor("list", "gameservers", func(_ k8stesting.Action) (bool, k8sruntime.Object, error) { + return true, &agonesv1.GameServerList{Items: gsList}, nil + }) + m.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { + ua := action.(k8stesting.UpdateAction) + gs := ua.GetObject().(*agonesv1.GameServer) + + return true, gs, nil 
+ }) + + ctx, cancel := agtesting.StartInformers(m, a.allocationCache.gameServerSynced) + defer cancel() + + require.NoError(t, a.Run(ctx)) + // wait for all the gameservers to be in the cache + require.Eventuallyf(t, func() bool { + return a.allocationCache.cache.Len() == gsLen + }, 10*time.Second, time.Second, fmt.Sprintf("should be %d items in the cache", gsLen)) + + ALLOCATED := agonesv1.GameServerStateAllocated + + gsa := &allocationv1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, + Spec: allocationv1.GameServerAllocationSpec{ + Scheduling: apis.Packed, + Selectors: []allocationv1.GameServerSelector{{ + GameServerState: &ALLOCATED, + Counters: map[string]allocationv1.CounterSelector{ + "capacity": { + MinAvailable: 1, + }}}, + }}, + } + + var waitTest sync.WaitGroup + + waitTest.Add(1) + go func() { + defer waitTest.Done() + + // First allocation succeeds + result1, err1 := a.Allocate(ctx, gsa.DeepCopy()) + require.NoError(t, err1) + require.NotNil(t, result1) + outGsa := result1.(*allocationv1.GameServerAllocation) + require.NotNil(t, outGsa) + require.Equal(t, allocationv1.GameServerAllocationAllocated, outGsa.Status.State) + require.Equal(t, gs1.ObjectMeta.Name, outGsa.Status.GameServerName) + }() + + waitTest.Add(1) + go func() { + defer waitTest.Done() + + // Second allocation in the same batch + result2, err2 := a.Allocate(ctx, gsa.DeepCopy()) + require.NoError(t, err2) + require.NotNil(t, result2) + outGsa := result2.(*allocationv1.GameServerAllocation) + require.NotNil(t, outGsa) + require.Equal(t, allocationv1.GameServerAllocationAllocated, outGsa.Status.State) + require.Equal(t, gs1.ObjectMeta.Name, outGsa.Status.GameServerName) + }() + + waitTest.Wait() +} diff --git a/pkg/gameserverallocations/batch_allocator.go b/pkg/gameserverallocations/batch_allocator.go new file mode 100644 index 0000000000..6f43c2e6e5 --- /dev/null +++ b/pkg/gameserverallocations/batch_allocator.go @@ -0,0 +1,266 @@ +package 
gameserverallocations + +import ( + "context" + goErrors "errors" + "time" + + "agones.dev/agones/pkg/apis" + agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" + "agones.dev/agones/pkg/util/runtime" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// batchResponse is an async list of responses for matching requests +type batchResponses struct { + responses []response + counterErrors error + listErrors error +} + +// batchAllocationUpdateWorkers tries to update each newly allocated gs with the last state. If +// the update fails because of a version conflict, all allocations that were applied onto a gs +// will receive an error, thus being available for retries. If the update succeeds, all allocations +// that were applied onto a gs will succeed, and the gs with the updated state will be added +// back to the cache. +func (c *Allocator) batchAllocationUpdateWorkers(ctx context.Context, workerCount int) chan<- batchResponses { + metrics := c.newMetrics(ctx) + batchUpdateQueue := make(chan batchResponses) + + for i := 0; i < workerCount; i++ { + go func() { + for { + select { + case batchRes := <-batchUpdateQueue: + if len(batchRes.responses) > 0 { + // The last response contains the latest gs state + lastGsState := batchRes.responses[len(batchRes.responses)-1].gs + + requestStartTime := time.Now() + + // Try to update with the latest gs state + updatedGs, updateErr := c.gameServerGetter.GameServers(lastGsState.ObjectMeta.Namespace).Update(ctx, lastGsState, metav1.UpdateOptions{}) + if updateErr != nil { + metrics.recordAllocationUpdateFailure(ctx, time.Since(requestStartTime)) + + if !k8serrors.IsConflict(errors.Cause(updateErr)) { + // since we could not allocate, we should put it back + // but not if it's a conflict, as the cache is no longer up to date, and + // we should wait for it to get updated 
with fresh info. + c.allocationCache.AddGameServer(updatedGs) + } + updateErr = errors.Wrap(updateErr, "error updating allocated gameserver") + } else { + metrics.recordAllocationUpdateSuccess(ctx, time.Since(requestStartTime)) + + // Add the server back as soon as possible and not wait for the informer to update the cache + c.allocationCache.AddGameServer(updatedGs) + + // If successful Update record any Counter or List action errors as a warning + if batchRes.counterErrors != nil { + c.recorder.Event(updatedGs, corev1.EventTypeWarning, "CounterActionError", batchRes.counterErrors.Error()) + } + if batchRes.listErrors != nil { + c.recorder.Event(updatedGs, corev1.EventTypeWarning, "ListActionError", batchRes.listErrors.Error()) + } + c.recorder.Event(updatedGs, corev1.EventTypeNormal, string(updatedGs.Status.State), "Allocated") + } + + // Forward all responses with their appropriate gs state and update error + for _, res := range batchRes.responses { + res.err = updateErr + res.request.response <- res + } + } + case <-ctx.Done(): + return + } + } + }() + } + + return batchUpdateQueue +} + +// ListenAndBatchAllocate is a blocking function that runs in a loop +// looking at c.pendingRequests for batches of requests that are coming through. +// The difference between this and the original ListenAndAllocate is that this will +// apply the allocation to the local gs (still removing it from the cache) and continue with +// the next allocation from the batch. When the batch is done, the update workers will try to +// update each newly allocated gs with the last state. 
+func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCount int) { + // setup workers for batch allocation updates + batchUpdateQueue := c.batchAllocationUpdateWorkers(ctx, updateWorkerCount) + + var list []*agonesv1.GameServer + var sortKey uint64 + requestCount := 0 + + metrics := c.newMetrics(ctx) + batchResponsesPerGs := make(map[string]batchResponses) + + flush := func() { + if requestCount > 0 { + metrics.recordAllocationsBatchSize(ctx, requestCount) + } + + for _, batchResponses := range batchResponsesPerGs { + batchUpdateQueue <- batchResponses + } + batchResponsesPerGs = make(map[string]batchResponses) + + list = nil + requestCount = 0 + } + + checkSortKey := func(gsa *allocationv1.GameServerAllocation) { + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { + // SortKey returns the sorting values (list of Priorities) as a deterministic key. + // In case gsa.Spec.Priorities is nil this will still return a sortKey. + // In case of error this will return 0 for the sortKey. + newSortKey, err := gsa.SortKey() + if err != nil { + c.baseLogger.WithError(err).Warn("error getting sortKey for GameServerAllocationSpec", err) + } + // Set sortKey if this is the first request, or the previous request errored on creating a sortKey.
+ if sortKey == uint64(0) { + sortKey = newSortKey + } + + if newSortKey != sortKey { + sortKey = newSortKey + flush() + } + } + } + + checkRefreshList := func(gsa *allocationv1.GameServerAllocation) { + // refresh the list after every 100 allocations made in a single batch + if requestCount >= maxBatchBeforeRefresh { + flush() + } + requestCount++ + + checkSortKey(gsa) + + // Sort list if necessary + if list == nil { + if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) || gsa.Spec.Scheduling == apis.Packed { + list = c.allocationCache.ListSortedGameServers(gsa) + } else { + // If FeatureCountsAndLists and Scheduling == Distributed, sort game servers by Priorities + list = c.allocationCache.ListSortedGameServersPriorities(gsa) + } + } + } + + for { + select { + case req := <-c.pendingRequests: + checkRefreshList(req.gsa) + + gs, index, err := findGameServerForAllocation(req.gsa, list) + if err != nil { + req.response <- response{request: req, gs: nil, err: err} + continue + } + + // if the gs has not been already allocated in this batch, remove it from the cache, + // but keep it in the list for the next allocation + existingBatch, alreadyAllocated := batchResponsesPerGs[string(gs.UID)] + if !alreadyAllocated { + if removeErr := c.allocationCache.RemoveGameServer(gs); removeErr != nil { + // this seems unlikely, but lets handle it just in case + removeErr = errors.Wrap(removeErr, "error removing gameserver from cache") + req.response <- response{request: req, gs: nil, err: removeErr} + + // remove the game server because it is problematic + list = append(list[:index], list[index+1:]...) 
+ continue + } + } + + // apply the allocation to the gs in the list (not in cache anymore) + applyError, counterErrors, listErrors := c.applyAllocationToLocalGameServer(req.gsa.Spec.MetaPatch, gs, req.gsa) + if applyError == nil { + if alreadyAllocated { + existingBatch.responses = append(existingBatch.responses, response{request: req, gs: gs.DeepCopy(), err: nil}) + existingBatch.counterErrors = goErrors.Join(existingBatch.counterErrors, counterErrors) + existingBatch.listErrors = goErrors.Join(existingBatch.listErrors, listErrors) + batchResponsesPerGs[string(gs.UID)] = existingBatch + } else { // first time we see this gs in this batch + batchResponsesPerGs[string(gs.UID)] = batchResponses{ + responses: []response{{request: req, gs: gs.DeepCopy(), err: nil}}, + counterErrors: counterErrors, + listErrors: listErrors, + } + } + } else { + req.response <- response{request: req, gs: nil, err: applyError} + } + case <-ctx.Done(): + flush() + return + default: + flush() + + // slow down cpu churn, and allow items to batch + time.Sleep(c.batchWaitTime) + } + } +} + +// applyAllocationToLocalGameServer patches the inputted GameServer with the allocation metadata changes, and updates it to the Allocated State. +// Returns the encountered errors. 
+func (c *Allocator) applyAllocationToLocalGameServer(mp allocationv1.MetaPatch, gs *agonesv1.GameServer, gsa *allocationv1.GameServerAllocation) (error, error, error) { + // add last allocated, so it always gets updated, even if it is already Allocated + ts, err := time.Now().MarshalText() + if err != nil { + return err, nil, nil + } + if gs.ObjectMeta.Annotations == nil { + gs.ObjectMeta.Annotations = make(map[string]string, 1) + } + gs.ObjectMeta.Annotations[LastAllocatedAnnotationKey] = string(ts) + gs.Status.State = agonesv1.GameServerStateAllocated + + // patch ObjectMeta labels + if mp.Labels != nil { + if gs.ObjectMeta.Labels == nil { + gs.ObjectMeta.Labels = make(map[string]string, len(mp.Labels)) + } + for key, value := range mp.Labels { + gs.ObjectMeta.Labels[key] = value + } + } + + if gs.ObjectMeta.Annotations == nil { + gs.ObjectMeta.Annotations = make(map[string]string, len(mp.Annotations)) + } + // apply annotations patch + for key, value := range mp.Annotations { + gs.ObjectMeta.Annotations[key] = value + } + + // perform any Counter or List actions + var counterErrors error + var listErrors error + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { + if gsa.Spec.Counters != nil { + for counter, ca := range gsa.Spec.Counters { + counterErrors = goErrors.Join(counterErrors, ca.CounterActions(counter, gs)) + } + } + if gsa.Spec.Lists != nil { + for list, la := range gsa.Spec.Lists { + listErrors = goErrors.Join(listErrors, la.ListActions(list, gs)) + } + } + } + + return nil, counterErrors, listErrors +} diff --git a/pkg/gameserverallocations/metrics_additional.go b/pkg/gameserverallocations/metrics_additional.go new file mode 100644 index 0000000000..bd0a2b8494 --- /dev/null +++ b/pkg/gameserverallocations/metrics_additional.go @@ -0,0 +1,57 @@ +package gameserverallocations + +import ( + "context" + "time" + + mt "agones.dev/agones/pkg/metrics" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +var ( + 
gameServerAllocationsUpdatesLatency = stats.Float64("gameserver_allocations/update_latency", "The duration of gameserver updates", "s") + gameServerAllocationsBatchSize = stats.Int64("gameserver_allocations/batch", "The gameserver allocations batch size", "1") +) + +func init() { + + stateViews := []*view.View{ + { + Name: "gameserver_allocations_updates_duration_seconds", + Measure: gameServerAllocationsUpdatesLatency, + Description: "The distribution of gameserver allocation update requests latencies.", + Aggregation: view.Distribution(0, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2, 3), + TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, + }, + { + Name: "gameserver_allocations_batch_size", + Measure: gameServerAllocationsBatchSize, + Description: "The count of gameserver allocations in a batch", + Aggregation: view.Distribution(1, 2, 3, 4, 5, 10, 20, 50, 100), + TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, + }, + } + + for _, v := range stateViews { + if err := view.Register(v); err != nil { + logger.WithError(err).Error("could not register view") + } + } +} + +// record the current allocation batch size rate. 
+func (r *metrics) recordAllocationsBatchSize(ctx context.Context, count int) { + stats.Record(ctx, gameServerAllocationsBatchSize.M(int64(count))) +} + +func (r *metrics) recordAllocationUpdateSuccess(ctx context.Context, duration time.Duration) { + mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Success")}, + gameServerAllocationsUpdatesLatency.M(duration.Seconds())) +} + +func (r *metrics) recordAllocationUpdateFailure(ctx context.Context, duration time.Duration) { + mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Failure")}, + gameServerAllocationsUpdatesLatency.M(duration.Seconds())) +} diff --git a/pkg/util/runtime/features.go b/pkg/util/runtime/features.go index ae94474824..a34e6320b7 100644 --- a/pkg/util/runtime/features.go +++ b/pkg/util/runtime/features.go @@ -79,9 +79,13 @@ const ( //////////////// // Dev features + // FeatureProcessorAllocator is a feature flag to enable/disable the processor allocator feature. FeatureProcessorAllocator = "ProcessorAllocator" + // FeatureAllocatorBatchesUpdates is a feature flag to enable/disable applying the allocations in batches. 
+ FeatureAllocatorBatchesUpdates = "AllocatorBatchesUpdates" + //////////////// // Example feature @@ -165,7 +169,8 @@ var ( FeatureSidecarContainers: false, // Dev features - FeatureProcessorAllocator: false, + FeatureProcessorAllocator: false, + FeatureAllocatorBatchesUpdates: false, // Example feature FeatureExample: false, From 7c27268e19a40fbef3196f896ccf4ddd15c59a65 Mon Sep 17 00:00:00 2001 From: Mihai Pancu Date: Wed, 7 May 2025 11:16:03 +0000 Subject: [PATCH 2/8] Fix lint --- pkg/gameserverallocations/batch_allocator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gameserverallocations/batch_allocator.go b/pkg/gameserverallocations/batch_allocator.go index 6f43c2e6e5..dd8cad4c66 100644 --- a/pkg/gameserverallocations/batch_allocator.go +++ b/pkg/gameserverallocations/batch_allocator.go @@ -17,9 +17,9 @@ import ( // batchResponse is an async list of responses for matching requests type batchResponses struct { - responses []response counterErrors error listErrors error + responses []response } // batchAllocationUpdateWorkers tries to update each newly allocated gs with the last state. If From 9be7dcc0b575d7103226f45f8a1e602391bc52ec Mon Sep 17 00:00:00 2001 From: Mihai Pancu Date: Fri, 16 May 2025 09:46:35 +0300 Subject: [PATCH 3/8] Made the feature alpha --- build/Makefile | 2 +- cloudbuild.yaml | 2 +- install/helm/agones/defaultfeaturegates.yaml | 1 + pkg/util/runtime/features.go | 9 ++++----- site/content/en/docs/Guides/feature-stages.md | 1 + test/upgrade/versionMap.yaml | 2 +- 6 files changed, 9 insertions(+), 8 deletions(-) diff --git a/build/Makefile b/build/Makefile index 79f2ad6599..9c82314fd4 100644 --- a/build/Makefile +++ b/build/Makefile @@ -73,7 +73,7 @@ BETA_FEATURE_GATES ?= "AutopilotPassthroughPort=true&CountsAndLists=true&GKEAuto # Enable all alpha feature gates. 
Keep in sync with `false` (alpha) entries in pkg/util/runtime/features.go:featureDefaults -ALPHA_FEATURE_GATES ?= "PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=true&PlayerTracking=true&SidecarContainers=true&Example=true" +ALPHA_FEATURE_GATES ?= "AllocatorBatchesUpdates=true&PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=true&PlayerTracking=true&SidecarContainers=true&Example=true" # Build with Windows support WITH_WINDOWS=1 diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 03947ed858..067294af68 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -306,7 +306,7 @@ steps: # Keep in sync with the inverse of 'alpha' and 'beta' features in # pkg/util/runtime/features.go:featureDefaults - featureWithGate="PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=true&PlayerTracking=true&CountsAndLists=false&RollingUpdateFix=false&PortRanges=false&PortPolicyNone=false&ScheduledAutoscaler=false&AutopilotPassthroughPort=false&GKEAutopilotExtendedDurationPods=false&SidecarContainers=true&Example=true" + featureWithGate="AllocatorBatchesUpdates=true&PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=true&PlayerTracking=true&CountsAndLists=false&RollingUpdateFix=false&PortRanges=false&PortPolicyNone=false&ScheduledAutoscaler=false&AutopilotPassthroughPort=false&GKEAutopilotExtendedDurationPods=false&SidecarContainers=true&Example=true" featureWithoutGate="" # Use this if specific feature gates can only be supported on specific Kubernetes versions. 
diff --git a/install/helm/agones/defaultfeaturegates.yaml b/install/helm/agones/defaultfeaturegates.yaml index 454f8c6068..8423ca42ce 100644 --- a/install/helm/agones/defaultfeaturegates.yaml +++ b/install/helm/agones/defaultfeaturegates.yaml @@ -34,6 +34,7 @@ FleetAutoscaleRequestMetaData: false PlayerAllocationFilter: false PlayerTracking: false SidecarContainers: false +AllocatorBatchesUpdates: false # Dev features ProcessorAllocator: false diff --git a/pkg/util/runtime/features.go b/pkg/util/runtime/features.go index a34e6320b7..56e969b560 100644 --- a/pkg/util/runtime/features.go +++ b/pkg/util/runtime/features.go @@ -76,16 +76,15 @@ const ( // FeatureSidecarContainers is a feature flag to enable/disable k8s sidecar containers for the sdkserver FeatureSidecarContainers = "SidecarContainers" + // FeatureAllocatorBatchesUpdates is a feature flag to enable/disable applying the allocations in batches. + FeatureAllocatorBatchesUpdates = "AllocatorBatchesUpdates" + //////////////// // Dev features - // FeatureProcessorAllocator is a feature flag to enable/disable the processor allocator feature. FeatureProcessorAllocator = "ProcessorAllocator" - // FeatureAllocatorBatchesUpdates is a feature flag to enable/disable applying the allocations in batches. 
- FeatureAllocatorBatchesUpdates = "AllocatorBatchesUpdates" - //////////////// // Example feature @@ -167,10 +166,10 @@ var ( FeaturePlayerAllocationFilter: false, FeaturePlayerTracking: false, FeatureSidecarContainers: false, + FeatureAllocatorBatchesUpdates: false, // Dev features FeatureProcessorAllocator: false, - FeatureAllocatorBatchesUpdates: false, // Example feature FeatureExample: false, diff --git a/site/content/en/docs/Guides/feature-stages.md b/site/content/en/docs/Guides/feature-stages.md index fa68949240..c5e206a94c 100644 --- a/site/content/en/docs/Guides/feature-stages.md +++ b/site/content/en/docs/Guides/feature-stages.md @@ -42,6 +42,7 @@ The current set of `alpha` and `beta` feature gates: | [Scheduled Fleet Autoscaling](https://github.com/googleforgames/agones/issues/3008) | `ScheduledAutoscaler` | Enabled | `Beta` | 1.51.0 | | [Extend Webhook autoscaler to send fleet metadata with the request](https://github.com/googleforgames/agones/issues/3951) | `FleetAutoscaleRequestMetaData` | Disabled | `Alpha` | 1.48.0 | | [Sidecar Containers](https://github.com/googleforgames/agones/issues/3642) | `SidecarContainers` | Disabled | `Alpha` | 1.49.0 | +| [Allocator Batches Updates](https://github.com/googleforgames/agones/issues/3992) | `AllocatorBatchesUpdates` | Disabled | `Alpha` | 1.50.0 | | Example Gate (not in use) | `Example` | Disabled | None | 0.13.0 | [fleet-updates]: {{% relref "./fleet-updates.md#notifying-gameservers-on-fleet-updatedownscale" %}} diff --git a/test/upgrade/versionMap.yaml b/test/upgrade/versionMap.yaml index 2ddf5b0830..1fcf965aeb 100644 --- a/test/upgrade/versionMap.yaml +++ b/test/upgrade/versionMap.yaml @@ -97,7 +97,7 @@ data: "betaGates": ["AutopilotPassthroughPort", "CountsAndLists", "GKEAutopilotExtendedDurationPods", "PortPolicyNone", "PortRanges", "RollingUpdateFix", "ScheduledAutoscaler"] }, "Dev": { - "alphaGates": ["FleetAutoscaleRequestMetaData", "PlayerAllocationFilter", "PlayerTracking", "SidecarContainers"], + 
"alphaGates": ["AllocatorBatchesUpdates", "FleetAutoscaleRequestMetaData", "PlayerAllocationFilter", "PlayerTracking", "SidecarContainers"], "betaGates": ["AutopilotPassthroughPort", "CountsAndLists", "GKEAutopilotExtendedDurationPods", "PortPolicyNone", "PortRanges", "RollingUpdateFix", "ScheduledAutoscaler"] } } From ce311d684d16a3c0835934d56eb15ab3baca4b1d Mon Sep 17 00:00:00 2001 From: Mihai Pancu Date: Fri, 16 May 2025 10:51:38 +0300 Subject: [PATCH 4/8] Updating comments --- pkg/gameserverallocations/allocator_test.go | 2 +- pkg/gameserverallocations/batch_allocator.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/gameserverallocations/allocator_test.go b/pkg/gameserverallocations/allocator_test.go index 3bdc8ba95a..e0355cb5cb 100644 --- a/pkg/gameserverallocations/allocator_test.go +++ b/pkg/gameserverallocations/allocator_test.go @@ -1206,7 +1206,7 @@ func newFakeAllocator() (*Allocator, agtesting.Mocks) { return a, m } -// newFakeAllocator returns a fake allocator. 
+// newFakeAllocatorWithCustomBatchWaitTime returns a fake allocator with a batchWaitTime func newFakeAllocatorWithCustomBatchWaitTime(batchWaitTime time.Duration) (*Allocator, agtesting.Mocks) { m := agtesting.NewMocks() diff --git a/pkg/gameserverallocations/batch_allocator.go b/pkg/gameserverallocations/batch_allocator.go index dd8cad4c66..8733efeedc 100644 --- a/pkg/gameserverallocations/batch_allocator.go +++ b/pkg/gameserverallocations/batch_allocator.go @@ -208,7 +208,11 @@ func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCoun default: flush() - // slow down cpu churn, and allow items to batch + // If nothing is found in c.pendingRequests, we move to + // default: which will wait for c.batchWaitTime, to allow for some requests to backup in c.pendingRequests, + // providing us with a batch of Allocation requests in that channel + + // Once we have 1 or more requests in c.pendingRequests (which is buffered to 100), we can start the batch process. time.Sleep(c.batchWaitTime) } } From f77abeafc3bebb87e5345aafcec56b9611c2b8ef Mon Sep 17 00:00:00 2001 From: Mihai Pancu Date: Fri, 16 May 2025 12:20:03 +0300 Subject: [PATCH 5/8] Make the new metrics work with the regular allocator too --- pkg/gameserverallocations/allocator.go | 25 ++++++-- pkg/gameserverallocations/metrics.go | 37 +++++++++++- .../metrics_additional.go | 57 ------------------- site/content/en/docs/Guides/metrics.md | 6 +- 4 files changed, 58 insertions(+), 67 deletions(-) delete mode 100644 pkg/gameserverallocations/metrics_additional.go diff --git a/pkg/gameserverallocations/allocator.go b/pkg/gameserverallocations/allocator.go index ae4fa3fa72..bcdc21f6c1 100644 --- a/pkg/gameserverallocations/allocator.go +++ b/pkg/gameserverallocations/allocator.go @@ -515,14 +515,23 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int var list []*agonesv1.GameServer var sortKey uint64 requestCount := 0 + metrics := c.newMetrics(ctx) + + flush := func() 
{ + if requestCount > 0 { + metrics.recordAllocationsBatchSize(ctx, requestCount) + } + + list = nil + requestCount = 0 + } for { select { case req := <-c.pendingRequests: // refresh the list after every 100 allocations made in a single batch if requestCount >= maxBatchBeforeRefresh { - list = nil - requestCount = 0 + flush() } if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { @@ -540,8 +549,7 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int if newSortKey != sortKey { sortKey = newSortKey - list = nil - requestCount = 0 + flush() } } @@ -575,8 +583,8 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int case <-ctx.Done(): return default: - list = nil - requestCount = 0 + flush() + // slow down cpu churn, and allow items to batch time.Sleep(c.batchWaitTime) } @@ -667,10 +675,15 @@ func (c *Allocator) applyAllocationToGameServer(ctx context.Context, mp allocati } } + metrics := c.newMetrics(ctx) + requestStartTime := time.Now() + gsUpdate, updateErr := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(ctx, gs, metav1.UpdateOptions{}) if updateErr != nil { + metrics.recordAllocationUpdateFailure(ctx, time.Since(requestStartTime)) return gsUpdate, updateErr } + metrics.recordAllocationUpdateSuccess(ctx, time.Since(requestStartTime)) // If successful Update record any Counter or List action errors as a warning if counterErrors != nil { diff --git a/pkg/gameserverallocations/metrics.go b/pkg/gameserverallocations/metrics.go index e5d98b9b8b..36eaeb149b 100644 --- a/pkg/gameserverallocations/metrics.go +++ b/pkg/gameserverallocations/metrics.go @@ -40,8 +40,10 @@ var ( keyStatus = mt.MustTagKey("status") keySchedulingStrategy = mt.MustTagKey("scheduling_strategy") - gameServerAllocationsLatency = stats.Float64("gameserver_allocations/latency", "The duration of gameserver allocations", "s") - gameServerAllocationsRetryTotal = stats.Int64("gameserver_allocations/errors", "The errors of 
gameserver allocations", "1") + gameServerAllocationsLatency = stats.Float64("gameserver_allocations/latency", "The duration of gameserver allocations", "s") + gameServerAllocationsRetryTotal = stats.Int64("gameserver_allocations/errors", "The errors of gameserver allocations", "1") + gameServerAllocationsUpdatesLatency = stats.Float64("gameserver_allocations/update_latency", "The duration of gameserver updates", "s") + gameServerAllocationsBatchSize = stats.Int64("gameserver_allocations/batch", "The gameserver allocations batch size", "1") stateViews = []*view.View{ { @@ -58,6 +60,20 @@ var ( Aggregation: view.Distribution(1, 2, 3, 4, 5), TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, }, + { + Name: "gameserver_allocations_updates_duration_seconds", + Measure: gameServerAllocationsUpdatesLatency, + Description: "The distribution of gameserver allocation update requests latencies.", + Aggregation: view.Distribution(0, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2, 3), + TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, + }, + { + Name: "gameserver_allocations_batch_size", + Measure: gameServerAllocationsBatchSize, + Description: "The count of gameserver allocations in a batch", + Aggregation: view.Distribution(1, 2, 3, 4, 5, 10, 20, 50, 100), + TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, + }, } ) @@ -155,3 +171,20 @@ func (r *metrics) recordAllocationRetrySuccess(ctx context.Context, retryCount i mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Success")}, gameServerAllocationsRetryTotal.M(int64(retryCount))) } + +// record the current allocation batch size. +func (r *metrics) recordAllocationsBatchSize(ctx context.Context, count int) { + stats.Record(ctx, gameServerAllocationsBatchSize.M(int64(count))) +} + +// record the gs successful update latency. 
+func (r *metrics) recordAllocationUpdateSuccess(ctx context.Context, duration time.Duration) { + mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Success")}, + gameServerAllocationsUpdatesLatency.M(duration.Seconds())) +} + +// record the gs failed update latency. +func (r *metrics) recordAllocationUpdateFailure(ctx context.Context, duration time.Duration) { + mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Failure")}, + gameServerAllocationsUpdatesLatency.M(duration.Seconds())) +} diff --git a/pkg/gameserverallocations/metrics_additional.go b/pkg/gameserverallocations/metrics_additional.go deleted file mode 100644 index bd0a2b8494..0000000000 --- a/pkg/gameserverallocations/metrics_additional.go +++ /dev/null @@ -1,57 +0,0 @@ -package gameserverallocations - -import ( - "context" - "time" - - mt "agones.dev/agones/pkg/metrics" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" -) - -var ( - gameServerAllocationsUpdatesLatency = stats.Float64("gameserver_allocations/update_latency", "The duration of gameserver updates", "s") - gameServerAllocationsBatchSize = stats.Int64("gameserver_allocations/batch", "The gameserver allocations batch size", "1") -) - -func init() { - - stateViews := []*view.View{ - { - Name: "gameserver_allocations_updates_duration_seconds", - Measure: gameServerAllocationsUpdatesLatency, - Description: "The distribution of gameserver allocation update requests latencies.", - Aggregation: view.Distribution(0, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2, 3), - TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, - }, - { - Name: "gameserver_allocations_batch_size", - Measure: gameServerAllocationsBatchSize, - Description: "The count of gameserver allocations in a batch", - Aggregation: view.Distribution(1, 2, 3, 4, 5, 10, 20, 50, 100), - TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, 
keySchedulingStrategy}, - }, - } - - for _, v := range stateViews { - if err := view.Register(v); err != nil { - logger.WithError(err).Error("could not register view") - } - } -} - -// record the current allocation batch size rate. -func (r *metrics) recordAllocationsBatchSize(ctx context.Context, count int) { - stats.Record(ctx, gameServerAllocationsBatchSize.M(int64(count))) -} - -func (r *metrics) recordAllocationUpdateSuccess(ctx context.Context, duration time.Duration) { - mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Success")}, - gameServerAllocationsUpdatesLatency.M(duration.Seconds())) -} - -func (r *metrics) recordAllocationUpdateFailure(ctx context.Context, duration time.Duration) { - mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Failure")}, - gameServerAllocationsUpdatesLatency.M(duration.Seconds())) -} diff --git a/site/content/en/docs/Guides/metrics.md b/site/content/en/docs/Guides/metrics.md index c2564b5f67..841f7ad6e0 100644 --- a/site/content/en/docs/Guides/metrics.md +++ b/site/content/en/docs/Guides/metrics.md @@ -49,8 +49,10 @@ Follow the [Google Cloud Monitoring installation steps](#google-cloud-monitoring |-------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------| | agones_gameservers_count | The number of gameservers per fleet and status | gauge | | agones_gameserver_allocations_duration_seconds | The distribution of gameserver allocation requests latencies | histogram | -| agones_gameserver_allocations_retry_total | The count of gameserver allocation retry until it succeeds | histogram | -| agones_gameserver_creation_duration | The time gameserver takes to be created in seconds | histogram | +| agones_gameserver_allocations_retry_total | The count of gameserver allocation retry until it succeeds | histogram | +| 
agones_gameserver_allocations_updates_duration_seconds| The distribution of gameserver allocation update requests latencies | histogram | +| agones_gameserver_allocations_batch_size | The count of gameserver allocations in a batch | histogram | +| agones_gameserver_creation_duration | The time gameserver takes to be created in seconds | histogram | | agones_gameservers_total | The total of gameservers per fleet and status | counter | | agones_gameserver_player_connected_total | The total number of players connected to gameservers (Only available when [player tracking]({{< relref "player-tracking.md" >}}) is enabled) | gauge | | agones_gameserver_player_capacity_total | The available capacity for players on gameservers (Only available when [player tracking]({{< relref "player-tracking.md" >}}) is enabled) | gauge | From 363b8bd46b5bcd994a90c0d2325c579a06704289 Mon Sep 17 00:00:00 2001 From: Mihai Pancu Date: Tue, 5 Aug 2025 00:21:18 +0300 Subject: [PATCH 6/8] Reorder game server after applying allocation inside the batch allocator --- pkg/gameserverallocations/allocation_cache.go | 269 +++++++++++++---- .../allocation_cache_test.go | 280 ++++++++++++++++++ pkg/gameserverallocations/batch_allocator.go | 37 ++- 3 files changed, 514 insertions(+), 72 deletions(-) diff --git a/pkg/gameserverallocations/allocation_cache.go b/pkg/gameserverallocations/allocation_cache.go index 75c53afd80..8ce7f70c59 100644 --- a/pkg/gameserverallocations/allocation_cache.go +++ b/pkg/gameserverallocations/allocation_cache.go @@ -18,6 +18,7 @@ import ( "context" "sort" + "agones.dev/agones/pkg/apis" "agones.dev/agones/pkg/apis/agones" agonesv1 "agones.dev/agones/pkg/apis/agones/v1" allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" @@ -174,64 +175,17 @@ func (c *AllocationCache) ListSortedGameServers(gsa *allocationv1.GameServerAllo counts := c.counter.Counts() sort.Slice(list, func(i, j int) bool { - gs1 := list[i] - gs2 := list[j] + gs0 := list[i] + gs1 := list[j] - // Search 
Allocated GameServers first. - if gs1.Status.State != gs2.Status.State { - return gs1.Status.State == agonesv1.GameServerStateAllocated - } - - c1, ok := counts[gs1.Status.NodeName] - if !ok { - return false - } - - c2, ok := counts[gs2.Status.NodeName] - if !ok { - return true - } - - if c1.Allocated > c2.Allocated { - return true - } - if c1.Allocated < c2.Allocated { - return false - } - - // prefer nodes that have the most Ready gameservers on them - they are most likely to be - // completely filled and least likely target for scale down. - if c1.Ready < c2.Ready { - return false - } - if c1.Ready > c2.Ready { - return true - } - - // if player tracking is enabled, prefer game servers with the least amount of room left - if runtime.FeatureEnabled(runtime.FeaturePlayerAllocationFilter) { - if gs1.Status.Players != nil && gs2.Status.Players != nil { - cap1 := gs1.Status.Players.Capacity - gs1.Status.Players.Count - cap2 := gs2.Status.Players.Capacity - gs2.Status.Players.Count - - // if they are equal, pass the comparison through. - if cap1 < cap2 { - return true - } else if cap2 < cap1 { - return false - } - } - } - - // if we end up here, then break the tie with Counter or List Priority. 
+ var priorities []agonesv1.Priority if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && (gsa != nil) { - if res := gs1.CompareCountAndListPriorities(gsa.Spec.Priorities, gs2); res != nil { - return *res - } + priorities = gsa.Spec.Priorities + } else { + priorities = nil } - // finally sort lexicographically, so we have a stable order - return gs1.GetObjectMeta().GetName() < gs2.GetObjectMeta().GetName() + return compareGameServersForPakcedStrategy(gs0, gs1, priorities, counts) }) return list @@ -246,17 +200,17 @@ func (c *AllocationCache) ListSortedGameServersPriorities(gsa *allocationv1.Game } sort.Slice(list, func(i, j int) bool { - gs1 := list[i] - gs2 := list[j] + gs0 := list[i] + gs1 := list[j] + var priorities []agonesv1.Priority if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && (gsa != nil) { - if res := gs1.CompareCountAndListPriorities(gsa.Spec.Priorities, gs2); res != nil { - return *res - } + priorities = gsa.Spec.Priorities + } else { + priorities = nil } - // finally sort lexicographically, so we have a stable order - return gs1.GetObjectMeta().GetName() < gs2.GetObjectMeta().GetName() + return compareGameServersForDistributedStrategy(gs0, gs1, priorities) }) return list @@ -327,3 +281,196 @@ func (c *AllocationCache) getKey(gs *agonesv1.GameServer) (string, bool) { } return key, ok } + +// ReorderGameServerAfterAllocation positions the new gsAfterAllocation in the gsList according to the given priorities +// and using the gsIndexBeforeAllocation as hint to optimize the reordering. This is used by the batch allocator +// to reorder the gs after locally (not in cache) applying an allocation. 
+func (c *AllocationCache) ReorderGameServerAfterAllocation( + gsList []*agonesv1.GameServer, + gsIndexBeforeAllocation int, gsAfterAllocation *agonesv1.GameServer, + priorities []agonesv1.Priority, strategy apis.SchedulingStrategy) { + if len(gsList) == 0 || gsIndexBeforeAllocation < 0 || gsIndexBeforeAllocation >= len(gsList) || gsAfterAllocation == nil { + c.baseLogger.WithField("gsIndexBeforeAllocation", gsIndexBeforeAllocation). + WithField("gsAfterAllocation", gsAfterAllocation). + WithField("gsListLength", len(gsList)). + Warn("ReorderGameServerAfterAllocation called with invalid parameters! Reordering is skipped!") + return + } + + newIndex := gsIndexBeforeAllocation + gsToReorderOriginal := gsList[gsIndexBeforeAllocation] + + optimizeList := func(greater bool) []*agonesv1.GameServer { + var optimizedGsList []*agonesv1.GameServer + if greater { + // If the gs has less priority than the original, we need to insert it at the end of the list + optimizedGsList = gsList[gsIndexBeforeAllocation+1:] + } else { + // Otherwise, we need to insert it at the beginning of the list + optimizedGsList = gsList[:gsIndexBeforeAllocation] + } + return optimizedGsList + } + + switch strategy { + case apis.Packed: + counts := c.counter.Counts() + greater, equal := compareGameServersAfterAllocationForPackedStrategy(gsToReorderOriginal, gsAfterAllocation, priorities, counts) + if !equal { + newIndex = findIndexAfterAllocationForPackedStrategy(optimizeList(greater), gsAfterAllocation, priorities, counts) + if greater { + newIndex += gsIndexBeforeAllocation + } + } + case apis.Distributed: + greater, equal := compareGameServersAfterAllocationForDistributedStrategy(gsToReorderOriginal, gsAfterAllocation, priorities) + if !equal { + newIndex = findIndexAfterAllocationForDistributedStrategy(optimizeList(greater), gsAfterAllocation, priorities) + if greater { + newIndex += gsIndexBeforeAllocation + } + } else { + c.baseLogger.WithField("startegy", strategy). 
+ Warn("Scheduling strategy not supported! Reordering is skipped!") + } + } + + if newIndex != gsIndexBeforeAllocation { + // If the new index is different than the original index, we need to: + // remove the original + gsList = append(gsList[:gsIndexBeforeAllocation], gsList[gsIndexBeforeAllocation+1:]...) + // and insert the updated one + gsList = append(gsList[:newIndex], append([]*agonesv1.GameServer{gsAfterAllocation}, gsList[newIndex:]...)...) + } else { + // No reordering needed, just update the gs in the list + gsList[gsIndexBeforeAllocation] = gsAfterAllocation + } +} + +// compareGameServersAfterAllocationForDistributedStrategy compares the priority of the before and after applying an allocation to a game server. +// The first bool returned has the meaning of greater (before has greater priority than after) and the second of equal. If equal is true, discard the value of less. +// It does not take into account the name of the game server, so it can return an equal result. +// Used for the distributed startegy. +func compareGameServersAfterAllocationForDistributedStrategy( + before, after *agonesv1.GameServer, + priorities []agonesv1.Priority) (bool, bool) { + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && priorities != nil { + if res := before.CompareCountAndListPriorities(priorities, after); res != nil { + return *res, false + } + } + + // gs priority remains the same after allocation + return false, true +} + +// compareGameServersAfterAllocationForPackedStrategy compares the priority of the before and after applying an allocation to a game server. +// The first bool returned has the meaning of greater (before has greater priority than after) and the second of equal. If equal is true, discard the value of less. +// It does not take into account the name of the game server, so it can return an equal result. +// Used for the packed startegy. 
+func compareGameServersAfterAllocationForPackedStrategy( + before, after *agonesv1.GameServer, + priorities []agonesv1.Priority, + counts map[string]gameservers.NodeCount) (bool, bool) { + // Search Allocated GameServers first. + if before.Status.State != after.Status.State { + return before.Status.State == agonesv1.GameServerStateAllocated, false + } + + c1, ok := counts[before.Status.NodeName] + if !ok { + return false, false + } + + c2, ok := counts[after.Status.NodeName] + if !ok { + return true, false + } + + if c1.Allocated > c2.Allocated { + return true, false + } + if c1.Allocated < c2.Allocated { + return false, false + } + + // prefer nodes that have the most Ready gameservers on them - they are most likely to be + // completely filled and least likely target for scale down. + if c1.Ready < c2.Ready { + return false, false + } + if c1.Ready > c2.Ready { + return true, false + } + + // if player tracking is enabled, prefer game servers with the least amount of room left + if runtime.FeatureEnabled(runtime.FeaturePlayerAllocationFilter) { + if before.Status.Players != nil && after.Status.Players != nil { + cap1 := before.Status.Players.Capacity - before.Status.Players.Count + cap2 := after.Status.Players.Capacity - after.Status.Players.Count + + // if they are equal, pass the comparison through. + if cap1 < cap2 { + return true, false + } else if cap2 < cap1 { + return false, false + } + } + } + + // if we end up here, then break the tie with Counter or List Priority. + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && priorities != nil { + if res := before.CompareCountAndListPriorities(priorities, after); res != nil { + return *res, false + } + } + + // gs priority remains the same after allocation + return false, true +} + +// compareGameServersForPakcedStrategy compares the priority of two game servers based on the given priorities and node counts. 
+// The bool returned has the meaning of greater (gs0 has greater priority than gs1 which is equivalent to the +// less comparison as higher priority gs are positioned to the beginning of the list). +// Used for the packed startegy. +func compareGameServersForPakcedStrategy(gs0, gs1 *agonesv1.GameServer, priorities []agonesv1.Priority, counts map[string]gameservers.NodeCount) bool { + greater, equal := compareGameServersAfterAllocationForPackedStrategy(gs0, gs1, priorities, counts) + if !equal { + return greater + } + + // finally sort lexicographically, so we have a stable order + return gs0.GetObjectMeta().GetName() < gs1.GetObjectMeta().GetName() +} + +// compareGameServers compares the priority of two game servers based on the given priorities. +// The bool returned has the meaning of greater (gs0 has greater priority than gs1 which is equivalent to the +// less comparison as higher priority gs are positioned to the beginning of the list). +// Used for the distributed startegy. +func compareGameServersForDistributedStrategy(gs0, gs1 *agonesv1.GameServer, priorities []agonesv1.Priority) bool { + greater, equal := compareGameServersAfterAllocationForDistributedStrategy(gs0, gs1, priorities) + if !equal { + return greater + } + + // finally sort lexicographically, so we have a stable order + return gs0.GetObjectMeta().GetName() < gs1.GetObjectMeta().GetName() +} + +// findIndexAfterAllocationForPackedStrategy finds the index where the gs should be inserted to maintain the list sorted. +// Used for the packed startegy. 
+func findIndexAfterAllocationForPackedStrategy(gsList []*agonesv1.GameServer, gs *agonesv1.GameServer, priorities []agonesv1.Priority, counts map[string]gameservers.NodeCount) int { + pos := sort.Search(len(gsList), func(i int) bool { + return compareGameServersForPakcedStrategy(gs, gsList[i], priorities, counts) + }) + return pos +} + +// findIndexAfterAllocationForDistributedStrategy finds the index where the gs should be inserted to maintain the list sorted. +// Used for the distributed startegy. +func findIndexAfterAllocationForDistributedStrategy(gsList []*agonesv1.GameServer, gs *agonesv1.GameServer, priorities []agonesv1.Priority) int { + pos := sort.Search(len(gsList), func(i int) bool { + return compareGameServersForDistributedStrategy(gs, gsList[i], priorities) + }) + return pos +} diff --git a/pkg/gameserverallocations/allocation_cache_test.go b/pkg/gameserverallocations/allocation_cache_test.go index 5904f12e2b..35890d019c 100644 --- a/pkg/gameserverallocations/allocation_cache_test.go +++ b/pkg/gameserverallocations/allocation_cache_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "agones.dev/agones/pkg/apis" agonesv1 "agones.dev/agones/pkg/apis/agones/v1" allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" "agones.dev/agones/pkg/gameservers" @@ -607,3 +608,282 @@ func newFakeAllocationCache() (*AllocationCache, agtesting.Mocks) { cache := NewAllocationCache(m.AgonesInformerFactory.Agones().V1().GameServers(), gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory), healthcheck.NewHandler()) return cache, m } + +func TestAllocationCacheReorderGameServerAfterAllocation(t *testing.T) { + t.Parallel() + runtime.FeatureTestMutex.Lock() + defer runtime.FeatureTestMutex.Unlock() + + //gs0 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs0", Namespace: defaultNs, UID: "0"}, + // Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateReady}} + gs0Allocated := 
agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs0", Namespace: defaultNs, UID: "0"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateAllocated}} + gs1 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, UID: "1"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateReady}} + gs1Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, UID: "1"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateAllocated}} + gs2 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, UID: "2"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateReady}} + gs2Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, UID: "2"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateAllocated}} + gs3 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs3", Namespace: defaultNs, UID: "3"}, + Status: agonesv1.GameServerStatus{NodeName: "node1", State: agonesv1.GameServerStateReady}} + gs4 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs4", Namespace: defaultNs, UID: "4"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 3, + Capacity: 10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 3, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0", "player1", "player2"}, + Capacity: 10, + }, + }, + }, + } + gs5 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, UID: "5"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 2, + Capacity: 
10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 2, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0", "player1"}, + Capacity: 10, + }, + }, + }, + } + gs5Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, UID: "5"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 5, + Capacity: 10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 5, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0", "player1", "player2", "player3", "player4"}, + Capacity: 10, + }, + }, + }, + } + gs6 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs6", Namespace: defaultNs, UID: "6"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 1, + Capacity: 10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 1, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0"}, + Capacity: 10, + }, + }, + }, + } + + fixtures := map[string]struct { + features string + list []*agonesv1.GameServer + priorities []agonesv1.Priority + packingStrategy apis.SchedulingStrategy + gsToReorder *agonesv1.GameServer + gsToReorderIndex int + want []*agonesv1.GameServer + }{ + "pakced (no change)": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2, &gs3}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs0Allocated, &gs1Allocated, &gs2, &gs3}, + }, + "packed": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2, &gs3}, + gsToReorder: &gs2Allocated, + gsToReorderIndex: 2, + want: []*agonesv1.GameServer{&gs0Allocated, &gs2Allocated, &gs1, &gs3}, + }, + "packed (sort by 
name)": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs2Allocated, &gs1, &gs3}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 2, + want: []*agonesv1.GameServer{&gs0Allocated, &gs1Allocated, &gs2Allocated, &gs3}, + }, + "packed (all ready)": { + list: []*agonesv1.GameServer{&gs1, &gs2, &gs3}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 0, + want: []*agonesv1.GameServer{&gs1Allocated, &gs2, &gs3}, + }, + "packed (only one)": { + list: []*agonesv1.GameServer{&gs1}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 0, + want: []*agonesv1.GameServer{&gs1Allocated}, + }, + "packed (priority counter)": { + features: fmt.Sprintf("%s=true", runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs4, &gs5, &gs6}, + priorities: []agonesv1.Priority{ + { + Type: "Counter", + Key: "players", + Order: "Ascending", + }, + }, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs5Allocated, &gs4, &gs6}, + }, + "packed (priority list)": { + features: fmt.Sprintf("%s=true", runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs6, &gs5, &gs4}, + priorities: []agonesv1.Priority{ + { + Type: "List", + Key: "players", + Order: "Descending", + }, + }, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs6, &gs4, &gs5Allocated}, + }, + "packed (FeaturePlayerAllocationFilter)": { + features: fmt.Sprintf("%s=true", runtime.FeaturePlayerAllocationFilter), + list: []*agonesv1.GameServer{&gs4, &gs5, &gs6}, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs5Allocated, &gs4, &gs6}, + }, + "distributed (no change)": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2, &gs3}, + packingStrategy: apis.Distributed, + gsToReorder: &gs2Allocated, + gsToReorderIndex: 2, + want: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2Allocated, &gs3}, + }, + "distributed (only one)": { + list: []*agonesv1.GameServer{&gs1}, + packingStrategy: apis.Distributed, + 
gsToReorder: &gs1Allocated, + gsToReorderIndex: 0, + want: []*agonesv1.GameServer{&gs1Allocated}, + }, + "distributed (priority counter)": { + features: fmt.Sprintf("%s=true", runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs4, &gs5, &gs6}, + priorities: []agonesv1.Priority{ + { + Type: "Counter", + Key: "players", + Order: "Ascending", + }, + }, + packingStrategy: apis.Distributed, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs5Allocated, &gs4, &gs6}, + }, + "distributed (priority list)": { + features: fmt.Sprintf("%s=true", runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs6, &gs5, &gs4}, + priorities: []agonesv1.Priority{ + { + Type: "List", + Key: "players", + Order: "Descending", + }, + }, + packingStrategy: apis.Distributed, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs6, &gs4, &gs5Allocated}, + }, + } + + for testName, testScenario := range fixtures { + t.Run(testName, func(t *testing.T) { + // deliberately not resetting the Feature state, to catch any possible unknown regressions with the + // new feature flags + if testScenario.features != "" { + require.NoError(t, runtime.ParseFeatures(testScenario.features)) + } + + cache, m := newFakeAllocationCache() + + m.AgonesClient.AddReactor("list", "gameservers", func(_ k8stesting.Action) (bool, k8sruntime.Object, error) { + return true, &agonesv1.GameServerList{ + Items: func(input []*agonesv1.GameServer) []agonesv1.GameServer { + result := make([]agonesv1.GameServer, len(input)) + for i, gs := range input { + result[i] = *gs + } + return result + }(testScenario.list), + }, nil + }) + + ctx, cancel := agtesting.StartInformers(m, cache.gameServerSynced) + defer cancel() + + // This call initializes the cache + err := cache.syncCache() + assert.Nil(t, err) + + err = cache.counter.Run(ctx, 0) + assert.Nil(t, err) + + strategy := apis.Packed + if testScenario.packingStrategy != "" { + strategy = 
apis.SchedulingStrategy(testScenario.packingStrategy) + } + + cache.ReorderGameServerAfterAllocation( + testScenario.list, + testScenario.gsToReorderIndex, testScenario.gsToReorder, + testScenario.priorities, strategy) + + if !assert.Equal(t, testScenario.want, testScenario.list, "reordered list should match expected") { + for _, gs := range testScenario.list { + t.Logf("%s, ", gs.Name) + } + } + }) + } +} diff --git a/pkg/gameserverallocations/batch_allocator.go b/pkg/gameserverallocations/batch_allocator.go index 8733efeedc..5d0b80a345 100644 --- a/pkg/gameserverallocations/batch_allocator.go +++ b/pkg/gameserverallocations/batch_allocator.go @@ -99,6 +99,8 @@ func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCoun var list []*agonesv1.GameServer var sortKey uint64 requestCount := 0 + gsToReorderIndex := -1 + var gsToReorder *agonesv1.GameServer metrics := c.newMetrics(ctx) batchResponsesPerGs := make(map[string]batchResponses) @@ -115,6 +117,8 @@ func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCoun list = nil requestCount = 0 + gsToReorderIndex = -1 + gsToReorder = nil } checkSortKey := func(gsa *allocationv1.GameServerAllocation) { @@ -149,12 +153,20 @@ func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCoun // Sort list if necessary if list == nil { + // There could be a bug to not flush the list if the scheduling changes between gsas. if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) || gsa.Spec.Scheduling == apis.Packed { list = c.allocationCache.ListSortedGameServers(gsa) } else { // If FeatureCountsAndLists and Scheduling == Distributed, sort game servers by Priorities list = c.allocationCache.ListSortedGameServersPriorities(gsa) } + } else if gsToReorderIndex >= 0 { + // We can use the priorities from the new gsa because if they were different, + // we would have flushed the list and started a new one. 
+ c.allocationCache.ReorderGameServerAfterAllocation( + list, /*where*/ + gsToReorderIndex, gsToReorder, /*what*/ + gsa.Spec.Priorities, gsa.Spec.Scheduling /*how*/) } } @@ -163,38 +175,41 @@ func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCoun case req := <-c.pendingRequests: checkRefreshList(req.gsa) - gs, index, err := findGameServerForAllocation(req.gsa, list) + foundGs, foundGsIndex, err := findGameServerForAllocation(req.gsa, list) if err != nil { req.response <- response{request: req, gs: nil, err: err} continue } - // if the gs has not been already allocated in this batch, remove it from the cache, + // If the gs has not been already allocated in this batch, remove it from the cache, // but keep it in the list for the next allocation - existingBatch, alreadyAllocated := batchResponsesPerGs[string(gs.UID)] + existingBatch, alreadyAllocated := batchResponsesPerGs[string(foundGs.UID)] if !alreadyAllocated { - if removeErr := c.allocationCache.RemoveGameServer(gs); removeErr != nil { + if removeErr := c.allocationCache.RemoveGameServer(foundGs); removeErr != nil { // this seems unlikely, but lets handle it just in case removeErr = errors.Wrap(removeErr, "error removing gameserver from cache") req.response <- response{request: req, gs: nil, err: removeErr} // remove the game server because it is problematic - list = append(list[:index], list[index+1:]...) + list = append(list[:foundGsIndex], list[foundGsIndex+1:]...) continue } } - // apply the allocation to the gs in the list (not in cache anymore) - applyError, counterErrors, listErrors := c.applyAllocationToLocalGameServer(req.gsa.Spec.MetaPatch, gs, req.gsa) + // Apply the allocation to a copy of the gs in the list (not in cache anymore). + // It will be added back to the list during reordering before the next gsa gets processed. 
+ gsToReorder = foundGs.DeepCopy() + gsToReorderIndex = foundGsIndex + applyError, counterErrors, listErrors := c.applyAllocationToLocalGameServer(req.gsa.Spec.MetaPatch, gsToReorder, req.gsa) if applyError == nil { if alreadyAllocated { - existingBatch.responses = append(existingBatch.responses, response{request: req, gs: gs.DeepCopy(), err: nil}) + existingBatch.responses = append(existingBatch.responses, response{request: req, gs: gsToReorder.DeepCopy(), err: nil}) existingBatch.counterErrors = goErrors.Join(existingBatch.counterErrors, counterErrors) existingBatch.listErrors = goErrors.Join(existingBatch.listErrors, listErrors) - batchResponsesPerGs[string(gs.UID)] = existingBatch + batchResponsesPerGs[string(gsToReorder.UID)] = existingBatch } else { // first time we see this gs in this batch - batchResponsesPerGs[string(gs.UID)] = batchResponses{ - responses: []response{{request: req, gs: gs.DeepCopy(), err: nil}}, + batchResponsesPerGs[string(gsToReorder.UID)] = batchResponses{ + responses: []response{{request: req, gs: gsToReorder.DeepCopy(), err: nil}}, counterErrors: counterErrors, listErrors: listErrors, } From c86b2392db9cdc1ef04615b1c52aaa6c6c5a06d3 Mon Sep 17 00:00:00 2001 From: Mihai Pancu Date: Sat, 9 Aug 2025 00:14:05 +0300 Subject: [PATCH 7/8] Fix lint --- pkg/gameserverallocations/allocation_cache.go | 14 +++++++------- pkg/gameserverallocations/allocation_cache_test.go | 4 +--- pkg/util/runtime/features.go | 2 +- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/pkg/gameserverallocations/allocation_cache.go b/pkg/gameserverallocations/allocation_cache.go index 8ce7f70c59..f8da57ade5 100644 --- a/pkg/gameserverallocations/allocation_cache.go +++ b/pkg/gameserverallocations/allocation_cache.go @@ -330,7 +330,7 @@ func (c *AllocationCache) ReorderGameServerAfterAllocation( newIndex += gsIndexBeforeAllocation } } else { - c.baseLogger.WithField("startegy", strategy). + c.baseLogger.WithField("strategy", strategy). 
Warn("Scheduling strategy not supported! Reordering is skipped!") } } @@ -350,7 +350,7 @@ func (c *AllocationCache) ReorderGameServerAfterAllocation( // compareGameServersAfterAllocationForDistributedStrategy compares the priority of the before and after applying an allocation to a game server. // The first bool returned has the meaning of greater (before has greater priority than after) and the second of equal. If equal is true, discard the value of less. // It does not take into account the name of the game server, so it can return an equal result. -// Used for the distributed startegy. +// Used for the distributed strategy. func compareGameServersAfterAllocationForDistributedStrategy( before, after *agonesv1.GameServer, priorities []agonesv1.Priority) (bool, bool) { @@ -367,7 +367,7 @@ func compareGameServersAfterAllocationForDistributedStrategy( // compareGameServersAfterAllocationForPackedStrategy compares the priority of the before and after applying an allocation to a game server. // The first bool returned has the meaning of greater (before has greater priority than after) and the second of equal. If equal is true, discard the value of less. // It does not take into account the name of the game server, so it can return an equal result. -// Used for the packed startegy. +// Used for the packed strategy. func compareGameServersAfterAllocationForPackedStrategy( before, after *agonesv1.GameServer, priorities []agonesv1.Priority, @@ -432,7 +432,7 @@ func compareGameServersAfterAllocationForPackedStrategy( // compareGameServersForPakcedStrategy compares the priority of two game servers based on the given priorities and node counts. // The bool returned has the meaning of greater (gs0 has greater priority than gs1 which is equivalent to the // less comparison as higher priority gs are positioned to the beginning of the list). -// Used for the packed startegy. +// Used for the packed strategy. 
func compareGameServersForPakcedStrategy(gs0, gs1 *agonesv1.GameServer, priorities []agonesv1.Priority, counts map[string]gameservers.NodeCount) bool { greater, equal := compareGameServersAfterAllocationForPackedStrategy(gs0, gs1, priorities, counts) if !equal { @@ -446,7 +446,7 @@ func compareGameServersForPakcedStrategy(gs0, gs1 *agonesv1.GameServer, prioriti // compareGameServers compares the priority of two game servers based on the given priorities. // The bool returned has the meaning of greater (gs0 has greater priority than gs1 which is equivalent to the // less comparison as higher priority gs are positioned to the beginning of the list). -// Used for the distributed startegy. +// Used for the distributed strategy. func compareGameServersForDistributedStrategy(gs0, gs1 *agonesv1.GameServer, priorities []agonesv1.Priority) bool { greater, equal := compareGameServersAfterAllocationForDistributedStrategy(gs0, gs1, priorities) if !equal { @@ -458,7 +458,7 @@ func compareGameServersForDistributedStrategy(gs0, gs1 *agonesv1.GameServer, pri } // findIndexAfterAllocationForPackedStrategy finds the index where the gs should be inserted to maintain the list sorted. -// Used for the packed startegy. +// Used for the packed strategy. func findIndexAfterAllocationForPackedStrategy(gsList []*agonesv1.GameServer, gs *agonesv1.GameServer, priorities []agonesv1.Priority, counts map[string]gameservers.NodeCount) int { pos := sort.Search(len(gsList), func(i int) bool { return compareGameServersForPakcedStrategy(gs, gsList[i], priorities, counts) @@ -467,7 +467,7 @@ func findIndexAfterAllocationForPackedStrategy(gsList []*agonesv1.GameServer, gs } // findIndexAfterAllocationForDistributedStrategy finds the index where the gs should be inserted to maintain the list sorted. -// Used for the distributed startegy. +// Used for the distributed strategy. 
func findIndexAfterAllocationForDistributedStrategy(gsList []*agonesv1.GameServer, gs *agonesv1.GameServer, priorities []agonesv1.Priority) int { pos := sort.Search(len(gsList), func(i int) bool { return compareGameServersForDistributedStrategy(gs, gsList[i], priorities) diff --git a/pkg/gameserverallocations/allocation_cache_test.go b/pkg/gameserverallocations/allocation_cache_test.go index 35890d019c..3c847afbca 100644 --- a/pkg/gameserverallocations/allocation_cache_test.go +++ b/pkg/gameserverallocations/allocation_cache_test.go @@ -614,8 +614,6 @@ func TestAllocationCacheReorderGameServerAfterAllocation(t *testing.T) { runtime.FeatureTestMutex.Lock() defer runtime.FeatureTestMutex.Unlock() - //gs0 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs0", Namespace: defaultNs, UID: "0"}, - // Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateReady}} gs0Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs0", Namespace: defaultNs, UID: "0"}, Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateAllocated}} gs1 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, UID: "1"}, @@ -871,7 +869,7 @@ func TestAllocationCacheReorderGameServerAfterAllocation(t *testing.T) { strategy := apis.Packed if testScenario.packingStrategy != "" { - strategy = apis.SchedulingStrategy(testScenario.packingStrategy) + strategy = testScenario.packingStrategy } cache.ReorderGameServerAfterAllocation( diff --git a/pkg/util/runtime/features.go b/pkg/util/runtime/features.go index 56e969b560..b1094310c3 100644 --- a/pkg/util/runtime/features.go +++ b/pkg/util/runtime/features.go @@ -169,7 +169,7 @@ var ( FeatureAllocatorBatchesUpdates: false, // Dev features - FeatureProcessorAllocator: false, + FeatureProcessorAllocator: false, // Example feature FeatureExample: false, From 0273f2ebde805e2364fc6d2b9af7805090581a68 Mon Sep 17 00:00:00 2001 From: Mihai Pancu Date: 
Thu, 11 Sep 2025 12:45:07 +0300 Subject: [PATCH 8/8] Cherry picked "Return GameServerAllocationUnAllocated when a game server update error occurs" and made it work with the batch allocator --- pkg/gameserverallocations/allocator.go | 10 ++++++---- pkg/gameserverallocations/allocator_test.go | 9 ++++++--- pkg/gameserverallocations/batch_allocator.go | 5 +++-- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pkg/gameserverallocations/allocator.go b/pkg/gameserverallocations/allocator.go index bcdc21f6c1..c5400ed91e 100644 --- a/pkg/gameserverallocations/allocator.go +++ b/pkg/gameserverallocations/allocator.go @@ -67,6 +67,8 @@ var ( ErrConflictInGameServerSelection = errors.New("The Gameserver was already allocated") // ErrTotalTimeoutExceeded is used to signal that total retry timeout has been exceeded and no additional retries should be made ErrTotalTimeoutExceeded = status.Errorf(codes.DeadlineExceeded, "remote allocation total timeout exceeded") + // ErrGameServerUpdateConflict is returned when the game server selected for applying the allocation cannot be updated + ErrGameServerUpdateConflict = errors.New("Could not update the selected GameServer") ) const ( @@ -277,10 +279,10 @@ func (c *Allocator) allocateFromLocalCluster(ctx context.Context, gsa *allocatio return nil, err } - switch err { - case ErrNoGameServer: + switch { + case goErrors.Is(err, ErrNoGameServer), goErrors.Is(err, ErrGameServerUpdateConflict): gsa.Status.State = allocationv1.GameServerAllocationUnAllocated - case ErrConflictInGameServerSelection: + case goErrors.Is(err, ErrConflictInGameServerSelection): gsa.Status.State = allocationv1.GameServerAllocationContention default: gsa.ObjectMeta.Name = gs.ObjectMeta.Name @@ -612,7 +614,7 @@ func (c *Allocator) allocationUpdateWorkers(ctx context.Context, workerCount int // we should wait for it to get updated with fresh info. 
c.allocationCache.AddGameServer(gs) } - res.err = errors.Wrap(err, "error updating allocated gameserver") + res.err = goErrors.Join(ErrGameServerUpdateConflict, err) } else { // put the GameServer back into the cache, so it's immediately around for re-allocation c.allocationCache.AddGameServer(gs) diff --git a/pkg/gameserverallocations/allocator_test.go b/pkg/gameserverallocations/allocator_test.go index e0355cb5cb..c3022fe7fa 100644 --- a/pkg/gameserverallocations/allocator_test.go +++ b/pkg/gameserverallocations/allocator_test.go @@ -543,7 +543,8 @@ func TestAllocatorAllocateOnGameServerUpdateError(t *testing.T) { _, err := a.allocate(ctx, gsa.DeepCopy()) log.WithError(err).Info("allocate (private): failed allocation") require.NotEqual(t, ErrNoGameServer, err) - require.EqualError(t, err, "error updating allocated gameserver: failed to update") + require.True(t, errors.Is(err, ErrGameServerUpdateConflict)) + require.EqualError(t, err, "Could not update the selected GameServer\nfailed to update") // make sure we aren't in the same batch! 
time.Sleep(2 * a.batchWaitTime) @@ -558,7 +559,8 @@ func TestAllocatorAllocateOnGameServerUpdateError(t *testing.T) { log.WithField("result", result).WithError(err).Info("Allocate (public): failed allocation") require.Nil(t, result) require.NotEqual(t, ErrNoGameServer, err) - require.EqualError(t, err, "error updating allocated gameserver: failed to update") + require.True(t, errors.Is(err, ErrGameServerUpdateConflict)) + require.EqualError(t, err, "Could not update the selected GameServer\nfailed to update") } func TestAllocatorRunLocalAllocations(t *testing.T) { @@ -1071,7 +1073,8 @@ func TestControllerAllocationUpdateWorkers(t *testing.T) { r = <-r.request.response assert.True(t, updated) - assert.EqualError(t, r.err, "error updating allocated gameserver: something went wrong") + assert.True(t, errors.Is(r.err, ErrGameServerUpdateConflict)) + assert.EqualError(t, r.err, "Could not update the selected GameServer\nsomething went wrong") assert.Equal(t, gs1, r.gs) agtesting.AssertNoEvent(t, m.FakeRecorder.Events) diff --git a/pkg/gameserverallocations/batch_allocator.go b/pkg/gameserverallocations/batch_allocator.go index 5d0b80a345..72087c0fa4 100644 --- a/pkg/gameserverallocations/batch_allocator.go +++ b/pkg/gameserverallocations/batch_allocator.go @@ -43,6 +43,7 @@ func (c *Allocator) batchAllocationUpdateWorkers(ctx context.Context, workerCoun requestStartTime := time.Now() // Try to update with the latest gs state + var propagatedErr error updatedGs, updateErr := c.gameServerGetter.GameServers(lastGsState.ObjectMeta.Namespace).Update(ctx, lastGsState, metav1.UpdateOptions{}) if updateErr != nil { metrics.recordAllocationUpdateFailure(ctx, time.Since(requestStartTime)) @@ -53,7 +54,7 @@ func (c *Allocator) batchAllocationUpdateWorkers(ctx context.Context, workerCoun // we should wait for it to get updated with fresh info. 
c.allocationCache.AddGameServer(updatedGs) } - updateErr = errors.Wrap(updateErr, "error updating allocated gameserver") + propagatedErr = goErrors.Join(ErrGameServerUpdateConflict, updateErr) } else { metrics.recordAllocationUpdateSuccess(ctx, time.Since(requestStartTime)) @@ -72,7 +73,7 @@ func (c *Allocator) batchAllocationUpdateWorkers(ctx context.Context, workerCoun // Forward all responses with their appropriate gs state and update error for _, res := range batchRes.responses { - res.err = updateErr + res.err = propagatedErr res.request.response <- res } }