From 708c172f02e45b243360a25a1bb7a603cd8bfd6a Mon Sep 17 00:00:00 2001 From: Damian Peckett Date: Fri, 31 Oct 2025 12:06:54 +0100 Subject: [PATCH] [icxtunnel/controllers] wire up api server controllers and manually implement gc behavior as the default gc is not available --- _examples/tunnel.yaml | 19 ++++++ .../controllers/tunnel_agent_reconciler.go | 65 +++++++++++-------- .../controllers/tunnel_reconciler.go | 64 ++++++++++++++++-- pkg/apiserver/manager.go | 29 +++++++++ 4 files changed, 146 insertions(+), 31 deletions(-) create mode 100644 _examples/tunnel.yaml diff --git a/_examples/tunnel.yaml b/_examples/tunnel.yaml new file mode 100644 index 00000000..82e2886b --- /dev/null +++ b/_examples/tunnel.yaml @@ -0,0 +1,19 @@ +apiVersion: core.apoxy.dev/v1alpha2 +kind: Tunnel +metadata: + name: example +spec: + egressGateway: + enabled: true +--- +apiVersion: core.apoxy.dev/v1alpha2 +kind: TunnelAgent +metadata: + name: example +spec: + tunnelRef: + name: example +status: + # So we can test VNI and IPAM allocation. + connections: + - id: "example-1" \ No newline at end of file diff --git a/pkg/apiserver/controllers/tunnel_agent_reconciler.go b/pkg/apiserver/controllers/tunnel_agent_reconciler.go index f2497a5a..ddf16383 100644 --- a/pkg/apiserver/controllers/tunnel_agent_reconciler.go +++ b/pkg/apiserver/controllers/tunnel_agent_reconciler.go @@ -45,6 +45,8 @@ func NewTunnelAgentReconciler(c client.Client, agentIPAM tunnet.IPAM, vniPool *v func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := controllerlog.FromContext(ctx, "name", req.Name) + log.Info("Reconciling TunnelAgent") + var agent corev1alpha2.TunnelAgent if err := r.client.Get(ctx, req.NamespacedName, &agent); err != nil { if apierrors.IsNotFound(err) { @@ -53,16 +55,19 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } - // handle deletion + // Handle deletion if !agent.DeletionTimestamp.IsZero() { + log.Info("Handling deletion of TunnelAgent") + if controllerutil.ContainsFinalizer(&agent, ApiServerFinalizer) { + log.Info("Releasing resources for TunnelAgent") + changed, err := r.releaseResourcesIfPresent(ctx, log, req.NamespacedName) if err != nil { return ctrl.Result{}, fmt.Errorf("failed to release resources: %w", err) } - // releaseResourcesIfPresent potentially mutates the object, so we need - // to refetch it to avoid conflicts when we remove the finalizer. + // Refetch to avoid conflicts if we modified the object if changed { if err := r.client.Get(ctx, req.NamespacedName, &agent); err != nil { if apierrors.IsNotFound(err) { @@ -72,6 +77,8 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } + log.Info("Removing finalizer from TunnelAgent") + // Remove finalizer controllerutil.RemoveFinalizer(&agent, ApiServerFinalizer) if err := r.client.Update(ctx, &agent); err != nil { @@ -82,7 +89,7 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - // ensure finalizer + // Ensure finalizer if !controllerutil.ContainsFinalizer(&agent, ApiServerFinalizer) { controllerutil.AddFinalizer(&agent, ApiServerFinalizer) if err := r.client.Update(ctx, &agent); err != nil { @@ -90,7 +97,7 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } - // fetch owner Tunnel + // Fetch owner Tunnel tunnelName := agent.Spec.TunnelRef.Name if tunnelName == "" { // TODO: why would this happen? Should we mark the agent as failed. @@ -98,17 +105,18 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } + log.Info("Fetching owner Tunnel", "tunnelName", tunnelName) + var tunnel corev1alpha2.Tunnel if err := r.client.Get(ctx, client.ObjectKey{Name: tunnelName}, &tunnel); err != nil { if apierrors.IsNotFound(err) { - // TODO: why would this happen? Should we mark the agent as failed. log.Info("Referenced Tunnel not found; skipping", "tunnelName", tunnelName) return ctrl.Result{RequeueAfter: 30 * time.Second}, nil } return ctrl.Result{}, err } - // ensure controller ownerRef agent -> tunnel + // Ensure controller ownerRef agent -> tunnel changed, err := r.ensureControllerOwner(&agent, &tunnel) if err != nil { return ctrl.Result{}, err @@ -128,6 +136,24 @@ func (r *TunnelAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) } func (r *TunnelAgentReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { + // So that the Tunnel reconciler can list controller-owned TunnelAgents. + if err := mgr.GetFieldIndexer().IndexField( + ctx, + &corev1alpha2.TunnelAgent{}, + ".metadata.controllerOwnerUID", + func(obj client.Object) []string { + ta := obj.(*corev1alpha2.TunnelAgent) + for _, or := range ta.GetOwnerReferences() { + if or.Controller != nil && *or.Controller { + return []string{string(or.UID)} + } + } + return nil + }, + ); err != nil { + return err + } + // Reconcile when spec generation changes OR when status (e.g., Connections) changes. statusOrGenChanged := predicate.Funcs{ CreateFunc: func(e event.CreateEvent) bool { return true }, @@ -140,7 +166,8 @@ func (r *TunnelAgentReconciler) SetupWithManager(ctx context.Context, mgr ctrl.M } genChanged := oldObj.GetGeneration() != newObj.GetGeneration() statusDiff := !equality.Semantic.DeepEqual(oldObj.Status, newObj.Status) - return genChanged || statusDiff + deletionBegan := oldObj.GetDeletionTimestamp().IsZero() && !newObj.GetDeletionTimestamp().IsZero() + return genChanged || statusDiff || deletionBegan }, } @@ -182,11 +209,10 @@ func (r *TunnelAgentReconciler) ensureConnectionAllocations( } conn.Address = pfx.String() newlyAllocatedPrefixes = append(newlyAllocatedPrefixes, pfx) - log.Info("Allocated overlay address", "connectionID", conn.ID, "address", conn.Address) } - // Allocate VNI if missing (nil means "unset"; zero can be valid but your pool won't return 0) + // Allocate VNI if missing if conn.VNI == nil { vni, err := r.vniPool.Allocate() if err != nil { @@ -201,13 +227,11 @@ func (r *TunnelAgentReconciler) ensureConnectionAllocations( } conn.VNI = &vni newlyAllocatedVNIs = append(newlyAllocatedVNIs, vni) - log.Info("Allocated VNI", "connectionID", conn.ID, "vni", *conn.VNI) } } if len(newlyAllocatedPrefixes) == 0 && len(newlyAllocatedVNIs) == 0 { - // nothing changed return nil } @@ -234,13 +258,11 @@ func (r *TunnelAgentReconciler) releaseResourcesIfPresent( ) (bool, error) { var changed bool err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - // Always work on a fresh copy to avoid write conflicts. var cur corev1alpha2.TunnelAgent if err := r.client.Get(ctx, key, &cur); err != nil { if apierrors.IsNotFound(err) { return nil } - return err } @@ -249,7 +271,6 @@ func (r *TunnelAgentReconciler) releaseResourcesIfPresent( return nil } - // Free resources that are still recorded in status and clear the fields. for i := range cur.Status.Connections { conn := &cur.Status.Connections[i] @@ -263,7 +284,7 @@ func (r *TunnelAgentReconciler) releaseResourcesIfPresent( return fmt.Errorf("failed to release address %q: %w", conn.Address, err) } log.Info("Released overlay address", "connectionID", conn.ID, "address", conn.Address) - conn.Address = "" // clear in status + conn.Address = "" changed = true } @@ -272,18 +293,16 @@ func (r *TunnelAgentReconciler) releaseResourcesIfPresent( vni := *conn.VNI r.vniPool.Release(vni) log.Info("Released VNI", "connectionID", conn.ID, "vni", vni) - conn.VNI = nil // clear in status + conn.VNI = nil changed = true } } - // Commit to status. if changed { if err := r.client.Status().Update(ctx, &cur); err != nil { return err } } - return nil }) @@ -297,14 +316,8 @@ func (r *TunnelAgentReconciler) ensureControllerOwner(child client.Object, owner } } - // Set controller reference (overwrites any existing controller owner) - if err := controllerutil.SetControllerReference( - owner, - child, - r.client.Scheme(), - ); err != nil { + if err := controllerutil.SetControllerReference(owner, child, r.client.Scheme()); err != nil { return false, err } - return true, nil } diff --git a/pkg/apiserver/controllers/tunnel_reconciler.go b/pkg/apiserver/controllers/tunnel_reconciler.go index 6e16414c..113a456a 100644 --- a/pkg/apiserver/controllers/tunnel_reconciler.go +++ b/pkg/apiserver/controllers/tunnel_reconciler.go @@ -5,12 +5,14 @@ import ( "crypto/rand" "encoding/base64" "io" + "time" apierrors "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" controllerlog "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -39,13 +41,49 @@ func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if apierrors.IsNotFound(err) { return ctrl.Result{}, nil } - return ctrl.Result{}, err } - // handle deletion + // Handle deletion with manual "reaper" semantics: foreground-like delete of controller-owned children. if !tunnel.DeletionTimestamp.IsZero() { + log.Info("Handling deletion of Tunnel") + if controllerutil.ContainsFinalizer(&tunnel, ApiServerFinalizer) { + // Manually implement garbage collection of controller-owned TunnelAgents. + // This is due to us not using the built in gc controller from k8s.io/controller-manager. + + // List controller-owned TunnelAgents by indexed controller owner UID. + var agents corev1alpha2.TunnelAgentList + if err := r.client.List( + ctx, + &agents, + client.MatchingFields{".metadata.controllerOwnerUID": string(tunnel.GetUID())}, + ); err != nil { + return ctrl.Result{}, err + } + + // Kick off deletion for any children that still exist. + stillPresent := false + for i := range agents.Items { + a := &agents.Items[i] + stillPresent = true + if a.DeletionTimestamp.IsZero() { + if err := r.client.Delete(ctx, a); err != nil && !apierrors.IsNotFound(err) { + return ctrl.Result{}, err + } + } + } + + // If any child remains (possibly terminating due to its own finalizers), + // requeue and keep the parent's finalizer to emulate foreground deletion. + if stillPresent { + log.Info("Waiting for controller-owned TunnelAgents to terminate", "remaining", len(agents.Items)) + return ctrl.Result{RequeueAfter: 2 * time.Second}, nil + } + + // No children remain → remove the parent's finalizer. + log.Info("All controller-owned TunnelAgents gone; removing Tunnel finalizer") + // Remove finalizer controllerutil.RemoveFinalizer(&tunnel, ApiServerFinalizer) if err := r.client.Update(ctx, &tunnel); err != nil { @@ -56,7 +94,7 @@ func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, nil } - // ensure finalizer + // Ensure finalizer. if !controllerutil.ContainsFinalizer(&tunnel, ApiServerFinalizer) { controllerutil.AddFinalizer(&tunnel, ApiServerFinalizer) if err := r.client.Update(ctx, &tunnel); err != nil { @@ -64,7 +102,7 @@ func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } } - // ensure bearer token + // Ensure bearer token in status. if tunnel.Status.Credentials == nil || tunnel.Status.Credentials.Token == "" { log.Info("Generating new bearer token for Tunnel") @@ -87,8 +125,24 @@ func (r *TunnelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } func (r *TunnelReconciler) SetupWithManager(mgr ctrl.Manager) error { + // Reconcile when spec generation changes OR deletion is requested. + genChanged := predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { return true }, + DeleteFunc: func(e event.DeleteEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldObj, ok1 := e.ObjectOld.(*corev1alpha2.Tunnel) + newObj, ok2 := e.ObjectNew.(*corev1alpha2.Tunnel) + if !ok1 || !ok2 { + return false + } + gc := oldObj.GetGeneration() != newObj.GetGeneration() + deletionBegan := oldObj.GetDeletionTimestamp().IsZero() && !newObj.GetDeletionTimestamp().IsZero() + return gc || deletionBegan + }, + } + return ctrl.NewControllerManagedBy(mgr). - For(&corev1alpha2.Tunnel{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + For(&corev1alpha2.Tunnel{}, builder.WithPredicates(genChanged)). Complete(r) } diff --git a/pkg/apiserver/manager.go b/pkg/apiserver/manager.go index 84586a8a..cc903235 100644 --- a/pkg/apiserver/manager.go +++ b/pkg/apiserver/manager.go @@ -49,6 +49,7 @@ import ( "github.com/apoxy-dev/apoxy/pkg/log" apoxynet "github.com/apoxy-dev/apoxy/pkg/tunnel/net" tunnet "github.com/apoxy-dev/apoxy/pkg/tunnel/net" + "github.com/apoxy-dev/apoxy/pkg/tunnel/vni" ctrlv1alpha1 "github.com/apoxy-dev/apoxy/api/controllers/v1alpha1" corev1alpha "github.com/apoxy-dev/apoxy/api/core/v1alpha" @@ -186,6 +187,7 @@ type options struct { resources []resource.Object proxyIPAM tunnet.IPAM agentIPAM tunnet.IPAM + vniPool *vni.VNIPool } // WithJWTKeys sets the JWT key pair. @@ -317,6 +319,13 @@ func WithAgentIPAM(ipam tunnet.IPAM) Option { } } +// WithVNIPool sets the VNI pool for tunnel agents. +func WithVNIPool(pool *vni.VNIPool) Option { + return func(o *options) { + o.vniPool = pool + } +} + func defaultResources() []resource.Object { // Higher versions need to be registered first as storage resources. return []resource.Object{ @@ -383,6 +392,8 @@ func defaultOptions(ctx context.Context) (*options, error) { return nil, fmt.Errorf("failed to create proxy IPAM: %w", err) } + vniPool := vni.NewVNIPool() + opts := &options{ clientConfig: NewClientConfig(), @@ -406,6 +417,7 @@ func defaultOptions(ctx context.Context) (*options, error) { proxyIPAM: proxyIPAM, agentIPAM: agentIPAM, + vniPool: vniPool, } // Generate default JWT key pair if not provided @@ -491,6 +503,7 @@ func (m *Manager) Start( return fmt.Errorf("failed to set up Proxy controller: %v", err) } + // Legacy v1alpha1 TunnelNode controller log.Infof("Registering TunnelNode controller") tunnelNodeReconciler := controllers.NewTunnelNodeReconciler( m.manager.GetClient(), @@ -513,6 +526,22 @@ func (m *Manager) Start( return nil }) + log.Infof("Registering Tunnel controller") + tunnelReconciler := controllers.NewTunnelReconciler(m.manager.GetClient()) + if err := tunnelReconciler.SetupWithManager(m.manager); err != nil { + return fmt.Errorf("failed to set up Tunnel controller: %v", err) + } + + log.Infof("Registering TunnelAgent controller") + tunnelAgentReconciler := controllers.NewTunnelAgentReconciler( + m.manager.GetClient(), + dOpts.agentIPAM, + dOpts.vniPool, + ) + if err := tunnelAgentReconciler.SetupWithManager(ctx, m.manager); err != nil { + return fmt.Errorf("failed to set up TunnelAgent controller: %v", err) + } + log.Infof("Registering Gateway controller") gwOpts := []gateway.Option{} if dOpts.enableKubeAPI {