diff --git a/api/core/v1alpha2/tunnel_agent_types.go b/api/core/v1alpha2/tunnel_agent_types.go index 42b037b2..f03c7e1f 100644 --- a/api/core/v1alpha2/tunnel_agent_types.go +++ b/api/core/v1alpha2/tunnel_agent_types.go @@ -59,18 +59,18 @@ type TunnelAgentConnection struct { // +optional VNI *uint `json:"vni,omitempty,omitzero"` - // LastRXTimestamp is the last time a packet was received from the agent on + // LastRX is the last time a packet was received from the agent on // this connection. // +optional - LastRXTimestamp *metav1.Time `json:"lastRxTimestamp,omitempty,omitzero"` + LastRX *metav1.Time `json:"lastRx,omitempty,omitzero"` // RXBytes is the total number of bytes received from the agent on this connection. // +optional - RXBytes uint64 `json:"rxBytes,omitempty,omitzero"` + RXBytes *int64 `json:"rxBytes,omitempty,omitzero"` // TXBytes is the total number of bytes transmitted to the agent on this connection. // +optional - TxBytes uint64 `json:"txBytes,omitempty,omitzero"` + TXBytes *int64 `json:"txBytes,omitempty,omitzero"` } // TunnelAgentStatus represents the status of a tunnel agent. diff --git a/api/core/v1alpha2/zz_generated.deepcopy.go b/api/core/v1alpha2/zz_generated.deepcopy.go index 04dce1b9..bf9233cb 100644 --- a/api/core/v1alpha2/zz_generated.deepcopy.go +++ b/api/core/v1alpha2/zz_generated.deepcopy.go @@ -1414,10 +1414,20 @@ func (in *TunnelAgentConnection) DeepCopyInto(out *TunnelAgentConnection) { *out = new(uint) **out = **in } - if in.LastRXTimestamp != nil { - in, out := &in.LastRXTimestamp, &out.LastRXTimestamp + if in.LastRX != nil { + in, out := &in.LastRX, &out.LastRX *out = (*in).DeepCopy() } + if in.RXBytes != nil { + in, out := &in.RXBytes, &out.RXBytes + *out = new(int64) + **out = **in + } + if in.TXBytes != nil { + in, out := &in.TXBytes, &out.TXBytes + *out = new(int64) + **out = **in + } return } diff --git a/api/generated/zz_generated.openapi.go b/api/generated/zz_generated.openapi.go index 48f178f9..00a96fd7 100644 --- a/api/generated/zz_generated.openapi.go +++ b/api/generated/zz_generated.openapi.go @@ -5358,9 +5358,9 @@ func schema_apoxy_api_core_v1alpha2_TunnelAgentConnection(ref common.ReferenceCa Format: "int32", }, }, - "lastRxTimestamp": { + "lastRx": { SchemaProps: spec.SchemaProps{ - Description: "LastRXTimestamp is the last time a packet was received from the agent on this connection.", + Description: "LastRX is the last time a packet was received from the agent on this connection.", Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), }, }, diff --git a/go.mod b/go.mod index 41ee6fda..4fb736b5 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/adrg/xdg v0.5.3 github.com/alphadose/haxmap v1.4.1 github.com/anatol/vmtest v0.0.0-20250318022921-2f32244e2f0f - github.com/apoxy-dev/icx v0.14.0 + github.com/apoxy-dev/icx v0.16.1 github.com/avast/retry-go/v4 v4.6.1 github.com/bramvdbogaerde/go-scp v1.5.0 github.com/buraksezer/olric v0.5.6 diff --git a/go.sum b/go.sum index adbe702c..9415e726 100644 --- a/go.sum +++ b/go.sum @@ -123,8 +123,10 @@ github.com/apoxy-dev/apiserver-runtime v0.0.0-20251017224250-220a8896ee57 h1:p2e github.com/apoxy-dev/apiserver-runtime v0.0.0-20251017224250-220a8896ee57/go.mod h1:k8K1q/QnsxMM7/wsiga/cJWGW/38G907ex7JPFw0B04= github.com/apoxy-dev/connect-ip-go v0.0.0-20250530062404-603929a73f45 h1:SwPk1n/oSVX7YwlNpC9KNH9YaYkcL/k6OfqSGVnxyyI= github.com/apoxy-dev/connect-ip-go v0.0.0-20250530062404-603929a73f45/go.mod h1:z5rtgIizc+/K27UtB0occwZgqg/mz3IqgyUJW8aubbI= -github.com/apoxy-dev/icx v0.14.0 h1:3BXuhRysBsK2isLu7Z3+1pMiySu2eI0Ts5iObw6fp60= -github.com/apoxy-dev/icx v0.14.0/go.mod h1:QNPhLVUVbbSVSyERjmgGN4K8vzSC6bvZlN0tyflYf0U= +github.com/apoxy-dev/icx v0.16.0 h1:+mijAkIuTZVbXF6vMtSWUfn/TINHUaCzB+Da2HnPfpQ= +github.com/apoxy-dev/icx v0.16.0/go.mod h1:QNPhLVUVbbSVSyERjmgGN4K8vzSC6bvZlN0tyflYf0U= +github.com/apoxy-dev/icx v0.16.1 h1:u7Db5R9Fm7LoQ9cY6lq25T+l086v/zlERpbGYYVajsM= +github.com/apoxy-dev/icx v0.16.1/go.mod h1:QNPhLVUVbbSVSyERjmgGN4K8vzSC6bvZlN0tyflYf0U= github.com/apoxy-dev/quic-go v0.0.0-20250530165952-53cca597715e h1:10GIpiVyKoRgCyr0J2TvJtdn17bsFHN+ROWkeVJpcOU= github.com/apoxy-dev/quic-go v0.0.0-20250530165952-53cca597715e/go.mod h1:MFlGGpcpJqRAfmYi6NC2cptDPSxRWTOGNuP4wqrWmzQ= github.com/apoxy-dev/upgrade-cli v0.0.0-20240213232412-a56c3a52fa0e h1:FBNxMQD93z2ththupB/BYKLEaMWaEr+G+sJWJqU2wC4= diff --git a/pkg/apiserver/controllers/tunnel_agent_reconciler.go b/pkg/apiserver/controllers/tunnel_agent_reconciler.go index 31235da9..5b083896 100644 --- a/pkg/apiserver/controllers/tunnel_agent_reconciler.go +++ b/pkg/apiserver/controllers/tunnel_agent_reconciler.go @@ -390,8 +390,8 @@ func (r *TunnelAgentReconciler) PruneOrphanedConnections(ctx context.Context) er // Determine orphaned-ness isOrphaned := false switch { - case conn.LastRXTimestamp != nil: - isOrphaned = conn.LastRXTimestamp.Add(gcMaxSilence).Before(now) + case conn.LastRX != nil: + isOrphaned = conn.LastRX.Add(gcMaxSilence).Before(now) case conn.ConnectedAt != nil: isOrphaned = conn.ConnectedAt.Add(gcMaxSilence).Before(now) default: diff --git a/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go b/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go index e5079867..0d18fa9f 100644 --- a/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go +++ b/pkg/apiserver/controllers/tunnel_agent_reconciler_test.go @@ -192,16 +192,16 @@ func TestTunnelAgentPruneOrphanedConnections(t *testing.T) { Status: corev1alpha2.TunnelAgentStatus{ Connections: []corev1alpha2.TunnelAgentConnection{ { - ID: "conn-orphaned", - Address: pfxOrphaned.String(), - VNI: &vOrphaned, - LastRXTimestamp: &orphaned, + ID: "conn-orphaned", + Address: pfxOrphaned.String(), + VNI: &vOrphaned, + LastRX: &orphaned, }, { - ID: "conn-fresh", - Address: pfxFresh.String(), - VNI: &vFresh, - LastRXTimestamp: &fresh, + ID: "conn-fresh", + Address: pfxFresh.String(), + VNI: &vFresh, + LastRX: &fresh, }, }, }, diff --git a/pkg/cmd/alpha/tunnel_run.go b/pkg/cmd/alpha/tunnel_run.go index 97d1f6da..f23a3e95 100644 --- a/pkg/cmd/alpha/tunnel_run.go +++ b/pkg/cmd/alpha/tunnel_run.go @@ -423,6 +423,7 @@ func connectAndInitSession( return cleanupOnErr(fmt.Errorf("parse assigned addresses: %w", err)) } + var allowedRoutes []icx.Route for _, route := range connectResp.Routes { dst, err := netip.ParsePrefix(route.Destination) if err != nil { @@ -431,13 +432,19 @@ func connectAndInitSession( slog.Any("error", err)) continue } - overlayAddrs = append(overlayAddrs, dst) + + for _, addr := range overlayAddrs { + allowedRoutes = append(allowedRoutes, icx.Route{ + Src: addr, + Dst: dst, + }) + } } if err := handler.AddVirtualNetwork( connectResp.VNI, netstack.ToFullAddress(remoteAddr), - overlayAddrs, + allowedRoutes, ); err != nil { return cleanupOnErr(fmt.Errorf("add virtual network: %w", err)) } diff --git a/pkg/tunnel/connection.go b/pkg/tunnel/connection.go index c48b8a03..036af766 100644 --- a/pkg/tunnel/connection.go +++ b/pkg/tunnel/connection.go @@ -85,10 +85,16 @@ func (c *connection) SetVNI(ctx context.Context, vni uint) error { c.vni = nil } - // Add new VNI - var addrs []netip.Prefix + var allowedRoutes []icx.Route if c.overlayAddr != nil { - addrs = []netip.Prefix{*c.overlayAddr} + // Relays shouldn't have conflicting destinations, so we can use a wildcard src. + var src netip.Prefix + if c.overlayAddr.Addr().Is4() { + src = netip.MustParsePrefix("0.0.0.0/0") + } else { + src = netip.MustParsePrefix("::/0") + } + allowedRoutes = []icx.Route{{Src: src, Dst: *c.overlayAddr}} } fa := netstack.ToFullAddress(c.remoteAddr) @@ -106,7 +112,7 @@ func (c *connection) SetVNI(ctx context.Context, vni uint) error { } } - if err := c.handler.AddVirtualNetwork(vni, fa, addrs); err != nil { + if err := c.handler.AddVirtualNetwork(vni, fa, allowedRoutes); err != nil { return fmt.Errorf("failed to add virtual network %d: %w", vni, err) } c.vni = &vni @@ -189,9 +195,18 @@ func (c *connection) SetOverlayAddress(addr string) error { // Update in-memory state. c.overlayAddr = &p - // 2) If a VNI is active, update its allowed prefixes in-place. + // Relays shouldn't have conflicting destinations, so we can use a wildcard src. + var src netip.Prefix + if c.overlayAddr.Addr().Is4() { + src = netip.MustParsePrefix("0.0.0.0/0") + } else { + src = netip.MustParsePrefix("::/0") + } + allowedRoutes := []icx.Route{{Src: src, Dst: p}} + + // 2) If a VNI is active, update its allowed routes in-place. if c.vni != nil { - if err := c.handler.UpdateVirtualNetworkAddrs(*c.vni, []netip.Prefix{p}); err != nil { + if err := c.handler.UpdateVirtualNetworkRoutes(*c.vni, allowedRoutes); err != nil { // Attempt to roll back router state to old addr on failure. if c.router != nil { _ = c.router.DelAddr(p) @@ -241,8 +256,8 @@ func (c *connection) Stats() (controllers.ConnectionStats, bool) { } return controllers.ConnectionStats{ - RXBytes: vnet.Stats.RXBytes.Load(), - TXBytes: vnet.Stats.TXBytes.Load(), + RXBytes: int64(vnet.Stats.RXBytes.Load()), + TXBytes: int64(vnet.Stats.TXBytes.Load()), LastRX: lastRx, }, true } diff --git a/pkg/tunnel/controllers/connection.go b/pkg/tunnel/controllers/connection.go index 9674dd66..fdcf3635 100644 --- a/pkg/tunnel/controllers/connection.go +++ b/pkg/tunnel/controllers/connection.go @@ -9,9 +9,9 @@ import ( // ConnectionStats is a lightweight snapshot of connection counters. type ConnectionStats struct { // RXBytes is the total number of bytes received on this connection. - RXBytes uint64 + RXBytes int64 // TXBytes is the total number of bytes transmitted on this connection. - TXBytes uint64 + TXBytes int64 // LastRX is the last time a packet was received on this connection. // The zero value indicates that no packets have been received. LastRX time.Time diff --git a/pkg/tunnel/controllers/tunnel_agent_reconciler.go b/pkg/tunnel/controllers/tunnel_agent_reconciler.go index bf2692fe..e8b2c711 100644 --- a/pkg/tunnel/controllers/tunnel_agent_reconciler.go +++ b/pkg/tunnel/controllers/tunnel_agent_reconciler.go @@ -319,13 +319,18 @@ func (r *TunnelAgentReconciler) PushStatsOnce(ctx context.Context) { if s, ok := conn.Stats(); ok { u := corev1alpha2.TunnelAgentConnection{ - ID: id, - RXBytes: s.RXBytes, - TxBytes: s.TXBytes, + ID: id, } if !s.LastRX.IsZero() { t := metav1.NewTime(s.LastRX) - u.LastRXTimestamp = &t + u.LastRX = &t + } + // Intentionally swap the order so transfer stats are from the agent's perspective. + if s.TXBytes != 0 { + u.RXBytes = &s.TXBytes + } + if s.RXBytes != 0 { + u.TXBytes = &s.RXBytes } updatesByAgent[agentName] = append(updatesByAgent[agentName], u) } @@ -334,7 +339,7 @@ func (r *TunnelAgentReconciler) PushStatsOnce(ctx context.Context) { // Apply updates per agent with conflict retries. for agentName, updates := range updatesByAgent { - _ = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { var cur corev1alpha2.TunnelAgent if err := r.client.Get(ctx, types.NamespacedName{Name: agentName}, &cur); err != nil { // If the object is gone, skip. @@ -352,14 +357,17 @@ func (r *TunnelAgentReconciler) PushStatsOnce(ctx context.Context) { for _, u := range updates { if c := connByID[u.ID]; c != nil { c.RXBytes = u.RXBytes - c.TxBytes = u.TxBytes - if u.LastRXTimestamp != nil { - c.LastRXTimestamp = u.LastRXTimestamp + c.TXBytes = u.TXBytes + if u.LastRX != nil { + c.LastRX = u.LastRX } } } return r.client.Status().Update(ctx, &cur) }) + if err != nil { + slog.Warn("Failed to update TunnelAgent stats", slog.String("agent", agentName), slog.Any("error", err)) + } } } diff --git a/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go b/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go index adbbd40d..94475a79 100644 --- a/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go +++ b/pkg/tunnel/controllers/tunnel_agent_reconciler_test.go @@ -273,10 +273,10 @@ func TestTunnelAgentPushStatsOnce_UpdatesStatusForKnownConnection(t *testing.T) require.Len(t, got.Status.Connections, 1) entry := got.Status.Connections[0] assert.Equal(t, "conn-stat", entry.ID) - assert.Equal(t, uint64(1111), entry.RXBytes) - assert.Equal(t, uint64(2222), entry.TxBytes) - require.NotNil(t, entry.LastRXTimestamp) - assert.True(t, entry.LastRXTimestamp.Time.Equal(lastRX)) + assert.Equal(t, int64(2222), *entry.RXBytes) // From the agent's perspective + assert.Equal(t, int64(1111), *entry.TXBytes) + require.NotNil(t, entry.LastRX) + assert.True(t, entry.LastRX.Time.Equal(lastRX)) relay.AssertExpectations(t) conn.AssertExpectations(t) diff --git a/pkg/tunnel/router/icx_netlink_linux.go b/pkg/tunnel/router/icx_netlink_linux.go index e59f777b..849eb991 100644 --- a/pkg/tunnel/router/icx_netlink_linux.go +++ b/pkg/tunnel/router/icx_netlink_linux.go @@ -11,6 +11,7 @@ import ( "net/netip" "os" "sync" + "time" "github.com/apoxy-dev/icx" "github.com/apoxy-dev/icx/addrselect" @@ -108,6 +109,7 @@ func NewICXNetlinkRouter(opts ...Option) (*ICXNetlinkRouter, error) { handlerOpts := []icx.HandlerOption{ icx.WithVirtMAC(virtMAC), + icx.WithKeepAliveInterval(25 * time.Second), } for _, addr := range extAddrs { @@ -397,8 +399,8 @@ func (r *ICXNetlinkRouter) syncDNATChain() error { for i, peer := range peers { slog.Info("Adding DNAT rules for peer", slog.String("peer", peer.RemoteAddr.Addr.String())) - for _, addr := range peer.Addrs { - if addr.Addr().Is4() { // Skipping IPv4 peers - only IPv6 tunnel ingress is supported. + for _, route := range peer.AllowedRoutes { + if route.Dst.Addr().Is4() { // Skipping IPv4 peers - only IPv6 tunnel ingress is supported. continue } natRules.Write( @@ -407,7 +409,7 @@ func (r *ICXNetlinkRouter) syncDNATChain() error { "--mode", "random", "--probability", probability(len(peers)-i), "-j", "DNAT", - "--to-destination", addr.Addr().String(), + "--to-destination", route.Dst.Addr().String(), ) } } diff --git a/pkg/tunnel/router/icx_netstack.go b/pkg/tunnel/router/icx_netstack.go index 1a35ac29..4bf43aa6 100644 --- a/pkg/tunnel/router/icx_netstack.go +++ b/pkg/tunnel/router/icx_netstack.go @@ -9,6 +9,7 @@ import ( "net/netip" "strconv" "sync" + "time" "github.com/apoxy-dev/icx" "github.com/dpeckett/network" @@ -51,6 +52,7 @@ func NewICXNetstackRouter(opts ...Option) (*ICXNetstackRouter, error) { handlerOpts := []icx.HandlerOption{ icx.WithLayer3VirtFrames(), + icx.WithKeepAliveInterval(25 * time.Second), } if options.sourcePortHashing { handlerOpts = append(handlerOpts, icx.WithSourcePortHashing())