diff --git a/gateway/gateway-controller/cmd/controller/main.go b/gateway/gateway-controller/cmd/controller/main.go index 84f9ca684..5ebca1a72 100644 --- a/gateway/gateway-controller/cmd/controller/main.go +++ b/gateway/gateway-controller/cmd/controller/main.go @@ -272,7 +272,7 @@ func main() { validator.SetPolicyValidator(policyValidator) // Initialize and start control plane client with dependencies for API creation - cpClient := controlplane.NewClient(cfg.GatewayController.ControlPlane, log, configStore, db, snapshotManager, validator, &cfg.GatewayController.Router) + cpClient := controlplane.NewClient(cfg.GatewayController.ControlPlane, log, configStore, db, snapshotManager, policyManager, validator, &cfg.GatewayController.Router) if err := cpClient.Start(); err != nil { log.Error("Failed to start control plane client", zap.Error(err)) // Don't fail startup - gateway can run in degraded mode without control plane diff --git a/gateway/gateway-controller/pkg/api/handlers/handlers.go b/gateway/gateway-controller/pkg/api/handlers/handlers.go index 0634bd4ab..6828201c3 100644 --- a/gateway/gateway-controller/pkg/api/handlers/handlers.go +++ b/gateway/gateway-controller/pkg/api/handlers/handlers.go @@ -22,9 +22,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/wso2/api-platform/common/constants" - "github.com/wso2/api-platform/gateway/gateway-controller/pkg/apikeyxds" - "io" "net/http" "sort" @@ -34,9 +31,11 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/wso2/api-platform/common/constants" commonmodels "github.com/wso2/api-platform/common/models" api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/generated" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/middleware" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/apikeyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/controlplane" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" @@ -44,7 +43,6 @@ import ( "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/xds" - policyenginev1 "github.com/wso2/api-platform/sdk/gateway/policyengine/v1" "go.uber.org/zap" ) @@ -83,7 +81,7 @@ func NewAPIServer( routerConfig *config.RouterConfig, apiKeyXDSManager *apikeyxds.APIKeyStateManager, ) *APIServer { - deploymentService := utils.NewAPIDeploymentService(store, db, snapshotManager, validator, routerConfig) + deploymentService := utils.NewAPIDeploymentService(store, db, snapshotManager, policyManager, validator, routerConfig) server := &APIServer{ store: store, db: db, @@ -225,30 +223,7 @@ func (s *APIServer) CreateAPI(c *gin.Context) { Id: stringPtr(result.StoredConfig.GetHandle()), CreatedAt: timePtr(result.StoredConfig.CreatedAt), }) - - // Build and add policy config derived from API configuration if policies are present - if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(result.StoredConfig) - if storedPolicy != nil { - if err := s.policyManager.AddPolicy(storedPolicy); err != nil { - log.Error("Failed to add derived policy configuration", zap.Error(err)) - } else { - log.Info("Derived policy configuration added", - zap.String("policy_id", storedPolicy.ID), - zap.Int("route_count", len(storedPolicy.Configuration.Routes))) - } - } else if result.IsUpdate { - // API was updated and no longer has policies, remove the existing policy configuration - policyID := result.StoredConfig.ID + "-policies" - if err := s.policyManager.RemovePolicy(policyID); err != nil { - // Log at debug level since policy may not exist if API never had policies - log.Debug("No policy configuration to remove", zap.String("policy_id", policyID)) - } else { - log.Info("Derived policy configuration removed (API no longer has policies)", - zap.String("policy_id", policyID)) - } - } - } + // Policy management is now handled by the deployment service } // ListAPIs implements ServerInterface.ListAPIs @@ -487,7 +462,6 @@ func (s *APIServer) GetAPIById(c *gin.Context, id string) { func (s *APIServer) UpdateAPI(c *gin.Context, id string) { // Get correlation-aware logger from context log := middleware.GetLogger(c, s.logger) - handle := id // Read request body body, err := io.ReadAll(c.Request.Body) @@ -500,249 +474,82 @@ func (s *APIServer) UpdateAPI(c *gin.Context, id string) { return } - // Parse configuration - contentType := c.GetHeader("Content-Type") - var apiConfig api.APIConfiguration - err = s.parser.Parse(body, contentType, &apiConfig) - if err != nil { - log.Error("Failed to parse configuration", zap.Error(err)) - c.JSON(http.StatusBadRequest, api.ErrorResponse{ - Status: "error", - Message: "Failed to parse configuration", - }) - return - } + // Get correlation ID from context + correlationID := middleware.GetCorrelationID(c) - // Validate that the handle in the YAML matches the path parameter - if apiConfig.Metadata.Name != "" { - if apiConfig.Metadata.Name != handle { - log.Warn("Handle mismatch between path and YAML metadata", - zap.String("path_handle", handle), - zap.String("yaml_handle", apiConfig.Metadata.Name)) + // Update API configuration using the utility service + result, err := s.deploymentService.UpdateAPIConfiguration(utils.APIUpdateParams{ + Handle: id, + Data: body, + ContentType: c.GetHeader("Content-Type"), + CorrelationID: correlationID, + Logger: log, + }) + + if err != nil { + // Map error types to HTTP status codes + switch e := err.(type) { + case *utils.NotFoundError: + c.JSON(http.StatusNotFound, api.ErrorResponse{ + Status: "error", + Message: e.Error(), + }) + case *utils.HandleMismatchError: c.JSON(http.StatusBadRequest, api.ErrorResponse{ Status: "error", - Message: fmt.Sprintf("Handle mismatch: path has '%s' but YAML metadata.name has '%s'", handle, apiConfig.Metadata.Name), + Message: e.Error(), }) - return - } - } - - // Validate configuration - validationErrors := s.validator.Validate(&apiConfig) - if len(validationErrors) > 0 { - log.Warn("Configuration validation failed", - zap.String("handle", handle), - zap.Int("num_errors", len(validationErrors))) - - errors := make([]api.ValidationError, len(validationErrors)) - for i, e := range validationErrors { - errors[i] = api.ValidationError{ - Field: stringPtr(e.Field), - Message: stringPtr(e.Message), - } - } - - c.JSON(http.StatusBadRequest, api.ErrorResponse{ - Status: "error", - Message: "Configuration validation failed", - Errors: &errors, - }) - return - } - - if s.db == nil { - c.JSON(http.StatusServiceUnavailable, api.ErrorResponse{ - Status: "error", - Message: "Database storage not available", - }) - return - } - - // Check if config exists - existing, err := s.db.GetConfigByHandle(handle) - if err != nil { - log.Warn("API configuration not found", - zap.String("handle", handle)) - c.JSON(http.StatusNotFound, api.ErrorResponse{ - Status: "error", - Message: fmt.Sprintf("API configuration with handle '%s' not found", handle), - }) - return - } - - // Update stored configuration - now := time.Now() - existing.Configuration = apiConfig - existing.Status = models.StatusPending - existing.UpdatedAt = now - existing.DeployedAt = nil - existing.DeployedVersion = 0 - - if apiConfig.Kind == api.Asyncwebsub { - topicsToRegister, topicsToUnregister := s.deploymentService.GetTopicsForUpdate(*existing) - // TODO: Pre configure the dynamic forward proxy rules for event gw - // This was communication bridge will be created on the gw startup - // Can perform internal communication with websub hub without relying on the dynamic rules - // Execute topic operations with wait group and errors tracking - var wg2 sync.WaitGroup - var regErrs int32 - var deregErrs int32 - - if len(topicsToRegister) > 0 { - wg2.Add(1) - go func(list []string) { - defer wg2.Done() - log.Info("Starting topic registration", zap.Int("total_topics", len(list)), zap.String("api_id", existing.ID)) - //fmt.Println("Topics Registering Started") - var childWg sync.WaitGroup - for _, topic := range list { - childWg.Add(1) - go func(topic string) { - defer childWg.Done() - if err := s.deploymentService.RegisterTopicWithHub(s.httpClient, topic, "localhost", 8083, log); err != nil { - log.Error("Failed to register topic with WebSubHub", - zap.Error(err), - zap.String("topic", topic), - zap.String("api_id", existing.ID)) - atomic.AddInt32(®Errs, 1) - } else { - log.Info("Successfully registered topic with WebSubHub", - zap.String("topic", topic), - zap.String("api_id", existing.ID)) - } - }(topic) - } - childWg.Wait() - }(topicsToRegister) - } - - if len(topicsToUnregister) > 0 { - wg2.Add(1) - go func(list []string) { - defer wg2.Done() - log.Info("Starting topic deregistration", zap.Int("total_topics", len(list)), zap.String("api_id", existing.ID)) - var childWg sync.WaitGroup - for _, topic := range list { - childWg.Add(1) - go func(topic string) { - defer childWg.Done() - if err := s.deploymentService.UnregisterTopicWithHub(s.httpClient, topic, "localhost", 8083, log); err != nil { - log.Error("Failed to deregister topic from WebSubHub", - zap.Error(err), - zap.String("topic", topic), - zap.String("api_id", existing.ID)) - atomic.AddInt32(&deregErrs, 1) - } else { - log.Info("Successfully deregistered topic from WebSubHub", - zap.String("topic", topic), - zap.String("api_id", existing.ID)) - } - }(topic) + case *utils.APIValidationError: + errors := make([]api.ValidationError, len(e.Errors)) + for i, ve := range e.Errors { + errors[i] = api.ValidationError{ + Field: stringPtr(ve.Field), + Message: stringPtr(ve.Message), } - childWg.Wait() - }(topicsToUnregister) - } - wg2.Wait() - - log.Info("Topic lifecycle operations completed", - zap.String("api_id", existing.ID), - zap.Int("registered", len(topicsToRegister)), - zap.Int("deregistered", len(topicsToUnregister)), - zap.Int("register_errors", int(regErrs)), - zap.Int("deregister_errors", int(deregErrs))) - - // Check if topic operations failed and return error - if regErrs > 0 || deregErrs > 0 { - log.Error("Failed to register & deregister topics", zap.Error(err)) - c.JSON(http.StatusInternalServerError, api.ErrorResponse{ + } + c.JSON(http.StatusBadRequest, api.ErrorResponse{ Status: "error", - Message: "Topic lifecycle operations failed", + Message: "Configuration validation failed", + Errors: &errors, }) - return - } - } - - // Atomic dual-write: database + in-memory - // Update database first (only if persistent mode) - if s.db != nil { - if err := s.db.UpdateConfig(existing); err != nil { - log.Error("Failed to update config in database", zap.Error(err)) + case *utils.ParseError: + c.JSON(http.StatusBadRequest, api.ErrorResponse{ + Status: "error", + Message: e.Error(), + }) + case *utils.TopicOperationError: c.JSON(http.StatusInternalServerError, api.ErrorResponse{ Status: "error", - Message: "Failed to persist configuration update", + Message: e.Error(), }) - return - } - } - - if err := s.store.Update(existing); err != nil { - // Log conflict errors at info level, other errors at error level - if storage.IsConflictError(err) { - log.Info("API configuration handle already exists", - zap.String("id", existing.ID), - zap.String("handle", handle)) + case *utils.ConflictError: c.JSON(http.StatusConflict, api.ErrorResponse{ Status: "error", - Message: err.Error(), + Message: e.Error(), }) - } else { - log.Error("Failed to update config in memory store", zap.Error(err)) + case *utils.DatabaseUnavailableError: + c.JSON(http.StatusServiceUnavailable, api.ErrorResponse{ + Status: "error", + Message: e.Error(), + }) + default: + log.Error("Failed to update API configuration", zap.Error(err)) c.JSON(http.StatusInternalServerError, api.ErrorResponse{ Status: "error", - Message: "Failed to update configuration in memory store", + Message: err.Error(), }) } return } - // Get correlation ID from context - correlationID := middleware.GetCorrelationID(c) - - // Update xDS snapshot asynchronously - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - if err := s.snapshotManager.UpdateSnapshot(ctx, correlationID); err != nil { - log.Error("Failed to update xDS snapshot", zap.Error(err)) - } - }() - - log.Info("API configuration updated", - zap.String("id", existing.ID), - zap.String("handle", handle)) - // Return success response (id is the handle) c.JSON(http.StatusOK, api.APIUpdateResponse{ Status: stringPtr("success"), Message: stringPtr("API configuration updated successfully"), - Id: stringPtr(existing.GetHandle()), - UpdatedAt: timePtr(existing.UpdatedAt), + Id: stringPtr(result.StoredConfig.GetHandle()), + UpdatedAt: timePtr(result.StoredConfig.UpdatedAt), }) - - // Rebuild and update derived policy configuration - if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(existing) - if storedPolicy != nil { - if err := s.policyManager.AddPolicy(storedPolicy); err != nil { - log.Error("Failed to update derived policy configuration", zap.Error(err)) - } else { - log.Info("Derived policy configuration updated", - zap.String("policy_id", storedPolicy.ID), - zap.Int("route_count", len(storedPolicy.Configuration.Routes))) - } - } else { - // API no longer has policies, remove the existing policy configuration - policyID := existing.ID + "-policies" - if err := s.policyManager.RemovePolicy(policyID); err != nil { - // Log at debug level since policy may not exist if API never had policies - log.Debug("No policy configuration to remove", zap.String("policy_id", policyID)) - } else { - log.Info("Derived policy configuration removed (API no longer has policies)", - zap.String("policy_id", policyID)) - } - } - } + // Policy management is now handled by the deployment service } // DeleteAPI implements ServerInterface.DeleteAPI @@ -1175,7 +982,7 @@ func (s *APIServer) CreateLLMProvider(c *gin.Context) { // Build and add policy config derived from API configuration if policies are present if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(stored) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(stored) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to add derived policy configuration", zap.Error(err)) @@ -1265,7 +1072,7 @@ func (s *APIServer) UpdateLLMProvider(c *gin.Context, id string) { // Rebuild and update derived policy configuration if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(updated) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(updated) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to update derived policy configuration", zap.Error(err)) @@ -1412,7 +1219,7 @@ func (s *APIServer) CreateLLMProxy(c *gin.Context) { // Build and add policy config derived from API configuration if policies are present if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(stored) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(stored) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to add derived policy configuration", zap.Error(err)) @@ -1502,7 +1309,7 @@ func (s *APIServer) UpdateLLMProxy(c *gin.Context, id string) { // Rebuild and update derived policy configuration if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(updated) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(updated) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to update derived policy configuration", zap.Error(err)) @@ -1593,182 +1400,6 @@ func (s *APIServer) ListPolicies(c *gin.Context) { c.JSON(http.StatusOK, resp) } -// buildStoredPolicyFromAPI constructs a StoredPolicyConfig from an API config -// Merging rules: When operation has policies, they define the order (can reorder, override, or extend API policies). -// Remaining API-level policies not mentioned in operation policies are appended at the end. -// When operation has no policies, API-level policies are used in their declared order. -// RouteKey uses the fully qualified route path (context + operation path) and must match the route name format -// used by the xDS translator for consistency. -func (s *APIServer) buildStoredPolicyFromAPI(cfg *models.StoredConfig) *models.StoredPolicyConfig { - // TODO: (renuka) duplicate buildStoredPolicyFromAPI funcs. Refactor this. - apiCfg := &cfg.Configuration - - // Collect API-level policies - apiPolicies := make(map[string]policyenginev1.PolicyInstance) // name -> policy - if cfg.GetPolicies() != nil { - for _, p := range *cfg.GetPolicies() { - apiPolicies[p.Name] = convertAPIPolicy(p) - } - } - - routes := make([]policyenginev1.PolicyChain, 0) - switch apiCfg.Kind { - case api.Asyncwebsub: - // Build routes with merged policies - apiData, err := apiCfg.Spec.AsWebhookAPIData() - if err != nil { - // Handle error appropriately (e.g., log or return) - return nil - } - for _, ch := range apiData.Channels { - var finalPolicies []policyenginev1.PolicyInstance - - if ch.Policies != nil && len(*ch.Policies) > 0 { - // Operation has policies: use operation policy order as authoritative - // This allows operations to reorder, override, or extend API-level policies - finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*ch.Policies)) - addedNames := make(map[string]struct{}) - - for _, opPolicy := range *ch.Policies { - finalPolicies = append(finalPolicies, convertAPIPolicy(opPolicy)) - addedNames[opPolicy.Name] = struct{}{} - } - - // Add any API-level policies not mentioned in operation policies (append at end) - if apiData.Policies != nil { - for _, apiPolicy := range *apiData.Policies { - if _, exists := addedNames[apiPolicy.Name]; !exists { - finalPolicies = append(finalPolicies, apiPolicies[apiPolicy.Name]) - } - } - } - } else { - // No operation policies: use API-level policies in their declared order - if apiData.Policies != nil { - finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*apiData.Policies)) - for _, p := range *apiData.Policies { - finalPolicies = append(finalPolicies, apiPolicies[p.Name]) - } - } - } - - routeKey := xds.GenerateRouteName("POST", apiData.Context, apiData.Version, ch.Path, s.routerConfig.GatewayHost) - routes = append(routes, policyenginev1.PolicyChain{ - RouteKey: routeKey, - Policies: finalPolicies, - }) - } - case api.RestApi: - // Build routes with merged policies - apiData, err := apiCfg.Spec.AsAPIConfigData() - if err != nil { - // Handle error appropriately (e.g., log or return) - return nil - } - for _, op := range apiData.Operations { - var finalPolicies []policyenginev1.PolicyInstance - - if op.Policies != nil && len(*op.Policies) > 0 { - // Operation has policies: use operation policy order as authoritative - // This allows operations to reorder, override, or extend API-level policies - finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*op.Policies)) - addedNames := make(map[string]struct{}) - - for _, opPolicy := range *op.Policies { - finalPolicies = append(finalPolicies, convertAPIPolicy(opPolicy)) - addedNames[opPolicy.Name] = struct{}{} - } - - // Add any API-level policies not mentioned in operation policies (append at end) - if apiData.Policies != nil { - for _, apiPolicy := range *apiData.Policies { - if _, exists := addedNames[apiPolicy.Name]; !exists { - finalPolicies = append(finalPolicies, apiPolicies[apiPolicy.Name]) - } - } - } - } else { - // No operation policies: use API-level policies in their declared order - if apiData.Policies != nil { - finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*apiData.Policies)) - for _, p := range *apiData.Policies { - finalPolicies = append(finalPolicies, apiPolicies[p.Name]) - } - } - } - - // Determine effective vhosts (fallback to global router defaults when not provided) - effectiveMainVHost := s.routerConfig.VHosts.Main.Default - effectiveSandboxVHost := s.routerConfig.VHosts.Sandbox.Default - if apiData.Vhosts != nil { - if strings.TrimSpace(apiData.Vhosts.Main) != "" { - effectiveMainVHost = apiData.Vhosts.Main - } - if apiData.Vhosts.Sandbox != nil && strings.TrimSpace(*apiData.Vhosts.Sandbox) != "" { - effectiveSandboxVHost = *apiData.Vhosts.Sandbox - } - } - - vhosts := []string{effectiveMainVHost} - if apiData.Upstream.Sandbox != nil && apiData.Upstream.Sandbox.Url != nil && - strings.TrimSpace(*apiData.Upstream.Sandbox.Url) != "" { - vhosts = append(vhosts, effectiveSandboxVHost) - } - - for _, vhost := range vhosts { - routes = append(routes, policyenginev1.PolicyChain{ - RouteKey: xds.GenerateRouteName(string(op.Method), apiData.Context, apiData.Version, op.Path, vhost), - Policies: finalPolicies, - }) - } - } - } - - // If there are no policies at all, return nil (skip creation) - policyCount := 0 - for _, r := range routes { - policyCount += len(r.Policies) - } - if policyCount == 0 { - return nil - } - - now := time.Now().Unix() - stored := &models.StoredPolicyConfig{ - ID: cfg.ID + "-policies", - Configuration: policyenginev1.Configuration{ - Routes: routes, - Metadata: policyenginev1.Metadata{ - CreatedAt: now, - UpdatedAt: now, - ResourceVersion: 0, - APIName: cfg.GetDisplayName(), - Version: cfg.GetVersion(), - Context: cfg.GetContext(), - }, - }, - Version: 0, - } - return stored -} - -// convertAPIPolicy converts generated api.Policy to policyenginev1.PolicyInstance -func convertAPIPolicy(p api.Policy) policyenginev1.PolicyInstance { - paramsMap := make(map[string]interface{}) - if p.Params != nil { - for k, v := range *p.Params { - paramsMap[k] = v - } - } - return policyenginev1.PolicyInstance{ - Name: p.Name, - Version: p.Version, - Enabled: true, // Default to enabled - ExecutionCondition: p.ExecutionCondition, - Parameters: paramsMap, - } -} - // CreateMCPProxy implements ServerInterface.CreateMCPProxy // (POST /mcp-proxies) func (s *APIServer) CreateMCPProxy(c *gin.Context) { @@ -1830,7 +1461,7 @@ func (s *APIServer) CreateMCPProxy(c *gin.Context) { // Build and add policy config derived from API configuration if policies are present if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(cfg) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(cfg) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to add derived policy configuration", zap.Error(err)) @@ -2007,7 +1638,7 @@ func (s *APIServer) UpdateMCPProxy(c *gin.Context, id string) { // Rebuild and update derived policy configuration if s.policyManager != nil { - storedPolicy := s.buildStoredPolicyFromAPI(updated) + storedPolicy := s.deploymentService.BuildStoredPolicyFromAPI(updated) if storedPolicy != nil { if err := s.policyManager.AddPolicy(storedPolicy); err != nil { log.Error("Failed to update derived policy configuration", zap.Error(err)) diff --git a/gateway/gateway-controller/pkg/api/handlers/policy_ordering_test.go b/gateway/gateway-controller/pkg/api/handlers/policy_ordering_test.go index 1009978a6..b0b7ca723 100644 --- a/gateway/gateway-controller/pkg/api/handlers/policy_ordering_test.go +++ b/gateway/gateway-controller/pkg/api/handlers/policy_ordering_test.go @@ -26,6 +26,8 @@ import ( api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/generated" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" ) // newTestAPIServer creates a minimal APIServer instance for testing @@ -34,11 +36,15 @@ func newTestAPIServer() *APIServer { Main: config.VHostEntry{Default: "localhost"}, Sandbox: config.VHostEntry{Default: "sandbox-*"}, } + routerConfig := &config.RouterConfig{ + GatewayHost: "localhost", + VHosts: *vhosts, + } + configStore := storage.NewConfigStore() + deploymentService := utils.NewAPIDeploymentService(configStore, nil, nil, nil, nil, routerConfig) return &APIServer{ - routerConfig: &config.RouterConfig{ - GatewayHost: "localhost", - VHosts: *vhosts, - }, + routerConfig: routerConfig, + deploymentService: deploymentService, } } @@ -194,7 +200,7 @@ func TestPolicyOrderingDeterministic(t *testing.T) { // Call the function server := newTestAPIServer() - result := server.buildStoredPolicyFromAPI(cfg) // Verify result is not nil when policies exist + result := server.deploymentService.BuildStoredPolicyFromAPI(cfg) // Verify result is not nil when policies exist if len(tt.expectedOrder) > 0 { require.NotNil(t, result, tt.description) require.Len(t, result.Configuration.Routes, 1, "Should have one route") @@ -308,7 +314,7 @@ func TestMultipleOperationsIndependentPolicies(t *testing.T) { } server := newTestAPIServer() - result := server.buildStoredPolicyFromAPI(cfg) + result := server.deploymentService.BuildStoredPolicyFromAPI(cfg) require.NotNil(t, result) require.Len(t, result.Configuration.Routes, 5, "Should have 5 routes") @@ -435,7 +441,7 @@ func TestPolicyOrderingConsistency(t *testing.T) { var firstOrder []string server := newTestAPIServer() for i := 0; i < 100; i++ { - result := server.buildStoredPolicyFromAPI(cfg) + result := server.deploymentService.BuildStoredPolicyFromAPI(cfg) require.NotNil(t, result) require.Len(t, result.Configuration.Routes, 1) diff --git a/gateway/gateway-controller/pkg/controlplane/client.go b/gateway/gateway-controller/pkg/controlplane/client.go index ad82be485..4b7822084 100644 --- a/gateway/gateway-controller/pkg/controlplane/client.go +++ b/gateway/gateway-controller/pkg/controlplane/client.go @@ -32,6 +32,7 @@ import ( "github.com/gorilla/websocket" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/utils" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/xds" @@ -113,6 +114,7 @@ func NewClient( store *storage.ConfigStore, db storage.Storage, snapshotManager *xds.SnapshotManager, + policyManager *policyxds.PolicyManager, validator config.Validator, routerConfig *config.RouterConfig, ) *Client { @@ -126,7 +128,7 @@ func NewClient( snapshotManager: snapshotManager, parser: config.NewParser(), validator: validator, - deploymentService: utils.NewAPIDeploymentService(store, db, snapshotManager, validator, routerConfig), + deploymentService: utils.NewAPIDeploymentService(store, db, snapshotManager, policyManager, validator, routerConfig), state: &ConnectionState{ Current: Disconnected, Conn: nil, diff --git a/gateway/gateway-controller/pkg/utils/api_deployment.go b/gateway/gateway-controller/pkg/utils/api_deployment.go index 46af53f27..5fd0743e3 100644 --- a/gateway/gateway-controller/pkg/utils/api_deployment.go +++ b/gateway/gateway-controller/pkg/utils/api_deployment.go @@ -30,8 +30,10 @@ import ( "github.com/google/uuid" api "github.com/wso2/api-platform/gateway/gateway-controller/pkg/api/generated" + policyenginev1 "github.com/wso2/api-platform/sdk/gateway/policyengine/v1" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/config" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/models" + "github.com/wso2/api-platform/gateway/gateway-controller/pkg/policyxds" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/storage" "github.com/wso2/api-platform/gateway/gateway-controller/pkg/xds" "go.uber.org/zap" @@ -52,11 +54,85 @@ type APIDeploymentResult struct { IsUpdate bool } +// APIUpdateParams contains parameters for API update operations +type APIUpdateParams struct { + Handle string // API handle from URL path + Data []byte // Raw configuration data (YAML/JSON) + ContentType string // Content type for parsing + CorrelationID string // Correlation ID for tracking + Logger *zap.Logger // Logger instance +} + +// Custom error types for HTTP status code mapping + +// NotFoundError indicates the requested resource was not found +type NotFoundError struct { + Handle string +} + +func (e *NotFoundError) Error() string { + return fmt.Sprintf("API configuration with handle '%s' not found", e.Handle) +} + +// HandleMismatchError indicates the handle in the URL doesn't match the YAML metadata +type HandleMismatchError struct { + PathHandle string + YamlHandle string +} + +func (e *HandleMismatchError) Error() string { + return fmt.Sprintf("Handle mismatch: path has '%s' but YAML metadata.name has '%s'", e.PathHandle, e.YamlHandle) +} + +// APIValidationError wraps validation errors for API configurations +type APIValidationError struct { + Errors []config.ValidationError +} + +func (e *APIValidationError) Error() string { + return fmt.Sprintf("configuration validation failed with %d errors", len(e.Errors)) +} + +// TopicOperationError indicates WebSub topic operations failed +type TopicOperationError struct { + Message string +} + +func (e *TopicOperationError) Error() string { + return e.Message +} + +// ConflictError indicates a resource conflict (e.g., handle already exists) +type ConflictError struct { + Message string +} + +func (e *ConflictError) Error() string { + return e.Message +} + +// ParseError indicates configuration parsing failed +type ParseError struct { + Message string +} + +func (e *ParseError) Error() string { + return e.Message +} + +// DatabaseUnavailableError indicates the database is not available +type DatabaseUnavailableError struct{} + +func (e *DatabaseUnavailableError) Error() string { + return "Database storage not available" +} + // APIDeploymentService provides utilities for API configuration deployment type APIDeploymentService struct { store *storage.ConfigStore db storage.Storage snapshotManager *xds.SnapshotManager + policyManager *policyxds.PolicyManager parser *config.Parser validator config.Validator routerConfig *config.RouterConfig @@ -68,6 +144,7 @@ func NewAPIDeploymentService( store *storage.ConfigStore, db storage.Storage, snapshotManager *xds.SnapshotManager, + policyManager *policyxds.PolicyManager, validator config.Validator, routerConfig *config.RouterConfig, ) *APIDeploymentService { @@ -75,6 +152,7 @@ func NewAPIDeploymentService( store: store, db: db, snapshotManager: snapshotManager, + policyManager: policyManager, parser: config.NewParser(), validator: validator, httpClient: &http.Client{Timeout: 10 * time.Second}, @@ -164,86 +242,10 @@ func (s *APIDeploymentService) DeployAPIConfiguration(params APIDeploymentParams DeployedVersion: 0, } + // Handle AsyncWebSub topic lifecycle management if apiConfig.Kind == api.Asyncwebsub { - topicsToRegister, topicsToUnregister := s.GetTopicsForUpdate(*storedCfg) - // TODO: Pre configure the dynamic forward proxy rules for event gw - // This was communication bridge will be created on the gw startup - // Can perform internal communication with websub hub without relying on the dynamic rules - // Execute topic operations with wait group and errors tracking - var wg2 sync.WaitGroup - var regErrs int32 - var deregErrs int32 - - if len(topicsToRegister) > 0 { - wg2.Add(1) - go func(list []string) { - defer wg2.Done() - params.Logger.Info("Starting topic registration", zap.Int("total_topics", len(list)), zap.String("api_id", apiID)) - var childWg sync.WaitGroup - for _, topic := range list { - childWg.Add(1) - go func(topic string) { - defer childWg.Done() - if err := s.RegisterTopicWithHub(s.httpClient, topic, "localhost", 8083, params.Logger); err != nil { - params.Logger.Error("Failed to register topic with WebSubHub", - zap.Error(err), - zap.String("topic", topic), - zap.String("api_id", apiID)) - atomic.AddInt32(®Errs, 1) - return - } else { - params.Logger.Info("Successfully registered topic with WebSubHub", - zap.String("topic", topic), - zap.String("api_id", apiID)) - } - }(topic) - } - childWg.Wait() - }(topicsToRegister) - } - - if len(topicsToUnregister) > 0 { - wg2.Add(1) - go func(list []string) { - defer wg2.Done() - var childWg sync.WaitGroup - params.Logger.Info("Starting topic deregistration", zap.Int("total_topics", len(list)), zap.String("api_id", apiID)) - for _, topic := range list { - childWg.Add(1) - go func(topic string) { - defer childWg.Done() - if err := s.UnregisterTopicWithHub(s.httpClient, topic, "localhost", 8083, params.Logger); err != nil { - params.Logger.Error("Failed to deregister topic from WebSubHub", - zap.Error(err), - zap.String("topic", topic), - zap.String("api_id", apiID)) - atomic.AddInt32(&deregErrs, 1) - return - } else { - params.Logger.Info("Successfully deregistered topic from WebSubHub", - zap.String("topic", topic), - zap.String("api_id", apiID)) - } - }(topic) - } - childWg.Wait() - }(topicsToUnregister) - } - - wg2.Wait() - params.Logger.Info("Topic lifecycle operations completed", - zap.String("api_id", apiID), - zap.Int("registered", len(topicsToRegister)), - zap.Int("deregistered", len(topicsToUnregister)), - zap.Int("register_errors", int(regErrs)), - zap.Int("deregister_errors", int(deregErrs))) - - // Check if topic operations failed and return error - if regErrs > 0 || deregErrs > 0 { - params.Logger.Error("Topic lifecycle operations failed", - zap.Int("register_errors", int(regErrs)), - zap.Int("deregister_errors", int(deregErrs))) - return nil, fmt.Errorf("failed to complete topic operations: %d registration error(s), %d deregistration error(s)", regErrs, deregErrs) + if err := s.handleTopicLifecycle(storedCfg, params.Logger); err != nil { + return nil, err } } @@ -269,17 +271,10 @@ func (s *APIDeploymentService) DeployAPIConfiguration(params APIDeploymentParams } // Update xDS snapshot asynchronously - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + s.triggerXDSSnapshotUpdate(apiID, params.CorrelationID, params.Logger) - if err := s.snapshotManager.UpdateSnapshot(ctx, params.CorrelationID); err != nil { - params.Logger.Error("Failed to update xDS snapshot", - zap.Error(err), - zap.String("api_id", apiID), - zap.String("correlation_id", params.CorrelationID)) - } - }() + // Update derived policy configuration + s.updatePolicyConfiguration(storedCfg, params.Logger) return &APIDeploymentResult{ StoredConfig: storedCfg, @@ -287,6 +282,188 @@ func (s *APIDeploymentService) DeployAPIConfiguration(params APIDeploymentParams }, nil } +// UpdateAPIConfiguration handles the complete API configuration update process +func (s *APIDeploymentService) UpdateAPIConfiguration(params APIUpdateParams) (*APIDeploymentResult, error) { + handle := params.Handle + + // Parse configuration + var apiConfig api.APIConfiguration + err := s.parser.Parse(params.Data, params.ContentType, &apiConfig) + if err != nil { + params.Logger.Error("Failed to parse configuration", zap.Error(err)) + return nil, &ParseError{Message: "Failed to parse configuration"} + } + + // Validate that the handle in the YAML matches the path parameter + if apiConfig.Metadata.Name != "" { + if apiConfig.Metadata.Name != handle { + params.Logger.Warn("Handle mismatch between path and YAML metadata", + zap.String("path_handle", handle), + zap.String("yaml_handle", apiConfig.Metadata.Name)) + return nil, &HandleMismatchError{ + PathHandle: handle, + YamlHandle: apiConfig.Metadata.Name, + } + } + } + + // Validate configuration + validationErrors := s.validator.Validate(&apiConfig) + if len(validationErrors) > 0 { + params.Logger.Warn("Configuration validation failed", + zap.String("handle", handle), + zap.Int("num_errors", len(validationErrors))) + return nil, &APIValidationError{Errors: validationErrors} + } + + if s.db == nil { + return nil, &DatabaseUnavailableError{} + } + + // Check if config exists + existing, err := s.db.GetConfigByHandle(handle) + if err != nil { + params.Logger.Warn("API configuration not found", zap.String("handle", handle)) + return nil, &NotFoundError{Handle: handle} + } + + // Update stored configuration + now := time.Now() + existing.Configuration = apiConfig + existing.Status = models.StatusPending + existing.UpdatedAt = now + existing.DeployedAt = nil + existing.DeployedVersion = 0 + + // Handle AsyncWebSub topic lifecycle management + if apiConfig.Kind == api.Asyncwebsub { + if err := s.handleTopicLifecycle(existing, params.Logger); err != nil { + return nil, err + } + } + + // Update database first (only if persistent mode) + if s.db != nil { + if err := s.db.UpdateConfig(existing); err != nil { + params.Logger.Error("Failed to update config in database", zap.Error(err)) + return nil, fmt.Errorf("failed to persist configuration update: %w", err) + } + } + + // Update in-memory store + if err := s.store.Update(existing); err != nil { + if storage.IsConflictError(err) { + params.Logger.Info("API configuration handle already exists", + zap.String("id", existing.ID), + zap.String("handle", handle)) + return nil, &ConflictError{Message: err.Error()} + } + params.Logger.Error("Failed to update config in memory store", zap.Error(err)) + return nil, fmt.Errorf("failed to update configuration in memory store: %w", err) + } + + params.Logger.Info("API configuration updated", + zap.String("id", existing.ID), + zap.String("handle", handle)) + + // Update xDS snapshot asynchronously + s.triggerXDSSnapshotUpdate(existing.ID, params.CorrelationID, params.Logger) + + // Update derived policy configuration + s.updatePolicyConfiguration(existing, params.Logger) + + return &APIDeploymentResult{ + StoredConfig: existing, + IsUpdate: true, + }, nil +} + +// handleTopicLifecycle manages WebSub topic registration/deregistration for AsyncWebSub APIs +func (s *APIDeploymentService) handleTopicLifecycle(storedCfg *models.StoredConfig, logger *zap.Logger) error { + topicsToRegister, topicsToUnregister := s.GetTopicsForUpdate(*storedCfg) + + var wg sync.WaitGroup + var regErrs int32 + var deregErrs int32 + + // Register new topics + if len(topicsToRegister) > 0 { + wg.Add(1) + go func(list []string) { + defer wg.Done() + logger.Info("Starting topic registration", + zap.Int("total_topics", len(list)), + zap.String("api_id", storedCfg.ID)) + var childWg sync.WaitGroup + for _, topic := range list { + childWg.Add(1) + go func(topic string) { + defer childWg.Done() + if err := s.RegisterTopicWithHub(s.httpClient, topic, "localhost", 8083, logger); err != nil { + logger.Error("Failed to register topic with WebSubHub", + zap.Error(err), + zap.String("topic", topic), + zap.String("api_id", storedCfg.ID)) + atomic.AddInt32(®Errs, 1) + } else { + logger.Info("Successfully registered topic with WebSubHub", + zap.String("topic", topic), + zap.String("api_id", storedCfg.ID)) + } + }(topic) + } + childWg.Wait() + }(topicsToRegister) + } + + // Deregister removed topics + if len(topicsToUnregister) > 0 { + wg.Add(1) + go func(list []string) { + defer wg.Done() + logger.Info("Starting topic deregistration", + zap.Int("total_topics", len(list)), + zap.String("api_id", storedCfg.ID)) + var childWg sync.WaitGroup + for _, topic := range list { + childWg.Add(1) + go func(topic string) { + defer childWg.Done() + if err := s.UnregisterTopicWithHub(s.httpClient, topic, "localhost", 8083, logger); err != nil { + logger.Error("Failed to deregister topic from WebSubHub", + zap.Error(err), + zap.String("topic", topic), + zap.String("api_id", storedCfg.ID)) + atomic.AddInt32(&deregErrs, 1) + } else { + logger.Info("Successfully deregistered topic from WebSubHub", + zap.String("topic", topic), + zap.String("api_id", storedCfg.ID)) + } + }(topic) + } + childWg.Wait() + }(topicsToUnregister) + } + + wg.Wait() + + logger.Info("Topic lifecycle operations completed", + zap.String("api_id", storedCfg.ID), + zap.Int("registered", len(topicsToRegister)), + zap.Int("deregistered", len(topicsToUnregister)), + zap.Int("register_errors", int(regErrs)), + zap.Int("deregister_errors", int(deregErrs))) + + if regErrs > 0 || deregErrs > 0 { + return &TopicOperationError{ + Message: fmt.Sprintf("Topic lifecycle operations failed: %d registration error(s), %d deregistration error(s)", regErrs, deregErrs), + } + } + + return nil +} + func (s *APIDeploymentService) GetTopicsForUpdate(apiConfig models.StoredConfig) ([]string, []string) { topics := s.store.TopicManager.GetAllByConfig(apiConfig.ID) topicsToRegister := []string{} @@ -496,3 +673,215 @@ func (s *APIDeploymentService) sendTopicRequestToHub(httpClient *http.Client, to func generateUUID() string { return uuid.New().String() } + +// BuildStoredPolicyFromAPI builds a StoredPolicyConfig from an API configuration. +// This builds policy chains for each route based on API-level and operation-level policies. +// RouteKey uses the fully qualified route path (context + operation path) and must match +// the route name format used by the xDS translator for consistency. +func (s *APIDeploymentService) BuildStoredPolicyFromAPI(cfg *models.StoredConfig) *models.StoredPolicyConfig { + apiCfg := &cfg.Configuration + + // Collect API-level policies + apiPolicies := make(map[string]policyenginev1.PolicyInstance) // name -> policy + if cfg.GetPolicies() != nil { + for _, p := range *cfg.GetPolicies() { + apiPolicies[p.Name] = convertAPIPolicy(p) + } + } + + routes := make([]policyenginev1.PolicyChain, 0) + switch apiCfg.Kind { + case api.Asyncwebsub: + // Build routes with merged policies + apiData, err := apiCfg.Spec.AsWebhookAPIData() + if err != nil { + return nil + } + for _, ch := range apiData.Channels { + var finalPolicies []policyenginev1.PolicyInstance + + if ch.Policies != nil && len(*ch.Policies) > 0 { + // Operation has policies: use operation policy order as authoritative + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*ch.Policies)) + addedNames := make(map[string]struct{}) + + for _, opPolicy := range *ch.Policies { + finalPolicies = append(finalPolicies, convertAPIPolicy(opPolicy)) + addedNames[opPolicy.Name] = struct{}{} + } + + // Add any API-level policies not mentioned in operation policies (append at end) + if apiData.Policies != nil { + for _, apiPolicy := range *apiData.Policies { + if _, exists := addedNames[apiPolicy.Name]; !exists { + finalPolicies = append(finalPolicies, apiPolicies[apiPolicy.Name]) + } + } + } + } else { + // No operation policies: use API-level policies in their declared order + if apiData.Policies != nil { + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*apiData.Policies)) + for _, p := range *apiData.Policies { + finalPolicies = append(finalPolicies, apiPolicies[p.Name]) + } + } + } + + routeKey := xds.GenerateRouteName("POST", apiData.Context, apiData.Version, ch.Path, s.routerConfig.GatewayHost) + routes = append(routes, policyenginev1.PolicyChain{ + RouteKey: routeKey, + Policies: finalPolicies, + }) + } + case api.RestApi: + // Build routes with merged policies + apiData, err := apiCfg.Spec.AsAPIConfigData() + if err != nil { + return nil + } + for _, op := range apiData.Operations { + var finalPolicies []policyenginev1.PolicyInstance + + if op.Policies != nil && len(*op.Policies) > 0 { + // Operation has policies: use operation policy order as authoritative + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*op.Policies)) + addedNames := make(map[string]struct{}) + + for _, opPolicy := range *op.Policies { + finalPolicies = append(finalPolicies, convertAPIPolicy(opPolicy)) + addedNames[opPolicy.Name] = struct{}{} + } + + // Add any API-level policies not mentioned in operation policies (append at end) + if apiData.Policies != nil { + for _, apiPolicy := range *apiData.Policies { + if _, exists := addedNames[apiPolicy.Name]; !exists { + finalPolicies = append(finalPolicies, apiPolicies[apiPolicy.Name]) + } + } + } + } else { + // No operation policies: use API-level policies in their declared order + if apiData.Policies != nil { + finalPolicies = make([]policyenginev1.PolicyInstance, 0, len(*apiData.Policies)) + for _, p := range *apiData.Policies { + finalPolicies = append(finalPolicies, apiPolicies[p.Name]) + } + } + } + + // Determine effective vhosts (fallback to global router defaults when not provided) + effectiveMainVHost := s.routerConfig.VHosts.Main.Default + effectiveSandboxVHost := s.routerConfig.VHosts.Sandbox.Default + if apiData.Vhosts != nil { + if strings.TrimSpace(apiData.Vhosts.Main) != "" { + effectiveMainVHost = apiData.Vhosts.Main + } + if apiData.Vhosts.Sandbox != nil && strings.TrimSpace(*apiData.Vhosts.Sandbox) != "" { + effectiveSandboxVHost = *apiData.Vhosts.Sandbox + } + } + + vhosts := []string{effectiveMainVHost} + if apiData.Upstream.Sandbox != nil && apiData.Upstream.Sandbox.Url != nil && + strings.TrimSpace(*apiData.Upstream.Sandbox.Url) != "" { + vhosts = append(vhosts, effectiveSandboxVHost) + } + + for _, vhost := range vhosts { + routes = append(routes, policyenginev1.PolicyChain{ + RouteKey: xds.GenerateRouteName(string(op.Method), apiData.Context, apiData.Version, op.Path, vhost), + Policies: finalPolicies, + }) + } + } + } + + // If there are no policies at all, return nil (skip creation) + policyCount := 0 + for _, r := range routes { + policyCount += len(r.Policies) + } + if policyCount == 0 { + return nil + } + + now := time.Now().Unix() + stored := &models.StoredPolicyConfig{ + ID: cfg.ID + "-policies", + Configuration: policyenginev1.Configuration{ + Routes: routes, + Metadata: policyenginev1.Metadata{ + CreatedAt: now, + UpdatedAt: now, + ResourceVersion: 0, + APIName: cfg.GetDisplayName(), + Version: cfg.GetVersion(), + Context: cfg.GetContext(), + }, + }, + Version: 0, + } + return stored +} + +// convertAPIPolicy converts generated api.Policy to policyenginev1.PolicyInstance +func convertAPIPolicy(p api.Policy) policyenginev1.PolicyInstance { + paramsMap := make(map[string]interface{}) + if p.Params != nil { + for k, v := range *p.Params { + paramsMap[k] = v + } + } + return policyenginev1.PolicyInstance{ + Name: p.Name, + Version: p.Version, + Enabled: true, // Default to enabled + ExecutionCondition: p.ExecutionCondition, + Parameters: paramsMap, + } +} + +// updatePolicyConfiguration builds and updates/removes derived policy config for an API +func (s *APIDeploymentService) updatePolicyConfiguration(storedCfg *models.StoredConfig, logger *zap.Logger) { + if s.policyManager == nil { + return + } + + storedPolicy := s.BuildStoredPolicyFromAPI(storedCfg) + if storedPolicy != nil { + if err := s.policyManager.AddPolicy(storedPolicy); err != nil { + logger.Error("Failed to update derived policy configuration", zap.Error(err)) + } else { + logger.Info("Derived policy configuration updated", + zap.String("policy_id", storedPolicy.ID), + zap.Int("route_count", len(storedPolicy.Configuration.Routes))) + } + } else { + // API no longer has policies, remove the existing policy configuration + policyID := storedCfg.ID + "-policies" + if err := s.policyManager.RemovePolicy(policyID); err != nil { + // Log at debug level since policy may not exist if API never had policies + logger.Debug("No policy configuration to remove", zap.String("policy_id", policyID)) + } else { + logger.Info("Derived policy configuration removed (API no longer has policies)", + zap.String("policy_id", policyID)) + } + } +} + +// triggerXDSSnapshotUpdate asynchronously updates xDS snapshot +func (s *APIDeploymentService) triggerXDSSnapshotUpdate(apiID, correlationID string, logger *zap.Logger) { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := s.snapshotManager.UpdateSnapshot(ctx, correlationID); err != nil { + logger.Error("Failed to update xDS snapshot", + zap.Error(err), + zap.String("api_id", apiID), + zap.String("correlation_id", correlationID)) + } + }() +} diff --git a/gateway/gateway-controller/pkg/utils/websub_topic_registration_test.go b/gateway/gateway-controller/pkg/utils/websub_topic_registration_test.go index 2297d9fa1..d96d4f2c3 100644 --- a/gateway/gateway-controller/pkg/utils/websub_topic_registration_test.go +++ b/gateway/gateway-controller/pkg/utils/websub_topic_registration_test.go @@ -18,7 +18,7 @@ func TestDeployAPIConfigurationWebSubKindTopicRegistration(t *testing.T) { db := &storage.SQLiteStorage{} snapshotManager := &xds.SnapshotManager{} validator := config.NewAPIValidator() - service := NewAPIDeploymentService(configStore, db, snapshotManager, validator, nil) + service := NewAPIDeploymentService(configStore, db, snapshotManager, nil, validator, nil) // Inline YAML config similar to websubhub.yaml yamlConfig := `kind: async/websub @@ -63,7 +63,7 @@ spec: func TestDeployAPIConfigurationWebSubKindRevisionDeployment(t *testing.T) { configStore := storage.NewConfigStore() validator := config.NewAPIValidator() - service := NewAPIDeploymentService(configStore, nil, nil, validator, nil) + service := NewAPIDeploymentService(configStore, nil, nil, nil, validator, nil) // Inline YAML config similar to websubhub.yaml yamlConfig := `kind: async/websub @@ -145,7 +145,7 @@ spec: func TestTopicRegistrationForConcurrentAPIConfigs(t *testing.T) { configStore := storage.NewConfigStore() validator := config.NewAPIValidator() - service := NewAPIDeploymentService(configStore, nil, nil, validator, nil) + service := NewAPIDeploymentService(configStore, nil, nil, nil, validator, nil) // Two different API YAMLs yamlA := `kind: async/websub @@ -249,7 +249,7 @@ spec: func TestTopicDeregistrationOnConfigDeletion(t *testing.T) { configStore := storage.NewConfigStore() validator := config.NewAPIValidator() - service := NewAPIDeploymentService(configStore, nil, nil, validator, nil) + service := NewAPIDeploymentService(configStore, nil, nil, nil, validator, nil) // Inline YAML config similar to websubhub.yaml yamlConfig := `kind: async/websub