diff --git a/build/Makefile b/build/Makefile index 85ff6e13f4..9e0d9f93e6 100644 --- a/build/Makefile +++ b/build/Makefile @@ -73,7 +73,7 @@ BETA_FEATURE_GATES ?= "AutopilotPassthroughPort=true&CountsAndLists=true&GKEAuto # Enable all alpha feature gates. Keep in sync with `false` (alpha) entries in pkg/util/runtime/features.go:featureDefaults -ALPHA_FEATURE_GATES ?= "PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=true&PlayerTracking=true&SidecarContainers=true&Example=true" +ALPHA_FEATURE_GATES ?= "AllocatorBatchesUpdates=true&PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=true&PlayerTracking=true&SidecarContainers=true&Example=true" # Build with Windows support WITH_WINDOWS=1 diff --git a/cloudbuild.yaml b/cloudbuild.yaml index a07a5529d8..0dc8992968 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -308,7 +308,7 @@ steps: # Keep in sync with the inverse of 'alpha' and 'beta' features in # pkg/util/runtime/features.go:featureDefaults - featureWithGate="PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=true&PlayerTracking=true&CountsAndLists=false&RollingUpdateFix=false&PortRanges=false&PortPolicyNone=false&ScheduledAutoscaler=false&AutopilotPassthroughPort=false&GKEAutopilotExtendedDurationPods=false&SidecarContainers=true&Example=true" + featureWithGate="AllocatorBatchesUpdates=true&PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=true&PlayerTracking=true&CountsAndLists=false&RollingUpdateFix=false&PortRanges=false&PortPolicyNone=false&ScheduledAutoscaler=false&AutopilotPassthroughPort=false&GKEAutopilotExtendedDurationPods=false&SidecarContainers=true&Example=true" featureWithoutGate="" # Use this if specific feature gates can only be supported on specific Kubernetes versions. 
diff --git a/install/helm/agones/defaultfeaturegates.yaml b/install/helm/agones/defaultfeaturegates.yaml index 1e957de77c..0f4ddb518b 100644 --- a/install/helm/agones/defaultfeaturegates.yaml +++ b/install/helm/agones/defaultfeaturegates.yaml @@ -34,10 +34,11 @@ FleetAutoscaleRequestMetaData: false PlayerAllocationFilter: false PlayerTracking: false SidecarContainers: false +AllocatorBatchesUpdates: false # Dev features ProcessorAllocator: false WasmAutoscaler: false # Example feature Example: false diff --git a/pkg/gameserverallocations/allocation_cache.go b/pkg/gameserverallocations/allocation_cache.go index 75c53afd80..f8da57ade5 100644 --- a/pkg/gameserverallocations/allocation_cache.go +++ b/pkg/gameserverallocations/allocation_cache.go @@ -18,6 +18,7 @@ import ( "context" "sort" + "agones.dev/agones/pkg/apis" "agones.dev/agones/pkg/apis/agones" agonesv1 "agones.dev/agones/pkg/apis/agones/v1" allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" @@ -174,64 +175,17 @@ func (c *AllocationCache) ListSortedGameServers(gsa *allocationv1.GameServerAllo counts := c.counter.Counts() sort.Slice(list, func(i, j int) bool { - gs1 := list[i] - gs2 := list[j] + gs0 := list[i] + gs1 := list[j] - // Search Allocated GameServers first. - if gs1.Status.State != gs2.Status.State { - return gs1.Status.State == agonesv1.GameServerStateAllocated - } - - c1, ok := counts[gs1.Status.NodeName] - if !ok { - return false - } - - c2, ok := counts[gs2.Status.NodeName] - if !ok { - return true - } - - if c1.Allocated > c2.Allocated { - return true - } - if c1.Allocated < c2.Allocated { - return false - } - - // prefer nodes that have the most Ready gameservers on them - they are most likely to be - // completely filled and least likely target for scale down.
- if c1.Ready < c2.Ready { - return false - } - if c1.Ready > c2.Ready { - return true - } - - // if player tracking is enabled, prefer game servers with the least amount of room left - if runtime.FeatureEnabled(runtime.FeaturePlayerAllocationFilter) { - if gs1.Status.Players != nil && gs2.Status.Players != nil { - cap1 := gs1.Status.Players.Capacity - gs1.Status.Players.Count - cap2 := gs2.Status.Players.Capacity - gs2.Status.Players.Count - - // if they are equal, pass the comparison through. - if cap1 < cap2 { - return true - } else if cap2 < cap1 { - return false - } - } - } - - // if we end up here, then break the tie with Counter or List Priority. + var priorities []agonesv1.Priority if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && (gsa != nil) { - if res := gs1.CompareCountAndListPriorities(gsa.Spec.Priorities, gs2); res != nil { - return *res - } + priorities = gsa.Spec.Priorities + } else { + priorities = nil } - // finally sort lexicographically, so we have a stable order - return gs1.GetObjectMeta().GetName() < gs2.GetObjectMeta().GetName() + return compareGameServersForPakcedStrategy(gs0, gs1, priorities, counts) }) return list @@ -246,17 +200,17 @@ func (c *AllocationCache) ListSortedGameServersPriorities(gsa *allocationv1.Game } sort.Slice(list, func(i, j int) bool { - gs1 := list[i] - gs2 := list[j] + gs0 := list[i] + gs1 := list[j] + var priorities []agonesv1.Priority if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && (gsa != nil) { - if res := gs1.CompareCountAndListPriorities(gsa.Spec.Priorities, gs2); res != nil { - return *res - } + priorities = gsa.Spec.Priorities + } else { + priorities = nil } - // finally sort lexicographically, so we have a stable order - return gs1.GetObjectMeta().GetName() < gs2.GetObjectMeta().GetName() + return compareGameServersForDistributedStrategy(gs0, gs1, priorities) }) return list @@ -327,3 +281,196 @@ func (c *AllocationCache) getKey(gs *agonesv1.GameServer) (string, bool) { } return key, ok 
} + +// ReorderGameServerAfterAllocation positions the new gsAfterAllocation in the gsList according to the given priorities +// and using the gsIndexBeforeAllocation as hint to optimize the reordering. This is used by the batch allocator +// to reorder the gs after locally (not in cache) applying an allocation. +func (c *AllocationCache) ReorderGameServerAfterAllocation( + gsList []*agonesv1.GameServer, + gsIndexBeforeAllocation int, gsAfterAllocation *agonesv1.GameServer, + priorities []agonesv1.Priority, strategy apis.SchedulingStrategy) { + if len(gsList) == 0 || gsIndexBeforeAllocation < 0 || gsIndexBeforeAllocation >= len(gsList) || gsAfterAllocation == nil { + c.baseLogger.WithField("gsIndexBeforeAllocation", gsIndexBeforeAllocation). + WithField("gsAfterAllocation", gsAfterAllocation). + WithField("gsListLength", len(gsList)). + Warn("ReorderGameServerAfterAllocation called with invalid parameters! Reordering is skipped!") + return + } + + newIndex := gsIndexBeforeAllocation + gsToReorderOriginal := gsList[gsIndexBeforeAllocation] + + optimizeList := func(greater bool) []*agonesv1.GameServer { + var optimizedGsList []*agonesv1.GameServer + if greater { + // If the gs has less priority than the original, we need to insert it at the end of the list + optimizedGsList = gsList[gsIndexBeforeAllocation+1:] + } else { + // Otherwise, we need to insert it at the beginning of the list + optimizedGsList = gsList[:gsIndexBeforeAllocation] + } + return optimizedGsList + } + + switch strategy { + case apis.Packed: + counts := c.counter.Counts() + greater, equal := compareGameServersAfterAllocationForPackedStrategy(gsToReorderOriginal, gsAfterAllocation, priorities, counts) + if !equal { + newIndex = findIndexAfterAllocationForPackedStrategy(optimizeList(greater), gsAfterAllocation, priorities, counts) + if greater { + newIndex += gsIndexBeforeAllocation + } + } + case apis.Distributed: + greater, equal := 
compareGameServersAfterAllocationForDistributedStrategy(gsToReorderOriginal, gsAfterAllocation, priorities) + if !equal { + newIndex = findIndexAfterAllocationForDistributedStrategy(optimizeList(greater), gsAfterAllocation, priorities) + if greater { + newIndex += gsIndexBeforeAllocation + } + } + default: + c.baseLogger.WithField("strategy", strategy). + Warn("Scheduling strategy not supported! Reordering is skipped!") + } + + if newIndex != gsIndexBeforeAllocation { + // If the new index is different than the original index, we need to: + // remove the original + gsList = append(gsList[:gsIndexBeforeAllocation], gsList[gsIndexBeforeAllocation+1:]...) + // and insert the updated one + gsList = append(gsList[:newIndex], append([]*agonesv1.GameServer{gsAfterAllocation}, gsList[newIndex:]...)...) + } else { + // No reordering needed, just update the gs in the list + gsList[gsIndexBeforeAllocation] = gsAfterAllocation + } +} + +// compareGameServersAfterAllocationForDistributedStrategy compares the priority of the before and after applying an allocation to a game server. +// The first bool returned has the meaning of greater (before has greater priority than after) and the second of equal. If equal is true, discard the value of less. +// It does not take into account the name of the game server, so it can return an equal result. +// Used for the distributed strategy. +func compareGameServersAfterAllocationForDistributedStrategy( + before, after *agonesv1.GameServer, + priorities []agonesv1.Priority) (bool, bool) { + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && priorities != nil { + if res := before.CompareCountAndListPriorities(priorities, after); res != nil { + return *res, false + } + } + + // gs priority remains the same after allocation + return false, true +} + +// compareGameServersAfterAllocationForPackedStrategy compares the priority of the before and after applying an allocation to a game server.
+// The first bool returned has the meaning of greater (before has greater priority than after) and the second of equal. If equal is true, discard the value of less. +// It does not take into account the name of the game server, so it can return an equal result. +// Used for the packed strategy. +func compareGameServersAfterAllocationForPackedStrategy( + before, after *agonesv1.GameServer, + priorities []agonesv1.Priority, + counts map[string]gameservers.NodeCount) (bool, bool) { + // Search Allocated GameServers first. + if before.Status.State != after.Status.State { + return before.Status.State == agonesv1.GameServerStateAllocated, false + } + + c1, ok := counts[before.Status.NodeName] + if !ok { + return false, false + } + + c2, ok := counts[after.Status.NodeName] + if !ok { + return true, false + } + + if c1.Allocated > c2.Allocated { + return true, false + } + if c1.Allocated < c2.Allocated { + return false, false + } + + // prefer nodes that have the most Ready gameservers on them - they are most likely to be + // completely filled and least likely target for scale down. + if c1.Ready < c2.Ready { + return false, false + } + if c1.Ready > c2.Ready { + return true, false + } + + // if player tracking is enabled, prefer game servers with the least amount of room left + if runtime.FeatureEnabled(runtime.FeaturePlayerAllocationFilter) { + if before.Status.Players != nil && after.Status.Players != nil { + cap1 := before.Status.Players.Capacity - before.Status.Players.Count + cap2 := after.Status.Players.Capacity - after.Status.Players.Count + + // if they are equal, pass the comparison through. + if cap1 < cap2 { + return true, false + } else if cap2 < cap1 { + return false, false + } + } + } + + // if we end up here, then break the tie with Counter or List Priority. 
+ if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && priorities != nil { + if res := before.CompareCountAndListPriorities(priorities, after); res != nil { + return *res, false + } + } + + // gs priority remains the same after allocation + return false, true +} + +// compareGameServersForPakcedStrategy compares the priority of two game servers based on the given priorities and node counts. +// The bool returned has the meaning of greater (gs0 has greater priority than gs1 which is equivalent to the +// less comparison as higher priority gs are positioned to the beginning of the list). +// Used for the packed strategy. +func compareGameServersForPakcedStrategy(gs0, gs1 *agonesv1.GameServer, priorities []agonesv1.Priority, counts map[string]gameservers.NodeCount) bool { + greater, equal := compareGameServersAfterAllocationForPackedStrategy(gs0, gs1, priorities, counts) + if !equal { + return greater + } + + // finally sort lexicographically, so we have a stable order + return gs0.GetObjectMeta().GetName() < gs1.GetObjectMeta().GetName() +} + +// compareGameServers compares the priority of two game servers based on the given priorities. +// The bool returned has the meaning of greater (gs0 has greater priority than gs1 which is equivalent to the +// less comparison as higher priority gs are positioned to the beginning of the list). +// Used for the distributed strategy. +func compareGameServersForDistributedStrategy(gs0, gs1 *agonesv1.GameServer, priorities []agonesv1.Priority) bool { + greater, equal := compareGameServersAfterAllocationForDistributedStrategy(gs0, gs1, priorities) + if !equal { + return greater + } + + // finally sort lexicographically, so we have a stable order + return gs0.GetObjectMeta().GetName() < gs1.GetObjectMeta().GetName() +} + +// findIndexAfterAllocationForPackedStrategy finds the index where the gs should be inserted to maintain the list sorted. +// Used for the packed strategy. 
+func findIndexAfterAllocationForPackedStrategy(gsList []*agonesv1.GameServer, gs *agonesv1.GameServer, priorities []agonesv1.Priority, counts map[string]gameservers.NodeCount) int { + pos := sort.Search(len(gsList), func(i int) bool { + return compareGameServersForPakcedStrategy(gs, gsList[i], priorities, counts) + }) + return pos +} + +// findIndexAfterAllocationForDistributedStrategy finds the index where the gs should be inserted to maintain the list sorted. +// Used for the distributed strategy. +func findIndexAfterAllocationForDistributedStrategy(gsList []*agonesv1.GameServer, gs *agonesv1.GameServer, priorities []agonesv1.Priority) int { + pos := sort.Search(len(gsList), func(i int) bool { + return compareGameServersForDistributedStrategy(gs, gsList[i], priorities) + }) + return pos +} diff --git a/pkg/gameserverallocations/allocation_cache_test.go b/pkg/gameserverallocations/allocation_cache_test.go index 5904f12e2b..3c847afbca 100644 --- a/pkg/gameserverallocations/allocation_cache_test.go +++ b/pkg/gameserverallocations/allocation_cache_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "agones.dev/agones/pkg/apis" agonesv1 "agones.dev/agones/pkg/apis/agones/v1" allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" "agones.dev/agones/pkg/gameservers" @@ -607,3 +608,280 @@ func newFakeAllocationCache() (*AllocationCache, agtesting.Mocks) { cache := NewAllocationCache(m.AgonesInformerFactory.Agones().V1().GameServers(), gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory), healthcheck.NewHandler()) return cache, m } + +func TestAllocationCacheReorderGameServerAfterAllocation(t *testing.T) { + t.Parallel() + runtime.FeatureTestMutex.Lock() + defer runtime.FeatureTestMutex.Unlock() + + gs0Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs0", Namespace: defaultNs, UID: "0"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateAllocated}} + gs1 := 
agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, UID: "1"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateReady}} + gs1Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, UID: "1"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateAllocated}} + gs2 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, UID: "2"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateReady}} + gs2Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, UID: "2"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateAllocated}} + gs3 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs3", Namespace: defaultNs, UID: "3"}, + Status: agonesv1.GameServerStatus{NodeName: "node1", State: agonesv1.GameServerStateReady}} + gs4 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs4", Namespace: defaultNs, UID: "4"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 3, + Capacity: 10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 3, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0", "player1", "player2"}, + Capacity: 10, + }, + }, + }, + } + gs5 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, UID: "5"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 2, + Capacity: 10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 2, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0", 
"player1"}, + Capacity: 10, + }, + }, + }, + } + gs5Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, UID: "5"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 5, + Capacity: 10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 5, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0", "player1", "player2", "player3", "player4"}, + Capacity: 10, + }, + }, + }, + } + gs6 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs6", Namespace: defaultNs, UID: "6"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 1, + Capacity: 10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 1, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0"}, + Capacity: 10, + }, + }, + }, + } + + fixtures := map[string]struct { + features string + list []*agonesv1.GameServer + priorities []agonesv1.Priority + packingStrategy apis.SchedulingStrategy + gsToReorder *agonesv1.GameServer + gsToReorderIndex int + want []*agonesv1.GameServer + }{ + "pakced (no change)": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2, &gs3}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs0Allocated, &gs1Allocated, &gs2, &gs3}, + }, + "packed": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2, &gs3}, + gsToReorder: &gs2Allocated, + gsToReorderIndex: 2, + want: []*agonesv1.GameServer{&gs0Allocated, &gs2Allocated, &gs1, &gs3}, + }, + "packed (sort by name)": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs2Allocated, &gs1, &gs3}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 2, + want: []*agonesv1.GameServer{&gs0Allocated, &gs1Allocated, 
&gs2Allocated, &gs3}, + }, + "packed (all ready)": { + list: []*agonesv1.GameServer{&gs1, &gs2, &gs3}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 0, + want: []*agonesv1.GameServer{&gs1Allocated, &gs2, &gs3}, + }, + "packed (only one)": { + list: []*agonesv1.GameServer{&gs1}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 0, + want: []*agonesv1.GameServer{&gs1Allocated}, + }, + "packed (priority counter)": { + features: fmt.Sprintf("%s=true", runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs4, &gs5, &gs6}, + priorities: []agonesv1.Priority{ + { + Type: "Counter", + Key: "players", + Order: "Ascending", + }, + }, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs5Allocated, &gs4, &gs6}, + }, + "packed (priority list)": { + features: fmt.Sprintf("%s=true", runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs6, &gs5, &gs4}, + priorities: []agonesv1.Priority{ + { + Type: "List", + Key: "players", + Order: "Descending", + }, + }, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs6, &gs4, &gs5Allocated}, + }, + "packed (FeaturePlayerAllocationFilter)": { + features: fmt.Sprintf("%s=true", runtime.FeaturePlayerAllocationFilter), + list: []*agonesv1.GameServer{&gs4, &gs5, &gs6}, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs5Allocated, &gs4, &gs6}, + }, + "distributed (no change)": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2, &gs3}, + packingStrategy: apis.Distributed, + gsToReorder: &gs2Allocated, + gsToReorderIndex: 2, + want: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2Allocated, &gs3}, + }, + "distributed (only one)": { + list: []*agonesv1.GameServer{&gs1}, + packingStrategy: apis.Distributed, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 0, + want: []*agonesv1.GameServer{&gs1Allocated}, + }, + "distributed (priority counter)": { + features: fmt.Sprintf("%s=true", 
runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs4, &gs5, &gs6}, + priorities: []agonesv1.Priority{ + { + Type: "Counter", + Key: "players", + Order: "Ascending", + }, + }, + packingStrategy: apis.Distributed, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs5Allocated, &gs4, &gs6}, + }, + "distributed (priority list)": { + features: fmt.Sprintf("%s=true", runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs6, &gs5, &gs4}, + priorities: []agonesv1.Priority{ + { + Type: "List", + Key: "players", + Order: "Descending", + }, + }, + packingStrategy: apis.Distributed, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs6, &gs4, &gs5Allocated}, + }, + } + + for testName, testScenario := range fixtures { + t.Run(testName, func(t *testing.T) { + // deliberately not resetting the Feature state, to catch any possible unknown regressions with the + // new feature flags + if testScenario.features != "" { + require.NoError(t, runtime.ParseFeatures(testScenario.features)) + } + + cache, m := newFakeAllocationCache() + + m.AgonesClient.AddReactor("list", "gameservers", func(_ k8stesting.Action) (bool, k8sruntime.Object, error) { + return true, &agonesv1.GameServerList{ + Items: func(input []*agonesv1.GameServer) []agonesv1.GameServer { + result := make([]agonesv1.GameServer, len(input)) + for i, gs := range input { + result[i] = *gs + } + return result + }(testScenario.list), + }, nil + }) + + ctx, cancel := agtesting.StartInformers(m, cache.gameServerSynced) + defer cancel() + + // This call initializes the cache + err := cache.syncCache() + assert.Nil(t, err) + + err = cache.counter.Run(ctx, 0) + assert.Nil(t, err) + + strategy := apis.Packed + if testScenario.packingStrategy != "" { + strategy = testScenario.packingStrategy + } + + cache.ReorderGameServerAfterAllocation( + testScenario.list, + testScenario.gsToReorderIndex, testScenario.gsToReorder, + 
testScenario.priorities, strategy) + + if !assert.Equal(t, testScenario.want, testScenario.list, "reordered list should match expected") { + for _, gs := range testScenario.list { + t.Logf("%s, ", gs.Name) + } + } + }) + } +} diff --git a/pkg/gameserverallocations/allocator.go b/pkg/gameserverallocations/allocator.go index c63923f34d..c5400ed91e 100644 --- a/pkg/gameserverallocations/allocator.go +++ b/pkg/gameserverallocations/allocator.go @@ -67,6 +67,8 @@ var ( ErrConflictInGameServerSelection = errors.New("The Gameserver was already allocated") // ErrTotalTimeoutExceeded is used to signal that total retry timeout has been exceeded and no additional retries should be made ErrTotalTimeoutExceeded = status.Errorf(codes.DeadlineExceeded, "remote allocation total timeout exceeded") + // ErrGameServerUpdateConflict is returned when the game server selected for applying the allocation cannot be updated + ErrGameServerUpdateConflict = errors.New("Could not update the selected GameServer") ) const ( @@ -174,7 +176,11 @@ func (c *Allocator) Run(ctx context.Context) error { } // workers and logic for batching allocations - go c.ListenAndAllocate(ctx, maxBatchQueue) + if runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesUpdates) { + go c.ListenAndBatchAllocate(ctx, maxBatchQueue) + } else { + go c.ListenAndAllocate(ctx, maxBatchQueue) + } return nil } @@ -273,10 +279,10 @@ func (c *Allocator) allocateFromLocalCluster(ctx context.Context, gsa *allocatio return nil, err } - switch err { - case ErrNoGameServer: + switch { + case goErrors.Is(err, ErrNoGameServer), goErrors.Is(err, ErrGameServerUpdateConflict): gsa.Status.State = allocationv1.GameServerAllocationUnAllocated - case ErrConflictInGameServerSelection: + case goErrors.Is(err, ErrConflictInGameServerSelection): gsa.Status.State = allocationv1.GameServerAllocationContention default: gsa.ObjectMeta.Name = gs.ObjectMeta.Name @@ -511,14 +517,23 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, 
updateWorkerCount int var list []*agonesv1.GameServer var sortKey uint64 requestCount := 0 + metrics := c.newMetrics(ctx) + + flush := func() { + if requestCount > 0 { + metrics.recordAllocationsBatchSize(ctx, requestCount) + } + + list = nil + requestCount = 0 + } for { select { case req := <-c.pendingRequests: // refresh the list after every 100 allocations made in a single batch if requestCount >= maxBatchBeforeRefresh { - list = nil - requestCount = 0 + flush() } if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { @@ -536,8 +551,7 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int if newSortKey != sortKey { sortKey = newSortKey - list = nil - requestCount = 0 + flush() } } @@ -571,8 +585,8 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int case <-ctx.Done(): return default: - list = nil - requestCount = 0 + flush() + // slow down cpu churn, and allow items to batch time.Sleep(c.batchWaitTime) } @@ -600,7 +614,7 @@ func (c *Allocator) allocationUpdateWorkers(ctx context.Context, workerCount int // we should wait for it to get updated with fresh info. 
c.allocationCache.AddGameServer(gs) } - res.err = errors.Wrap(err, "error updating allocated gameserver") + res.err = goErrors.Join(ErrGameServerUpdateConflict, err) } else { // put the GameServer back into the cache, so it's immediately around for re-allocation c.allocationCache.AddGameServer(gs) @@ -663,10 +677,15 @@ func (c *Allocator) applyAllocationToGameServer(ctx context.Context, mp allocati } } + metrics := c.newMetrics(ctx) + requestStartTime := time.Now() + gsUpdate, updateErr := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(ctx, gs, metav1.UpdateOptions{}) if updateErr != nil { + metrics.recordAllocationUpdateFailure(ctx, time.Since(requestStartTime)) return gsUpdate, updateErr } + metrics.recordAllocationUpdateSuccess(ctx, time.Since(requestStartTime)) // If successful Update record any Counter or List action errors as a warning if counterErrors != nil { @@ -693,11 +712,10 @@ func Retry(backoff wait.Backoff, fn func() error) error { } } + // No quick 400s, still do retries for ErrNoGameServer switch { case err == nil: return true, nil - case err == ErrNoGameServer: - return true, err case err == ErrTotalTimeoutExceeded: return true, err default: diff --git a/pkg/gameserverallocations/allocator_test.go b/pkg/gameserverallocations/allocator_test.go index 99347b94c3..c3022fe7fa 100644 --- a/pkg/gameserverallocations/allocator_test.go +++ b/pkg/gameserverallocations/allocator_test.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "sync" "testing" "time" @@ -43,11 +44,34 @@ import ( ) func TestAllocatorAllocate(t *testing.T) { + testScenarios := map[string]struct { + features string + }{ + "Allocator batches updates": { + features: fmt.Sprintf("%s=true", runtime.FeatureAllocatorBatchesUpdates), + }, + "Allocator does not batches updates": { + features: fmt.Sprintf("%s=false", runtime.FeatureAllocatorBatchesUpdates), + }, + } + + for test, testScenario := range testScenarios { + t.Run(test, func(t *testing.T) { + 
testAllocatorAllocateImpl(t, testScenario.features) + }) + } + +} + +func testAllocatorAllocateImpl(t *testing.T, features string) { t.Parallel() // TODO: remove when `CountsAndLists` feature flag is moved to stable. runtime.FeatureTestMutex.Lock() defer runtime.FeatureTestMutex.Unlock() + if features != "" { + require.NoError(t, runtime.ParseFeatures(features)) + } f, gsList := defaultFixtures(4) a, m := newFakeAllocator() @@ -130,11 +154,34 @@ func TestAllocatorAllocate(t *testing.T) { } func TestAllocatorAllocatePriority(t *testing.T) { + testScenarios := map[string]struct { + features string + }{ + "Allocator batches updates": { + features: fmt.Sprintf("%s=true", runtime.FeatureAllocatorBatchesUpdates), + }, + "Allocator does not batches updates": { + features: fmt.Sprintf("%s=false", runtime.FeatureAllocatorBatchesUpdates), + }, + } + + for test, testScenario := range testScenarios { + t.Run(test, func(t *testing.T) { + testAllocatorAllocatePriorityImpl(t, testScenario.features) + }) + } + +} + +func testAllocatorAllocatePriorityImpl(t *testing.T, features string) { t.Parallel() // TODO: remove when `CountsAndLists` feature flag is moved to stable. runtime.FeatureTestMutex.Lock() defer runtime.FeatureTestMutex.Unlock() + if features != "" { + require.NoError(t, runtime.ParseFeatures(features)) + } run := func(t *testing.T, name string, test func(t *testing.T, a *Allocator, gas *allocationv1.GameServerAllocation)) { f, gsList := defaultFixtures(4) @@ -496,7 +543,8 @@ func TestAllocatorAllocateOnGameServerUpdateError(t *testing.T) { _, err := a.allocate(ctx, gsa.DeepCopy()) log.WithError(err).Info("allocate (private): failed allocation") require.NotEqual(t, ErrNoGameServer, err) - require.EqualError(t, err, "error updating allocated gameserver: failed to update") + require.True(t, errors.Is(err, ErrGameServerUpdateConflict)) + require.EqualError(t, err, "Could not update the selected GameServer\nfailed to update") // make sure we aren't in the same batch! 
time.Sleep(2 * a.batchWaitTime) @@ -511,15 +559,38 @@ func TestAllocatorAllocateOnGameServerUpdateError(t *testing.T) { log.WithField("result", result).WithError(err).Info("Allocate (public): failed allocation") require.Nil(t, result) require.NotEqual(t, ErrNoGameServer, err) - require.EqualError(t, err, "error updating allocated gameserver: failed to update") + require.True(t, errors.Is(err, ErrGameServerUpdateConflict)) + require.EqualError(t, err, "Could not update the selected GameServer\nfailed to update") } func TestAllocatorRunLocalAllocations(t *testing.T) { + testScenarios := map[string]struct { + features string + }{ + "Allocator batches updates": { + features: fmt.Sprintf("%s=true", runtime.FeatureAllocatorBatchesUpdates), + }, + "Allocator does not batches updates": { + features: fmt.Sprintf("%s=false", runtime.FeatureAllocatorBatchesUpdates), + }, + } + + for test, testScenario := range testScenarios { + t.Run(test, func(t *testing.T) { + testAllocatorRunLocalAllocationsImpl(t, testScenario.features) + }) + } +} + +func testAllocatorRunLocalAllocationsImpl(t *testing.T, features string) { t.Parallel() // TODO: remove when `CountsAndLists` feature flag is moved to stable. 
runtime.FeatureTestMutex.Lock() defer runtime.FeatureTestMutex.Unlock() + if features != "" { + require.NoError(t, runtime.ParseFeatures(features)) + } t.Run("no problems", func(t *testing.T) { f, gsList := defaultFixtures(5) @@ -568,7 +639,11 @@ func TestAllocatorRunLocalAllocations(t *testing.T) { j3 := request{gsa: gsa.DeepCopy(), response: make(chan response)} a.pendingRequests <- j3 - go a.ListenAndAllocate(ctx, 3) + if runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesUpdates) { + go a.ListenAndBatchAllocate(ctx, 3) + } else { + go a.ListenAndAllocate(ctx, 3) + } res1 := <-j1.response assert.NoError(t, res1.err) @@ -591,7 +666,12 @@ func TestAllocatorRunLocalAllocations(t *testing.T) { assert.NotEqual(t, res1.gs.ObjectMeta.Name, res3.gs.ObjectMeta.Name) assert.NotEqual(t, res2.gs.ObjectMeta.Name, res3.gs.ObjectMeta.Name) - assert.Equal(t, 3, updateCount) + if runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesUpdates) { + // updates are batched into one + assert.Equal(t, 1, updateCount) + } else { + assert.Equal(t, 3, updateCount) + } }) t.Run("no gameservers", func(t *testing.T) { @@ -620,7 +700,11 @@ func TestAllocatorRunLocalAllocations(t *testing.T) { j1 := request{gsa: gsa.DeepCopy(), response: make(chan response)} a.pendingRequests <- j1 - go a.ListenAndAllocate(ctx, 3) + if runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesUpdates) { + go a.ListenAndBatchAllocate(ctx, 3) + } else { + go a.ListenAndAllocate(ctx, 3) + } res1 := <-j1.response assert.Nil(t, res1.gs) @@ -630,11 +714,30 @@ func TestAllocatorRunLocalAllocations(t *testing.T) { } func TestAllocatorRunLocalAllocationsCountsAndLists(t *testing.T) { + testScenarios := map[string]struct { + features string + }{ + "Allocator batches updates": { + features: fmt.Sprintf("%s=true&%s=true", runtime.FeatureAllocatorBatchesUpdates, runtime.FeatureCountsAndLists), + }, + "Allocator does not batches updates": { + features: fmt.Sprintf("%s=false&%s=true", runtime.FeatureAllocatorBatchesUpdates, 
runtime.FeatureCountsAndLists), + }, + } + + for test, testScenario := range testScenarios { + t.Run(test, func(t *testing.T) { + testAllocatorRunLocalAllocationsCountsAndLists(t, testScenario.features) + }) + } +} + +func testAllocatorRunLocalAllocationsCountsAndLists(t *testing.T, features string) { t.Parallel() runtime.FeatureTestMutex.Lock() defer runtime.FeatureTestMutex.Unlock() - assert.NoError(t, runtime.ParseFeatures(string(runtime.FeatureCountsAndLists)+"=true")) + assert.NoError(t, runtime.ParseFeatures(features)) a, m := newFakeAllocator() @@ -797,7 +900,11 @@ func TestAllocatorRunLocalAllocationsCountsAndLists(t *testing.T) { j6 := request{gsa: gsaListDistributed.DeepCopy(), response: make(chan response)} a.pendingRequests <- j6 - go a.ListenAndAllocate(ctx, 5) + if runtime.FeatureEnabled(runtime.FeatureAllocatorBatchesUpdates) { + go a.ListenAndBatchAllocate(ctx, 5) + } else { + go a.ListenAndAllocate(ctx, 5) + } res1 := <-j1.response assert.NoError(t, res1.err) @@ -966,7 +1073,8 @@ func TestControllerAllocationUpdateWorkers(t *testing.T) { r = <-r.request.response assert.True(t, updated) - assert.EqualError(t, r.err, "error updating allocated gameserver: something went wrong") + assert.True(t, errors.Is(r.err, ErrGameServerUpdateConflict)) + assert.EqualError(t, r.err, "Could not update the selected GameServer\nsomething went wrong") assert.Equal(t, gs1, r.gs) agtesting.AssertNoEvent(t, m.FakeRecorder.Events) @@ -1100,3 +1208,112 @@ func newFakeAllocator() (*Allocator, agtesting.Mocks) { return a, m } + +// newFakeAllocatorWithCustomBatchWaitTime returns a fake allocator with a batchWaitTime +func newFakeAllocatorWithCustomBatchWaitTime(batchWaitTime time.Duration) (*Allocator, agtesting.Mocks) { + m := agtesting.NewMocks() + + counter := gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory) + a := NewAllocator( + m.AgonesInformerFactory.Multicluster().V1().GameServerAllocationPolicies(), + 
m.KubeInformerFactory.Core().V1().Secrets(), + m.AgonesClient.AgonesV1(), + m.KubeClient, + NewAllocationCache(m.AgonesInformerFactory.Agones().V1().GameServers(), counter, healthcheck.NewHandler()), + time.Second, + 5*time.Second, + batchWaitTime) + a.recorder = m.FakeRecorder + + return a, m +} + +// 1 gs and 2 gsa in the same batch: one gets the gs, +// the other needs to retry to get the gs +func TestAllocatorAllocateNoQuickNoGameServerError(t *testing.T) { + t.Parallel() + + // TODO: remove when `CountsAndLists` feature flag is moved to stable. + runtime.FeatureTestMutex.Lock() + defer runtime.FeatureTestMutex.Unlock() + require.NoError(t, runtime.ParseFeatures( + fmt.Sprintf("%s=false&%s=true", runtime.FeaturePlayerAllocationFilter, runtime.FeatureCountsAndLists))) + + a, m := newFakeAllocatorWithCustomBatchWaitTime(2 * time.Second) + + // 1 gs, 2 gsa + gs1 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, UID: "1"}, + Status: agonesv1.GameServerStatus{NodeName: "node1", State: agonesv1.GameServerStateAllocated, + Counters: map[string]agonesv1.CounterStatus{ + "capacity": { // Available Capacity == 999 + Count: 1, + Capacity: 1000, + }}}} + gsList := []agonesv1.GameServer{gs1} + gsLen := len(gsList) + m.AgonesClient.AddReactor("list", "gameservers", func(_ k8stesting.Action) (bool, k8sruntime.Object, error) { + return true, &agonesv1.GameServerList{Items: gsList}, nil + }) + m.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { + ua := action.(k8stesting.UpdateAction) + gs := ua.GetObject().(*agonesv1.GameServer) + + return true, gs, nil + }) + + ctx, cancel := agtesting.StartInformers(m, a.allocationCache.gameServerSynced) + defer cancel() + + require.NoError(t, a.Run(ctx)) + // wait for all the gameservers to be in the cache + require.Eventuallyf(t, func() bool { + return a.allocationCache.cache.Len() == gsLen + }, 10*time.Second, time.Second, 
fmt.Sprintf("should be %d items in the cache", gsLen)) + + ALLOCATED := agonesv1.GameServerStateAllocated + + gsa := &allocationv1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, + Spec: allocationv1.GameServerAllocationSpec{ + Scheduling: apis.Packed, + Selectors: []allocationv1.GameServerSelector{{ + GameServerState: &ALLOCATED, + Counters: map[string]allocationv1.CounterSelector{ + "capacity": { + MinAvailable: 1, + }}}, + }}, + } + + var waitTest sync.WaitGroup + + waitTest.Add(1) + go func() { + defer waitTest.Done() + + // First allocation succeeds + result1, err1 := a.Allocate(ctx, gsa.DeepCopy()) + require.NoError(t, err1) + require.NotNil(t, result1) + outGsa := result1.(*allocationv1.GameServerAllocation) + require.NotNil(t, outGsa) + require.Equal(t, allocationv1.GameServerAllocationAllocated, outGsa.Status.State) + require.Equal(t, gs1.ObjectMeta.Name, outGsa.Status.GameServerName) + }() + + waitTest.Add(1) + go func() { + defer waitTest.Done() + + // Second allocation in the same batch + result2, err2 := a.Allocate(ctx, gsa.DeepCopy()) + require.NoError(t, err2) + require.NotNil(t, result2) + outGsa := result2.(*allocationv1.GameServerAllocation) + require.NotNil(t, outGsa) + require.Equal(t, allocationv1.GameServerAllocationAllocated, outGsa.Status.State) + require.Equal(t, gs1.ObjectMeta.Name, outGsa.Status.GameServerName) + }() + + waitTest.Wait() +} diff --git a/pkg/gameserverallocations/batch_allocator.go b/pkg/gameserverallocations/batch_allocator.go new file mode 100644 index 0000000000..72087c0fa4 --- /dev/null +++ b/pkg/gameserverallocations/batch_allocator.go @@ -0,0 +1,286 @@ +package gameserverallocations + +import ( + "context" + goErrors "errors" + "time" + + "agones.dev/agones/pkg/apis" + agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" + "agones.dev/agones/pkg/util/runtime" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + k8serrors 
"k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// batchResponse is an async list of responses for matching requests +type batchResponses struct { + counterErrors error + listErrors error + responses []response +} + +// batchAllocationUpdateWorkers tries to update each newly allocated gs with the last state. If +// the update fails because of a version conflict, all allocations that were applied onto a gs +// will receive an error, thus being available for retries. If the update succeeds, all allocations +// that were applied onto a gs will succeed, and the gs with the updated state will be added +// back to the cache. +func (c *Allocator) batchAllocationUpdateWorkers(ctx context.Context, workerCount int) chan<- batchResponses { + metrics := c.newMetrics(ctx) + batchUpdateQueue := make(chan batchResponses) + + for i := 0; i < workerCount; i++ { + go func() { + for { + select { + case batchRes := <-batchUpdateQueue: + if len(batchRes.responses) > 0 { + // The last response contains the latest gs state + lastGsState := batchRes.responses[len(batchRes.responses)-1].gs + + requestStartTime := time.Now() + + // Try to update with the latest gs state + var propagatedErr error + updatedGs, updateErr := c.gameServerGetter.GameServers(lastGsState.ObjectMeta.Namespace).Update(ctx, lastGsState, metav1.UpdateOptions{}) + if updateErr != nil { + metrics.recordAllocationUpdateFailure(ctx, time.Since(requestStartTime)) + + if !k8serrors.IsConflict(errors.Cause(updateErr)) { + // since we could not allocate, we should put it back + // but not if it's a conflict, as the cache is no longer up to date, and + // we should wait for it to get updated with fresh info. 
+ c.allocationCache.AddGameServer(updatedGs) + } + propagatedErr = goErrors.Join(ErrGameServerUpdateConflict, updateErr) + } else { + metrics.recordAllocationUpdateSuccess(ctx, time.Since(requestStartTime)) + + // Add the server back as soon as possible and not wait for the informer to update the cache + c.allocationCache.AddGameServer(updatedGs) + + // If successful Update record any Counter or List action errors as a warning + if batchRes.counterErrors != nil { + c.recorder.Event(updatedGs, corev1.EventTypeWarning, "CounterActionError", batchRes.counterErrors.Error()) + } + if batchRes.listErrors != nil { + c.recorder.Event(updatedGs, corev1.EventTypeWarning, "ListActionError", batchRes.listErrors.Error()) + } + c.recorder.Event(updatedGs, corev1.EventTypeNormal, string(updatedGs.Status.State), "Allocated") + } + + // Forward all responses with their appropriate gs state and update error + for _, res := range batchRes.responses { + res.err = propagatedErr + res.request.response <- res + } + } + case <-ctx.Done(): + return + } + } + }() + } + + return batchUpdateQueue +} + +// ListenAndBatchAllocate is a blocking function that runs in a loop +// looking at c.pendingRequests for batches of requests that are coming through. +// The difference between this and the original ListenAndAllocate is that this will +// apply the allocation to the local gs (still removing it from the cache) and continue with +// the next allocation from the batch. When the batch is done, the update workers will try to +// update each newly allocated gs with the last state. 
+func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCount int) { + // setup workers for batch allocation updates + batchUpdateQueue := c.batchAllocationUpdateWorkers(ctx, updateWorkerCount) + + var list []*agonesv1.GameServer + var sortKey uint64 + requestCount := 0 + gsToReorderIndex := -1 + var gsToReorder *agonesv1.GameServer + + metrics := c.newMetrics(ctx) + batchResponsesPerGs := make(map[string]batchResponses) + + flush := func() { + if requestCount > 0 { + metrics.recordAllocationsBatchSize(ctx, requestCount) + } + + for _, batchResponses := range batchResponsesPerGs { + batchUpdateQueue <- batchResponses + } + batchResponsesPerGs = make(map[string]batchResponses) + + list = nil + requestCount = 0 + gsToReorderIndex = -1 + gsToReorder = nil + } + + checkSortKey := func(gsa *allocationv1.GameServerAllocation) { + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { + // SortKey returns the sorting values (list of Priorities) as a determinstic key. + // In case gsa.Spec.Priorities is nil this will still return a sortKey. + // In case of error this will return 0 for the sortKey. + newSortKey, err := gsa.SortKey() + if err != nil { + c.baseLogger.WithError(err).Warn("error getting sortKey for GameServerAllocationSpec", err) + } + // Set sortKey if this is the first request, or the previous request errored on creating a sortKey. + if sortKey == uint64(0) { + sortKey = newSortKey + } + + if newSortKey != sortKey { + sortKey = newSortKey + flush() + } + } + } + + checkRefreshList := func(gsa *allocationv1.GameServerAllocation) { + // refresh the list after every 100 allocations made in a single batch + if requestCount >= maxBatchBeforeRefresh { + flush() + } + requestCount++ + + checkSortKey(gsa) + + // Sort list if necessary + if list == nil { + // There could be a bug to not flush the list if the scheduling changes between gsas. 
+ if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) || gsa.Spec.Scheduling == apis.Packed { + list = c.allocationCache.ListSortedGameServers(gsa) + } else { + // If FeatureCountsAndLists and Scheduling == Distributed, sort game servers by Priorities + list = c.allocationCache.ListSortedGameServersPriorities(gsa) + } + } else if gsToReorderIndex >= 0 { + // We can use the priorities from the new gsa because if they were different, + // we would have flushed the list and started a new one. + c.allocationCache.ReorderGameServerAfterAllocation( + list, /*where*/ + gsToReorderIndex, gsToReorder, /*what*/ + gsa.Spec.Priorities, gsa.Spec.Scheduling /*how*/) + } + } + + for { + select { + case req := <-c.pendingRequests: + checkRefreshList(req.gsa) + + foundGs, foundGsIndex, err := findGameServerForAllocation(req.gsa, list) + if err != nil { + req.response <- response{request: req, gs: nil, err: err} + continue + } + + // If the gs has not been already allocated in this batch, remove it from the cache, + // but keep it in the list for the next allocation + existingBatch, alreadyAllocated := batchResponsesPerGs[string(foundGs.UID)] + if !alreadyAllocated { + if removeErr := c.allocationCache.RemoveGameServer(foundGs); removeErr != nil { + // this seems unlikely, but lets handle it just in case + removeErr = errors.Wrap(removeErr, "error removing gameserver from cache") + req.response <- response{request: req, gs: nil, err: removeErr} + + // remove the game server because it is problematic + list = append(list[:foundGsIndex], list[foundGsIndex+1:]...) + continue + } + } + + // Apply the allocation to a copy of the gs in the list (not in cache anymore). + // It will be added back to the list during reordering before the next gsa gets processed. 
+ gsToReorder = foundGs.DeepCopy() + gsToReorderIndex = foundGsIndex + applyError, counterErrors, listErrors := c.applyAllocationToLocalGameServer(req.gsa.Spec.MetaPatch, gsToReorder, req.gsa) + if applyError == nil { + if alreadyAllocated { + existingBatch.responses = append(existingBatch.responses, response{request: req, gs: gsToReorder.DeepCopy(), err: nil}) + existingBatch.counterErrors = goErrors.Join(existingBatch.counterErrors, counterErrors) + existingBatch.listErrors = goErrors.Join(existingBatch.listErrors, listErrors) + batchResponsesPerGs[string(gsToReorder.UID)] = existingBatch + } else { // first time we see this gs in this batch + batchResponsesPerGs[string(gsToReorder.UID)] = batchResponses{ + responses: []response{{request: req, gs: gsToReorder.DeepCopy(), err: nil}}, + counterErrors: counterErrors, + listErrors: listErrors, + } + } + } else { + req.response <- response{request: req, gs: nil, err: applyError} + } + case <-ctx.Done(): + flush() + return + default: + flush() + + // If nothing is found in c.pendingRequests, we move to + // default: which will wait for c.batchWaitTime, to allow for some requests to backup in c.pendingRequests, + // providing us with a batch of Allocation requests in that channel + + // Once we have 1 or more requests in c.pendingRequests (which is buffered to 100), we can start the batch process. + time.Sleep(c.batchWaitTime) + } + } +} + +// applyAllocationToLocalGameServer patches the inputted GameServer with the allocation metadata changes, and updates it to the Allocated State. +// Returns the encountered errors. 
+func (c *Allocator) applyAllocationToLocalGameServer(mp allocationv1.MetaPatch, gs *agonesv1.GameServer, gsa *allocationv1.GameServerAllocation) (error, error, error) { + // add last allocated, so it always gets updated, even if it is already Allocated + ts, err := time.Now().MarshalText() + if err != nil { + return err, nil, nil + } + if gs.ObjectMeta.Annotations == nil { + gs.ObjectMeta.Annotations = make(map[string]string, 1) + } + gs.ObjectMeta.Annotations[LastAllocatedAnnotationKey] = string(ts) + gs.Status.State = agonesv1.GameServerStateAllocated + + // patch ObjectMeta labels + if mp.Labels != nil { + if gs.ObjectMeta.Labels == nil { + gs.ObjectMeta.Labels = make(map[string]string, len(mp.Labels)) + } + for key, value := range mp.Labels { + gs.ObjectMeta.Labels[key] = value + } + } + + if gs.ObjectMeta.Annotations == nil { + gs.ObjectMeta.Annotations = make(map[string]string, len(mp.Annotations)) + } + // apply annotations patch + for key, value := range mp.Annotations { + gs.ObjectMeta.Annotations[key] = value + } + + // perfom any Counter or List actions + var counterErrors error + var listErrors error + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { + if gsa.Spec.Counters != nil { + for counter, ca := range gsa.Spec.Counters { + counterErrors = goErrors.Join(counterErrors, ca.CounterActions(counter, gs)) + } + } + if gsa.Spec.Lists != nil { + for list, la := range gsa.Spec.Lists { + listErrors = goErrors.Join(listErrors, la.ListActions(list, gs)) + } + } + } + + return nil, counterErrors, listErrors +} diff --git a/pkg/gameserverallocations/metrics.go b/pkg/gameserverallocations/metrics.go index e5d98b9b8b..36eaeb149b 100644 --- a/pkg/gameserverallocations/metrics.go +++ b/pkg/gameserverallocations/metrics.go @@ -40,8 +40,10 @@ var ( keyStatus = mt.MustTagKey("status") keySchedulingStrategy = mt.MustTagKey("scheduling_strategy") - gameServerAllocationsLatency = stats.Float64("gameserver_allocations/latency", "The duration of gameserver 
allocations", "s") - gameServerAllocationsRetryTotal = stats.Int64("gameserver_allocations/errors", "The errors of gameserver allocations", "1") + gameServerAllocationsLatency = stats.Float64("gameserver_allocations/latency", "The duration of gameserver allocations", "s") + gameServerAllocationsRetryTotal = stats.Int64("gameserver_allocations/errors", "The errors of gameserver allocations", "1") + gameServerAllocationsUpdatesLatency = stats.Float64("gameserver_allocations/update_latency", "The duration of gameserver updates", "s") + gameServerAllocationsBatchSize = stats.Int64("gameserver_allocations/batch", "The gameserver allocations batch size", "1") stateViews = []*view.View{ { @@ -58,6 +60,20 @@ var ( Aggregation: view.Distribution(1, 2, 3, 4, 5), TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, }, + { + Name: "gameserver_allocations_updates_duration_seconds", + Measure: gameServerAllocationsUpdatesLatency, + Description: "The distribution of gameserver allocation update requests latencies.", + Aggregation: view.Distribution(0, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2, 3), + TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, + }, + { + Name: "gameserver_allocations_batch_size", + Measure: gameServerAllocationsBatchSize, + Description: "The count of gameserver allocations in a batch", + Aggregation: view.Distribution(1, 2, 3, 4, 5, 10, 20, 50, 100), + TagKeys: []tag.Key{keyFleetName, keyClusterName, keyMultiCluster, keyStatus, keySchedulingStrategy}, + }, } ) @@ -155,3 +171,20 @@ func (r *metrics) recordAllocationRetrySuccess(ctx context.Context, retryCount i mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Success")}, gameServerAllocationsRetryTotal.M(int64(retryCount))) } + +// record the current allocation batch size. 
+func (r *metrics) recordAllocationsBatchSize(ctx context.Context, count int) { + stats.Record(ctx, gameServerAllocationsBatchSize.M(int64(count))) +} + +// record the gs successful update latency. +func (r *metrics) recordAllocationUpdateSuccess(ctx context.Context, duration time.Duration) { + mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Success")}, + gameServerAllocationsUpdatesLatency.M(duration.Seconds())) +} + +// record the gs failed update latency. +func (r *metrics) recordAllocationUpdateFailure(ctx context.Context, duration time.Duration) { + mt.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(keyStatus, "Failure")}, + gameServerAllocationsUpdatesLatency.M(duration.Seconds())) +} diff --git a/pkg/util/runtime/features.go b/pkg/util/runtime/features.go index 6af5d67296..8ab52c5f7a 100644 --- a/pkg/util/runtime/features.go +++ b/pkg/util/runtime/features.go @@ -76,6 +76,9 @@ const ( // FeatureSidecarContainers is a feature flag to enable/disable k8s sidecar containers for the sdkserver FeatureSidecarContainers = "SidecarContainers" + // FeatureAllocatorBatchesUpdates is a feature flag to enable/disable applying the allocations in batches. 
+ FeatureAllocatorBatchesUpdates = "AllocatorBatchesUpdates" + //////////////// // Dev features @@ -166,6 +169,7 @@ var ( FeaturePlayerAllocationFilter: false, FeaturePlayerTracking: false, FeatureSidecarContainers: false, + FeatureAllocatorBatchesUpdates: false, // Dev features FeatureProcessorAllocator: false, diff --git a/site/content/en/docs/Guides/feature-stages.md b/site/content/en/docs/Guides/feature-stages.md index fa68949240..c5e206a94c 100644 --- a/site/content/en/docs/Guides/feature-stages.md +++ b/site/content/en/docs/Guides/feature-stages.md @@ -42,6 +42,7 @@ The current set of `alpha` and `beta` feature gates: | [Scheduled Fleet Autoscaling](https://github.com/googleforgames/agones/issues/3008) | `ScheduledAutoscaler` | Enabled | `Beta` | 1.51.0 | | [Extend Webhook autoscaler to send fleet metadata with the request](https://github.com/googleforgames/agones/issues/3951) | `FleetAutoscaleRequestMetaData` | Disabled | `Alpha` | 1.48.0 | | [Sidecar Containers](https://github.com/googleforgames/agones/issues/3642) | `SidecarContainers` | Disabled | `Alpha` | 1.49.0 | +| [Allocator Batches Updates](https://github.com/googleforgames/agones/issues/3992) | `AllocatorBatchesUpdates` | Disabled | `Alpha` | 1.50.0 | | Example Gate (not in use) | `Example` | Disabled | None | 0.13.0 | [fleet-updates]: {{% relref "./fleet-updates.md#notifying-gameservers-on-fleet-updatedownscale" %}} diff --git a/site/content/en/docs/Guides/metrics.md b/site/content/en/docs/Guides/metrics.md index c2564b5f67..841f7ad6e0 100644 --- a/site/content/en/docs/Guides/metrics.md +++ b/site/content/en/docs/Guides/metrics.md @@ -49,8 +49,10 @@ Follow the [Google Cloud Monitoring installation steps](#google-cloud-monitoring |-------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------| | 
agones_gameservers_count | The number of gameservers per fleet and status | gauge | | agones_gameserver_allocations_duration_seconds | The distribution of gameserver allocation requests latencies | histogram | -| agones_gameserver_allocations_retry_total | The count of gameserver allocation retry until it succeeds | histogram | -| agones_gameserver_creation_duration | The time gameserver takes to be created in seconds | histogram | +| agones_gameserver_allocations_retry_total | The count of gameserver allocation retry until it succeeds | histogram | +| agones_gameserver_allocations_updates_duration_seconds| The distribution of gameserver allocation update requests latencies | histogram | +| agones_gameserver_allocations_batch_size | The count of gameserver allocations in a batch | histogram | +| agones_gameserver_creation_duration | The time gameserver takes to be created in seconds | histogram | | agones_gameservers_total | The total of gameservers per fleet and status | counter | | agones_gameserver_player_connected_total | The total number of players connected to gameservers (Only available when [player tracking]({{< relref "player-tracking.md" >}}) is enabled) | gauge | | agones_gameserver_player_capacity_total | The available capacity for players on gameservers (Only available when [player tracking]({{< relref "player-tracking.md" >}}) is enabled) | gauge | diff --git a/test/upgrade/versionMap.yaml b/test/upgrade/versionMap.yaml index 5647d35503..5d88f23f4b 100644 --- a/test/upgrade/versionMap.yaml +++ b/test/upgrade/versionMap.yaml @@ -104,7 +104,7 @@ data: "betaGates": ["AutopilotPassthroughPort", "CountsAndLists", "GKEAutopilotExtendedDurationPods", "PortPolicyNone", "PortRanges", "RollingUpdateFix", "ScheduledAutoscaler"] }, "Dev": { - "alphaGates": ["FleetAutoscaleRequestMetaData", "PlayerAllocationFilter", "PlayerTracking", "SidecarContainers"], + "alphaGates": ["AllocatorBatchesUpdates", "FleetAutoscaleRequestMetaData", "PlayerAllocationFilter", 
"PlayerTracking", "SidecarContainers"], "betaGates": ["AutopilotPassthroughPort", "CountsAndLists", "GKEAutopilotExtendedDurationPods", "PortPolicyNone", "PortRanges", "RollingUpdateFix", "ScheduledAutoscaler"] } }