From 6ec6aa6284b66d30021b2187715007791e26fd97 Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Tue, 2 Sep 2025 12:48:12 -0400 Subject: [PATCH] feat: implement the processor server feat: add metrics setup for processor feat: remove exit from sigterm and let it shutdown gracefully feat: fix conflicts feat: update logs feat: test pipeline with ProcessorAllocator in alpha feat: test pipeline with ProcessorAllocator in alpha feat: debug e2e failing feat: debug e2e failing feat: fix error handling and feature gate from dep env feat: fix extension and allocator with processor server feat: update unit test feat: add missing status feat: fix missing namespace feat: refactor a bit the error handling feat: update copyright to 2026 feat: minor changes feat: rollback unit test fix (other PR) feat: fix rebase issue feat: fix unit test from rebase --- build/Makefile | 4 +- cloudbuild.yaml | 2 +- cmd/allocator/main.go | 4 +- cmd/processor/handler.go | 347 +++++++++++++++++++ cmd/processor/main.go | 283 ++++++++++++--- install/helm/agones/templates/processor.yaml | 39 ++- install/helm/agones/values.yaml | 2 + pkg/gameserverallocations/controller.go | 67 ++-- 8 files changed, 665 insertions(+), 83 deletions(-) create mode 100644 cmd/processor/handler.go diff --git a/build/Makefile b/build/Makefile index c697b81977..29d7bffd1a 100644 --- a/build/Makefile +++ b/build/Makefile @@ -73,7 +73,7 @@ BETA_FEATURE_GATES ?= "CountsAndLists=true&GKEAutopilotExtendedDurationPods=true # Enable all alpha feature gates. 
Keep in sync with `false` (alpha) entries in pkg/util/runtime/features.go:featureDefaults -ALPHA_FEATURE_GATES ?= "PlayerAllocationFilter=true&PlayerTracking=true&WasmAutoscaler=true&Example=true" +ALPHA_FEATURE_GATES ?= "PlayerAllocationFilter=true&PlayerTracking=true&WasmAutoscaler=true&Example=true&ProcessorAllocator=true" # Build with Windows support WITH_WINDOWS=1 @@ -798,7 +798,7 @@ build-processor-image-arm64: $(ensure-build-image) build-processor-binary create # Build the debug image for the processor service build-processor-debug-image: $(ensure-build-image) build-processor-debug-binary build-licenses build-required-src-dist - docker build $(agones_path)/cmd/processor/ --file $(agones_path)/cmd/processor/Dockerfile.debug --tag=$(processor_tag) $(DOCKER_BUILD_ARGS) + docker build $(agones_path)/cmd/processor/ --file $(agones_path)/cmd/processor/Dockerfile.debug --tag=$(processor_amd64_tag) $(DOCKER_BUILD_ARGS) # Pushes up the processor image push-processor-image: push-processor-image-amd64 diff --git a/cloudbuild.yaml b/cloudbuild.yaml index f4fcca1817..af2361d238 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -322,7 +322,7 @@ steps: # Keep in sync with the inverse of 'alpha' and 'beta' features in # pkg/util/runtime/features.go:featureDefaults - featureWithGate="PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=false&PlayerTracking=true&CountsAndLists=false&RollingUpdateFix=false&PortRanges=false&PortPolicyNone=false&ScheduledAutoscaler=false&GKEAutopilotExtendedDurationPods=false&SidecarContainers=false&WasmAutoscaler=true&Example=true" + featureWithGate="PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=false&PlayerTracking=true&CountsAndLists=false&RollingUpdateFix=false&PortRanges=false&PortPolicyNone=false&ScheduledAutoscaler=false&GKEAutopilotExtendedDurationPods=false&SidecarContainers=false&WasmAutoscaler=true&Example=true&ProcessorAllocator=true" featureWithoutGate="" # Use this if specific feature gates can only be 
supported on specific Kubernetes versions. diff --git a/cmd/allocator/main.go b/cmd/allocator/main.go index 1fc0678799..025496c92c 100644 --- a/cmd/allocator/main.go +++ b/cmd/allocator/main.go @@ -737,9 +737,7 @@ func (h *serviceHandler) Allocate(ctx context.Context, in *pb.AllocationRequest) gsa.ApplyDefaults() if runtime.FeatureEnabled(runtime.FeatureProcessorAllocator) { - req := converters.ConvertGSAToAllocationRequest(gsa) - - resp, err := h.processorClient.Allocate(ctx, req) + resp, err := h.processorClient.Allocate(ctx, in) if err != nil { logger.WithField("gsa", gsa).WithError(err).Error("allocation failed") return nil, err diff --git a/cmd/processor/handler.go b/cmd/processor/handler.go new file mode 100644 index 0000000000..22dcc9e39f --- /dev/null +++ b/cmd/processor/handler.go @@ -0,0 +1,347 @@ +// Copyright 2026 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// Processor +package main + +import ( + "context" + "fmt" + "sync" + "time" + + "agones.dev/agones/pkg/allocation/converters" + allocationpb "agones.dev/agones/pkg/allocation/go" + allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" + "agones.dev/agones/pkg/client/clientset/versioned" + "agones.dev/agones/pkg/client/informers/externalversions" + "agones.dev/agones/pkg/gameserverallocations" + "agones.dev/agones/pkg/gameservers" + + "github.com/heptiolabs/healthcheck" + "github.com/sirupsen/logrus" + "go.opencensus.io/plugin/ocgrpc" + rpcstatus "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" +) + +// allocationResult represents the result of an allocation attempt +type allocationResult struct { + response *allocationpb.AllocationResponse + error *rpcstatus.Status +} + +// processorHandler represents the gRPC server for processing allocation requests +type processorHandler struct { + allocationpb.UnimplementedProcessorServer + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + allocator *gameserverallocations.Allocator + clients map[string]allocationpb.Processor_StreamBatchesServer + grpcUnallocatedStatusCode codes.Code + pullInterval time.Duration +} + +// newServiceHandler creates a new instance of processorHandler +func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, agonesClient versioned.Interface, + health healthcheck.Handler, config processorConfig, grpcUnallocatedStatusCode codes.Code) *processorHandler { + defaultResync := 30 * time.Second + agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync) + kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync) + gsCounter := 
gameservers.NewPerNodeCounter(kubeInformerFactory, agonesInformerFactory) + + allocator := gameserverallocations.NewAllocator( + agonesInformerFactory.Multicluster().V1().GameServerAllocationPolicies(), + kubeInformerFactory.Core().V1().Secrets(), + agonesClient.AgonesV1(), + kubeClient, + gameserverallocations.NewAllocationCache(agonesInformerFactory.Agones().V1().GameServers(), gsCounter, health), + config.RemoteAllocationTimeout, + config.TotalRemoteAllocationTimeout, + config.AllocationBatchWaitTime) + + batchCtx, cancel := context.WithCancel(ctx) + h := processorHandler{ + allocator: allocator, + ctx: batchCtx, + cancel: cancel, + grpcUnallocatedStatusCode: grpcUnallocatedStatusCode, + pullInterval: config.PullInterval, + } + + kubeInformerFactory.Start(ctx.Done()) + agonesInformerFactory.Start(ctx.Done()) + + if err := allocator.Run(ctx); err != nil { + logger.WithError(err).Fatal("Starting allocator failed.") + } + + return &h +} + +// StreamBatches handles a bidirectional stream for batch allocation requests from a client +// Registers the client, processes incoming batches, and sends responses +func (h *processorHandler) StreamBatches(stream allocationpb.Processor_StreamBatchesServer) error { + var clientID string + + // Wait for first message to get clientID + msg, err := stream.Recv() + if err != nil { + logger.WithError(err).Debug("Stream receive error on connect") + return err + } + + clientID = msg.GetClientId() + if clientID == "" { + logger.Warn("Received empty clientID, closing stream") + return nil + } + + h.addClient(clientID, stream) + defer h.removeClient(clientID) + logger.WithField("clientID", clientID).Debug("Client registered") + + // Main loop: handle incoming messages + for { + msg, err := stream.Recv() + if err != nil { + logger.WithError(err).Debug("Stream receive error") + return err + } + + payload := msg.GetPayload() + if payload == nil { + logger.WithField("clientID", clientID).Warn("Received message with nil payload") + continue + 
} + + batchPayload, ok := payload.(*allocationpb.ProcessorMessage_BatchRequest) + if !ok { + logger.WithField("clientID", clientID).Warn("Received non-batch request payload") + continue + } + + batchRequest := batchPayload.BatchRequest + batchID := batchRequest.GetBatchId() + requestWrappers := batchRequest.GetRequests() + + logger.WithFields(logrus.Fields{ + "clientID": clientID, + "batchID": batchID, + "requestCount": len(requestWrappers), + }).Debug("Received batch request") + + // Extract request IDs for logging + requestIDs := make([]string, len(requestWrappers)) + for i, wrapper := range requestWrappers { + requestIDs[i] = wrapper.GetRequestId() + } + + // Submit batch for processing + response := h.submitBatch(batchID, requestWrappers) + + respMsg := &allocationpb.ProcessorMessage{ + ClientId: clientID, + Payload: &allocationpb.ProcessorMessage_BatchResponse{ + BatchResponse: response, + }, + } + + // TODO: we might want to retry on failure here ? + if err := stream.Send(respMsg); err != nil { + logger.WithFields(logrus.Fields{ + "clientID": clientID, + "batchID": batchID, + "requestCount": len(requestWrappers), + }).WithError(err).Error("Failed to send response") + continue + } + } +} + +// StartPullRequestTicker periodically sends pull requests to all connected clients +func (h *processorHandler) StartPullRequestTicker() { + go func() { + ticker := time.NewTicker(h.pullInterval) + defer ticker.Stop() + + for { + select { + case <-h.ctx.Done(): + return + case <-ticker.C: + h.mu.RLock() + for clientID, stream := range h.clients { + pullMsg := &allocationpb.ProcessorMessage{ + ClientId: clientID, + Payload: &allocationpb.ProcessorMessage_Pull{ + Pull: &allocationpb.PullRequest{Message: "pull"}, + }, + } + go func(id string, s allocationpb.Processor_StreamBatchesServer) { + if err := s.Send(pullMsg); err != nil { + logger.WithFields(logrus.Fields{ + "clientID": id, + "error": err, + }).Warn("Failed to send pull request, removing client") + h.removeClient(id) 
+ } + }(clientID, stream) + } + h.mu.RUnlock() + } + } + }() +} + +// processAllocationsConcurrently processes multiple allocation requests in parallel +func (h *processorHandler) processAllocationsConcurrently(requestWrappers []*allocationpb.RequestWrapper) []allocationResult { + var wg sync.WaitGroup + results := make([]allocationResult, len(requestWrappers)) + + for i, reqWrapper := range requestWrappers { + wg.Add(1) + go func(index int, requestWrapper *allocationpb.RequestWrapper) { + defer wg.Done() + results[index] = h.processAllocation(requestWrapper.Request) + }(i, reqWrapper) + } + + wg.Wait() + + return results +} + +// processAllocation handles a single allocation request by using the allocator +func (h *processorHandler) processAllocation(req *allocationpb.AllocationRequest) allocationResult { + gsa := converters.ConvertAllocationRequestToGSA(req) + gsa.ApplyDefaults() + + makeError := func(err error, fallbackCode codes.Code) allocationResult { + code := fallbackCode + msg := err.Error() + if grpcStatus, ok := status.FromError(err); ok { + code = grpcStatus.Code() + msg = grpcStatus.Message() + } + return allocationResult{ + error: &rpcstatus.Status{Code: int32(code), Message: msg}, + } + } + + resultObj, err := h.allocator.Allocate(h.ctx, gsa) + if err != nil { + return makeError(err, h.grpcUnallocatedStatusCode) + } + + if s, ok := resultObj.(*metav1.Status); ok { + return allocationResult{ + error: &rpcstatus.Status{ + Code: int32(grpcCodeFromHTTPStatus(int(s.Code))), + Message: s.Message, + }, + } + } + + allocatedGsa, ok := resultObj.(*allocationv1.GameServerAllocation) + if !ok { + return allocationResult{ + error: &rpcstatus.Status{ + Code: int32(codes.Internal), + Message: fmt.Sprintf("internal server error - Bad GSA format %v", resultObj), + }, + } + } + + response, err := converters.ConvertGSAToAllocationResponse(allocatedGsa, h.grpcUnallocatedStatusCode) + if err != nil { + return makeError(err, h.grpcUnallocatedStatusCode) + } + + return 
allocationResult{response: response} +} + +// submitBatch accepts a batch of allocation requests, processes them, and assembles a batch response +func (h *processorHandler) submitBatch(batchID string, requestWrappers []*allocationpb.RequestWrapper) *allocationpb.BatchResponse { + results := h.processAllocationsConcurrently(requestWrappers) + responseWrappers := make([]*allocationpb.ResponseWrapper, len(requestWrappers)) + + for i, result := range results { + wrapper := &allocationpb.ResponseWrapper{ + RequestId: requestWrappers[i].RequestId, + } + + if result.error != nil { + wrapper.Result = &allocationpb.ResponseWrapper_Error{ + Error: result.error, + } + } else { + wrapper.Result = &allocationpb.ResponseWrapper_Response{ + Response: result.response, + } + } + responseWrappers[i] = wrapper + } + + return &allocationpb.BatchResponse{ + BatchId: batchID, + Responses: responseWrappers, + } +} + +// getGRPCServerOptions returns a list of GRPC server options to use when only serving gRPC requests. 
+func (h *processorHandler) getGRPCServerOptions() []grpc.ServerOption { + opts := []grpc.ServerOption{ + grpc.StatsHandler(&ocgrpc.ServerHandler{}), + + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: 10 * time.Second, + PermitWithoutStream: true, + }), + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: 1 * time.Minute, + Timeout: 30 * time.Second, + Time: 30 * time.Second, + }), + } + + return opts +} + +// addClient registers a new client for streaming allocation responses +func (h *processorHandler) addClient(clientID string, stream allocationpb.Processor_StreamBatchesServer) { + h.mu.Lock() + defer h.mu.Unlock() + + if h.clients == nil { + h.clients = make(map[string]allocationpb.Processor_StreamBatchesServer) + } + + h.clients[clientID] = stream +} + +// removeClient unregisters a client from streaming allocation responses +func (h *processorHandler) removeClient(clientID string) { + h.mu.Lock() + defer h.mu.Unlock() + + delete(h.clients, clientID) +} diff --git a/cmd/processor/main.go b/cmd/processor/main.go index 03c603763c..bd7de4646c 100644 --- a/cmd/processor/main.go +++ b/cmd/processor/main.go @@ -17,11 +17,18 @@ package main import ( "context" + "errors" + "fmt" + "net" + "net/http" "os" "strings" "time" "agones.dev/agones/pkg" + allocationpb "agones.dev/agones/pkg/allocation/go" + "agones.dev/agones/pkg/client/clientset/versioned" + "agones.dev/agones/pkg/metrics" "agones.dev/agones/pkg/util/httpserver" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/signals" @@ -31,6 +38,10 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/pflag" "github.com/spf13/viper" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + grpchealth "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -39,12 +50,24 @@ import ( ) const ( - logLevelFlag = "log-level" - 
leaderElectionFlag = "leader-election" - podNamespace = "pod-namespace" - leaseDurationFlag = "lease-duration" - renewDeadlineFlag = "renew-deadline" - retryPeriodFlag = "retry-period" + allocationBatchWaitTime = "allocation-batch-wait-time" + apiServerBurstQPSFlag = "api-server-qps-burst" + apiServerSustainedQPSFlag = "api-server-qps" + enablePrometheusMetricsFlag = "prometheus-exporter" + enableStackdriverMetricsFlag = "stackdriver-exporter" + grpcPortFlag = "grpc-port" + httpUnallocatedStatusCode = "http-unallocated-status-code" + leaderElectionFlag = "leader-election" + leaseDurationFlag = "lease-duration" + logLevelFlag = "log-level" + podNamespace = "pod-namespace" + projectIDFlag = "gcp-project-id" + pullIntervalFlag = "pull-interval" + remoteAllocationTimeoutFlag = "remote-allocation-timeout" + renewDeadlineFlag = "renew-deadline" + retryPeriodFlag = "retry-period" + stackdriverLabels = "stackdriver-labels" + totalRemoteAllocationTimeoutFlag = "total-remote-allocation-timeout" ) var ( @@ -52,41 +75,108 @@ var ( ) type processorConfig struct { - LogLevel string - LeaderElection bool - PodNamespace string - LeaseDuration time.Duration - RenewDeadline time.Duration - RetryPeriod time.Duration + GCPProjectID string + LogLevel string + PodNamespace string + StackdriverLabels string + APIServerBurstQPS int + APIServerSustainedQPS int + GRPCPort int + HTTPUnallocatedStatusCode int + LeaderElection bool + PrometheusMetrics bool + Stackdriver bool + AllocationBatchWaitTime time.Duration + LeaseDuration time.Duration + PullInterval time.Duration + RenewDeadline time.Duration + RemoteAllocationTimeout time.Duration + RetryPeriod time.Duration + TotalRemoteAllocationTimeout time.Duration } func parseEnvFlags() processorConfig { - viper.SetDefault(logLevelFlag, "Info") + viper.SetDefault(allocationBatchWaitTime, 50*time.Millisecond) + viper.SetDefault(apiServerBurstQPSFlag, 500) + viper.SetDefault(apiServerSustainedQPSFlag, 400) + 
viper.SetDefault(enablePrometheusMetricsFlag, true) + viper.SetDefault(enableStackdriverMetricsFlag, false) + viper.SetDefault(grpcPortFlag, 9090) + viper.SetDefault(httpUnallocatedStatusCode, http.StatusTooManyRequests) viper.SetDefault(leaderElectionFlag, false) - viper.SetDefault(podNamespace, "") viper.SetDefault(leaseDurationFlag, 15*time.Second) + viper.SetDefault(logLevelFlag, "Info") + viper.SetDefault(podNamespace, "") + viper.SetDefault(projectIDFlag, "") + viper.SetDefault(pullIntervalFlag, 200*time.Millisecond) + viper.SetDefault(remoteAllocationTimeoutFlag, 10*time.Second) viper.SetDefault(renewDeadlineFlag, 10*time.Second) viper.SetDefault(retryPeriodFlag, 2*time.Second) + viper.SetDefault(stackdriverLabels, "") + viper.SetDefault(totalRemoteAllocationTimeoutFlag, 30*time.Second) - pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Log level") + pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime), "Flag to configure the waiting period between allocations batches") + pflag.Int32(apiServerBurstQPSFlag, viper.GetInt32(apiServerBurstQPSFlag), "Maximum burst queries per second to send to the API server") + pflag.Int32(apiServerSustainedQPSFlag, viper.GetInt32(apiServerSustainedQPSFlag), "Maximum sustained queries per second to send to the API server") + pflag.Bool(enablePrometheusMetricsFlag, viper.GetBool(enablePrometheusMetricsFlag), "Flag to activate metrics of Agones. Can also use PROMETHEUS_EXPORTER env variable.") + pflag.Bool(enableStackdriverMetricsFlag, viper.GetBool(enableStackdriverMetricsFlag), "Flag to activate stackdriver monitoring metrics for Agones. 
Can also use STACKDRIVER_EXPORTER env variable.") + pflag.Int32(grpcPortFlag, viper.GetInt32(grpcPortFlag), "Port to listen on for gRPC requests") + pflag.Int32(httpUnallocatedStatusCode, viper.GetInt32(httpUnallocatedStatusCode), "HTTP status code to return when no GameServer is available") pflag.Bool(leaderElectionFlag, viper.GetBool(leaderElectionFlag), "Enable leader election") - pflag.String(podNamespace, viper.GetString(podNamespace), "Pod namespace") pflag.Duration(leaseDurationFlag, viper.GetDuration(leaseDurationFlag), "Leader election lease duration") + pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Log level") + pflag.String(podNamespace, viper.GetString(podNamespace), "Pod namespace") + pflag.String(projectIDFlag, viper.GetString(projectIDFlag), "GCP ProjectID used for Stackdriver, if not specified ProjectID from Application Default Credentials would be used. Can also use GCP_PROJECT_ID env variable.") + pflag.Duration(pullIntervalFlag, viper.GetDuration(pullIntervalFlag), "Interval between pull requests sent to processor clients") + pflag.Duration(remoteAllocationTimeoutFlag, viper.GetDuration(remoteAllocationTimeoutFlag), "Flag to set remote allocation call timeout.") pflag.Duration(renewDeadlineFlag, viper.GetDuration(renewDeadlineFlag), "Leader election renew deadline") pflag.Duration(retryPeriodFlag, viper.GetDuration(retryPeriodFlag), "Leader election retry period") + pflag.String(stackdriverLabels, viper.GetString(stackdriverLabels), "A set of default labels to add to all stackdriver metrics generated. 
By default metadata are automatically added using Kubernetes API and GCP metadata enpoint.") + pflag.Duration(totalRemoteAllocationTimeoutFlag, viper.GetDuration(totalRemoteAllocationTimeoutFlag), "Flag to set total remote allocation timeout including retries.") pflag.Parse() viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) - viper.AutomaticEnv() - _ = viper.BindPFlags(pflag.CommandLine) + runtime.Must(viper.BindEnv(allocationBatchWaitTime)) + runtime.Must(viper.BindEnv(apiServerBurstQPSFlag)) + runtime.Must(viper.BindEnv(apiServerSustainedQPSFlag)) + runtime.Must(viper.BindEnv(enablePrometheusMetricsFlag)) + runtime.Must(viper.BindEnv(enableStackdriverMetricsFlag)) + runtime.Must(viper.BindEnv(grpcPortFlag)) + runtime.Must(viper.BindEnv(httpUnallocatedStatusCode)) + runtime.Must(viper.BindEnv(leaderElectionFlag)) + runtime.Must(viper.BindEnv(leaseDurationFlag)) + runtime.Must(viper.BindEnv(logLevelFlag)) + runtime.Must(viper.BindEnv(podNamespace)) + runtime.Must(viper.BindEnv(projectIDFlag)) + runtime.Must(viper.BindEnv(pullIntervalFlag)) + runtime.Must(viper.BindEnv(remoteAllocationTimeoutFlag)) + runtime.Must(viper.BindEnv(renewDeadlineFlag)) + runtime.Must(viper.BindEnv(retryPeriodFlag)) + runtime.Must(viper.BindEnv(stackdriverLabels)) + runtime.Must(viper.BindEnv(totalRemoteAllocationTimeoutFlag)) + runtime.Must(runtime.FeaturesBindEnv()) + + runtime.Must(runtime.ParseFeaturesFromEnv()) return processorConfig{ - LogLevel: viper.GetString(logLevelFlag), - LeaderElection: viper.GetBool(leaderElectionFlag), - PodNamespace: viper.GetString(podNamespace), - LeaseDuration: viper.GetDuration(leaseDurationFlag), - RenewDeadline: viper.GetDuration(renewDeadlineFlag), - RetryPeriod: viper.GetDuration(retryPeriodFlag), + AllocationBatchWaitTime: viper.GetDuration(allocationBatchWaitTime), + APIServerBurstQPS: int(viper.GetInt32(apiServerBurstQPSFlag)), + APIServerSustainedQPS: int(viper.GetInt32(apiServerSustainedQPSFlag)), + GCPProjectID: 
viper.GetString(projectIDFlag), + GRPCPort: int(viper.GetInt32(grpcPortFlag)), + HTTPUnallocatedStatusCode: int(viper.GetInt32(httpUnallocatedStatusCode)), + LeaderElection: viper.GetBool(leaderElectionFlag), + LeaseDuration: viper.GetDuration(leaseDurationFlag), + LogLevel: viper.GetString(logLevelFlag), + PodNamespace: viper.GetString(podNamespace), + PrometheusMetrics: viper.GetBool(enablePrometheusMetricsFlag), + PullInterval: viper.GetDuration(pullIntervalFlag), + RenewDeadline: viper.GetDuration(renewDeadlineFlag), + RemoteAllocationTimeout: viper.GetDuration(remoteAllocationTimeoutFlag), + RetryPeriod: viper.GetDuration(retryPeriodFlag), + Stackdriver: viper.GetBool(enableStackdriverMetricsFlag), + StackdriverLabels: viper.GetString(stackdriverLabels), + TotalRemoteAllocationTimeout: viper.GetDuration(totalRemoteAllocationTimeoutFlag), } } @@ -110,47 +200,46 @@ func main() { } healthserver := &httpserver.Server{Logger: logger} - health := healthcheck.NewHandler() + var health healthcheck.Handler - config, err := rest.InClusterConfig() - if err != nil { - logger.WithError(err).Fatal("Failed to create in-cluster config") - panic("Failed to create in-cluster config: " + err.Error()) + metricsConf := metrics.Config{ + Stackdriver: conf.Stackdriver, + PrometheusMetrics: conf.PrometheusMetrics, + GCPProjectID: conf.GCPProjectID, + StackdriverLabels: conf.StackdriverLabels, } - kubeClient, err := kubernetes.NewForConfig(config) + health, closer := metrics.SetupMetrics(metricsConf, healthserver) + defer closer() + + metrics.SetReportingPeriod(conf.PrometheusMetrics, conf.Stackdriver) + + kubeClient, agonesClient, err := getClients(conf) if err != nil { - logger.WithError(err).Fatal("Failed to create Kubernetes client") - panic("Failed to create Kubernetes client: " + err.Error()) + logger.WithError(err).Fatal("Could not create clients") } + grpcUnallocatedStatusCode := grpcCodeFromHTTPStatus(conf.HTTPUnallocatedStatusCode) + processorService := 
newServiceHandler(ctx, kubeClient, agonesClient, health, conf, grpcUnallocatedStatusCode) + + grpcHealth := grpchealth.NewServer() + grpcHealth.SetServingStatus("processor", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + runGRPC(ctx, processorService, grpcHealth, conf.GRPCPort) + go func() { healthserver.Handle("/", health) - _ = healthserver.Run(context.Background(), 0) + _ = healthserver.Run(ctx, 0) }() signals.NewSigTermHandler(func() { logger.Info("Pod shutdown has been requested, failing readiness check") + grpcHealth.Shutdown() cancelCtx() - os.Exit(0) }) - whenLeader(ctx, cancelCtx, logger, conf, kubeClient, func(ctx context.Context) { + whenLeader(ctx, cancelCtx, logger, conf, kubeClient, func(_ context.Context) { logger.Info("Starting processor work as leader") - - // Simulate processor work (to ensure the leader is working) - // TODO: implement processor work - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - logger.Info("Processor work completed") - return - case <-ticker.C: - logger.Info("Processor is active as leader") - } - } + grpcHealth.SetServingStatus("processor", grpc_health_v1.HealthCheckResponse_SERVING) + processorService.StartPullRequestTicker() }) logger.Info("Processor exited gracefully.") @@ -158,13 +247,16 @@ func main() { func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.Entry, conf processorConfig, kubeClient *kubernetes.Clientset, start func(_ context.Context)) { + logger.WithField("leaderElectionEnabled", conf.LeaderElection).Info("Leader election configuration") + if !conf.LeaderElection { start(ctx) + <-ctx.Done() + return } id := uuid.New().String() - lock := &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{ Name: "agones-processor-lock", @@ -206,3 +298,92 @@ func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.E }, }) } + +func runGRPC(ctx context.Context, h *processorHandler, grpcHealth *grpchealth.Server, 
grpcPort int) { + logger.WithField("port", grpcPort).Info("Running the grpc handler on port") + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort)) + if err != nil { + logger.WithError(err).Fatalf("Failed to listen on TCP port %d", grpcPort) + os.Exit(1) + } + + grpcServer := grpc.NewServer(h.getGRPCServerOptions()...) + allocationpb.RegisterProcessorServer(grpcServer, h) + grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth) + + go func() { + go func() { + <-ctx.Done() + grpcServer.GracefulStop() + }() + + err := grpcServer.Serve(listener) + if err != nil { + logger.WithError(err).Fatal("Allocation service crashed") + os.Exit(1) + } + logger.Info("Allocation server closed") + os.Exit(0) + + }() +} + +// Set up our client which we will use to call the API +func getClients(ctlConfig processorConfig) (*kubernetes.Clientset, *versioned.Clientset, error) { + // Create the in-cluster config + config, err := rest.InClusterConfig() + if err != nil { + return nil, nil, errors.New("Could not create in cluster config") + } + + config.QPS = float32(ctlConfig.APIServerSustainedQPS) + config.Burst = ctlConfig.APIServerBurstQPS + + // Access to the Agones resources through the Agones Clientset + kubeClient, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, nil, errors.New("Could not create the kubernetes api clientset") + } + + // Access to the Agones resources through the Agones Clientset + agonesClient, err := versioned.NewForConfig(config) + if err != nil { + return nil, nil, errors.New("Could not create the agones api clientset") + } + return kubeClient, agonesClient, nil +} + +// grpcCodeFromHTTPStatus converts an HTTP status code to the corresponding gRPC status code. 
+func grpcCodeFromHTTPStatus(httpUnallocatedStatusCode int) codes.Code { + switch httpUnallocatedStatusCode { + case http.StatusOK: + return codes.OK + case 499: + return codes.Canceled + case http.StatusInternalServerError: + return codes.Internal + case http.StatusBadRequest: + return codes.InvalidArgument + case http.StatusGatewayTimeout: + return codes.DeadlineExceeded + case http.StatusNotFound: + return codes.NotFound + case http.StatusConflict: + return codes.AlreadyExists + case http.StatusForbidden: + return codes.PermissionDenied + case http.StatusUnauthorized: + return codes.Unauthenticated + case http.StatusTooManyRequests: + return codes.ResourceExhausted + case http.StatusNotImplemented: + return codes.Unimplemented + case http.StatusUnprocessableEntity: + return codes.InvalidArgument + case http.StatusServiceUnavailable: + return codes.Unavailable + default: + logger.WithField("httpStatusCode", httpUnallocatedStatusCode).Warnf("Received unknown http status code, defaulting to codes.ResourceExhausted / 429") + return codes.ResourceExhausted + } +} diff --git a/install/helm/agones/templates/processor.yaml b/install/helm/agones/templates/processor.yaml index 5efe43284a..96673205d5 100644 --- a/install/helm/agones/templates/processor.yaml +++ b/install/helm/agones/templates/processor.yaml @@ -113,13 +113,39 @@ spec: {{- if gt (int .Values.agones.allocator.processor.replicas) 1 }} - name: LEADER_ELECTION value: "true" + {{- end }} - name: LEASE_DURATION value: {{ .Values.agones.allocator.processor.leaderElection.leaseDuration | default "15s" | quote }} - name: RENEW_DEADLINE value: {{ .Values.agones.allocator.processor.leaderElection.renewDeadline | default "10s" | quote }} - name: RETRY_PERIOD value: {{ .Values.agones.allocator.processor.leaderElection.retryPeriod | default "2s" | quote }} - {{- end }} + - name: PULL_INTERVAL + value: {{ .Values.agones.allocator.processor.pullInterval | default "200ms" | quote }} + - name: GRPC_PORT + value: {{ 
.Values.agones.allocator.processor.grpc.port | quote }} + - name: HTTP_UNALLOCATED_STATUS_CODE + value: {{ .Values.agones.allocator.service.http.unallocatedStatusCode | quote }} + - name: API_SERVER_QPS + value: {{ .Values.agones.allocator.apiServerQPS | quote }} + - name: API_SERVER_QPS_BURST + value: {{ .Values.agones.allocator.apiServerQPSBurst | quote }} + - name: FEATURE_GATES + value: {{ .Values.agones.featureGates | quote }} + - name: PROMETHEUS_EXPORTER + value: {{ .Values.agones.metrics.prometheusEnabled | quote }} + - name: STACKDRIVER_EXPORTER + value: {{ .Values.agones.metrics.stackdriverEnabled | quote }} + - name: GCP_PROJECT_ID + value: {{ .Values.agones.metrics.stackdriverProjectID | quote }} + - name: STACKDRIVER_LABELS + value: {{ .Values.agones.metrics.stackdriverLabels | quote }} + - name: REMOTE_ALLOCATION_TIMEOUT + value: {{ .Values.agones.allocator.remoteAllocationTimeout | quote }} + - name: TOTAL_REMOTE_ALLOCATION_TIMEOUT + value: {{ .Values.agones.allocator.totalRemoteAllocationTimeout | quote }} + - name: ALLOCATION_BATCH_WAIT_TIME + value: {{ .Values.agones.allocator.processor.allocationBatchWaitTime | quote }} livenessProbe: httpGet: path: /live @@ -138,16 +164,18 @@ spec: ports: - name: http containerPort: {{ .Values.agones.allocator.processor.http.port }} + - name: grpc + containerPort: {{ .Values.agones.allocator.processor.grpc.port }} resources: {{- if .Values.agones.allocator.processor.resources }} {{ toYaml .Values.agones.allocator.processor.resources | indent 10 }} {{- else}} requests: - cpu: 10m - memory: 32Mi + cpu: 300m + memory: 256Mi limits: cpu: 500m - memory: 256Mi + memory: 512Mi {{- end }} {{- if .Values.agones.image.processor.pullSecret }} imagePullSecrets: @@ -175,4 +203,7 @@ spec: - port: {{ .Values.agones.allocator.processor.http.port }} name: http targetPort: {{ .Values.agones.allocator.processor.http.port }} + - port: {{ .Values.agones.allocator.processor.grpc.port }} + name: grpc + targetPort: {{ 
.Values.agones.allocator.processor.grpc.port }} {{- end }} \ No newline at end of file diff --git a/install/helm/agones/values.yaml b/install/helm/agones/values.yaml index 8639af359f..0b1dc4575e 100644 --- a/install/helm/agones/values.yaml +++ b/install/helm/agones/values.yaml @@ -292,6 +292,8 @@ agones: processor: replicas: 2 maxBatchSize: 100 + pullInterval: 200ms + allocationBatchWaitTime: 50ms resources: {} nodeSelector: {} annotations: {} diff --git a/pkg/gameserverallocations/controller.go b/pkg/gameserverallocations/controller.go index 24d5848004..83c662e4a5 100644 --- a/pkg/gameserverallocations/controller.go +++ b/pkg/gameserverallocations/controller.go @@ -25,6 +25,7 @@ import ( "github.com/heptiolabs/healthcheck" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,6 +38,7 @@ import ( "k8s.io/client-go/tools/record" "agones.dev/agones/pkg/allocation/converters" + pb "agones.dev/agones/pkg/allocation/go" allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" "agones.dev/agones/pkg/client/clientset/versioned" "agones.dev/agones/pkg/client/informers/externalversions" @@ -163,33 +165,15 @@ func (c *Extensions) processAllocationRequest(ctx context.Context, w http.Respon } if runtime.FeatureEnabled(runtime.FeatureProcessorAllocator) { - var result k8sruntime.Object - var code int - req := converters.ConvertGSAToAllocationRequest(gsa) resp, err := c.processorClient.Allocate(ctx, req) if err != nil { - if st, ok := status.FromError(err); ok { - code = gwruntime.HTTPStatusFromCode(st.Code()) - } else { - code = http.StatusInternalServerError - } - - result = &metav1.Status{ - TypeMeta: metav1.TypeMeta{ - Kind: "Status", - APIVersion: "v1", - }, - Status: metav1.StatusFailure, - Message: err.Error(), - Code: int32(code), - } - } else { - result = converters.ConvertAllocationResponseToGSA(resp, resp.Source) - code = 
http.StatusCreated + result, code := c.convertProcessorError(err, gsa) + return c.serialisation(r, w, result, code, scheme.Codecs) } - return c.serialisation(r, w, result, code, scheme.Codecs) + result := c.convertProcessorResponse(resp, gsa) + return c.serialisation(r, w, result, http.StatusCreated, scheme.Codecs) } result, err := c.allocator.Allocate(ctx, gsa) @@ -266,3 +250,42 @@ func (c *Extensions) serialisation(r *http.Request, w http.ResponseWriter, obj k err = info.Serializer.Encode(obj, w) return errors.Wrapf(err, "error encoding %T", obj) } +
+// convertProcessorError maps an error returned by the processor client onto the
+// object and HTTP status code that are sent back to the caller.
+func (c *Extensions) convertProcessorError(err error, gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, int) {
+	// failure builds the metav1.Status body shared by every non-allocation outcome.
+	failure := func(httpCode int) (k8sruntime.Object, int) {
+		return &metav1.Status{
+			TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"},
+			Status:   metav1.StatusFailure,
+			Message:  err.Error(),
+			Code:     int32(httpCode),
+		}, httpCode
+	}
+
+	st, ok := status.FromError(err)
+	if !ok {
+		return failure(http.StatusInternalServerError)
+	}
+
+	switch st.Code() {
+	case codes.ResourceExhausted:
+		gsa.Status.State = allocationv1.GameServerAllocationUnAllocated
+	case codes.Aborted:
+		gsa.Status.State = allocationv1.GameServerAllocationContention
+	default:
+		return failure(gwruntime.HTTPStatusFromCode(st.Code()))
+	}
+	return gsa, http.StatusCreated
+}
+
+// convertProcessorResponse turns a successful processor response into the GameServerAllocation object to serialise
+func (c *Extensions) convertProcessorResponse(resp *pb.AllocationResponse, originalGSA *allocationv1.GameServerAllocation) k8sruntime.Object {
+	allocated := converters.ConvertAllocationResponseToGSA(resp, resp.Source)
+	// Spec and namespace are copied from originalGSA; the name is the allocated GameServer's.
+	allocated.ObjectMeta.Name = resp.GameServerName
+	allocated.ObjectMeta.Namespace = originalGSA.ObjectMeta.Namespace
+	allocated.Spec = originalGSA.Spec
+	return allocated
+}