diff --git a/test/common.go b/test/common.go new file mode 100644 index 000000000000..b9caa0270832 --- /dev/null +++ b/test/common.go @@ -0,0 +1,63 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +import ( + "context" + "fmt" + + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" +) + +func ReadyAddressCount(slices []discoveryv1.EndpointSlice) int { + count := 0 + for _, slice := range slices { + count += len(ReadyAddresses(slice)) + } + return count +} + +func EndpointSlicesForService(k kubernetes.Interface, namespace, name string) ([]discoveryv1.EndpointSlice, error) { + listOpts := metav1.ListOptions{ + LabelSelector: labels.Set{ + discoveryv1.LabelServiceName: name, + }.String(), + } + + slicesClient := k.DiscoveryV1().EndpointSlices(namespace) + slices, err := slicesClient.List(context.Background(), listOpts) + if err != nil { + return nil, fmt.Errorf(`error: getting endpoint slices for service "%s/%s": %w`, namespace, name, err) + } + + return slices.Items, nil +} + +// ReadyAddresses returns the addresses of all ready endpoints in the slice; a nil Ready condition is treated as ready. +func ReadyAddresses(slice discoveryv1.EndpointSlice) []string { + var res []string + for _, ep := range slice.Endpoints { + if ep.Conditions.Ready != nil && !(*ep.Conditions.Ready) { + continue + } + res = append(res, ep.Addresses...) 
+ } + return res +} diff --git a/test/e2e/autoscale_test.go b/test/e2e/autoscale_test.go index 619e726080fa..39dbf8e997b5 100644 --- a/test/e2e/autoscale_test.go +++ b/test/e2e/autoscale_test.go @@ -38,7 +38,6 @@ import ( "knative.dev/serving/pkg/networking" revnames "knative.dev/serving/pkg/reconciler/revision/resources/names" "knative.dev/serving/pkg/reconciler/serverlessservice/resources/names" - "knative.dev/serving/pkg/resources" rtesting "knative.dev/serving/pkg/testing/v1" "knative.dev/serving/test" testv1 "knative.dev/serving/test/v1" @@ -219,14 +218,12 @@ func TestTargetBurstCapacity(t *testing.T) { // We poll, since network programming takes times, but the timeout is set for // uniformness with one above. if err := wait.PollUntilContextTimeout(context.Background(), 250*time.Millisecond, 2*cfg.StableWindow, true, func(context.Context) (bool, error) { - svcEps, err := ctx.clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace).Get( - context.Background(), ctx.resources.Revision.Name, /* revision service name is equal to revision name*/ - metav1.GetOptions{}) + slices, err := test.EndpointSlicesForService(ctx.clients.KubeClient, test.ServingFlags.TestNamespace, ctx.resources.Revision.Name) if err != nil { return false, err } - t.Log("resources.ReadyAddressCount(svcEps) =", resources.ReadyAddressCount(svcEps)) - return resources.ReadyAddressCount(svcEps) == 2, nil + t.Log("resources.ReadyAddressCount(svcEps) =", test.ReadyAddressCount(slices)) + return test.ReadyAddressCount(slices) == 2, nil }); err != nil { t.Error("Never achieved subset of size 2:", err) } @@ -252,12 +249,12 @@ func TestTargetBurstCapacityMinusOne(t *testing.T) { if err != nil { t.Fatal("Error retrieving autoscaler configmap:", err) } - aeps, err := ctx.clients.KubeClient.CoreV1().Endpoints( - system.Namespace()).Get(context.Background(), networking.ActivatorServiceName, metav1.GetOptions{}) + + slices, err := test.EndpointSlicesForService(ctx.clients.KubeClient, 
system.Namespace(), networking.ActivatorServiceName) if err != nil { t.Fatal("Error getting activator endpoints:", err) } - t.Log("Activator endpoints:", aeps) + t.Log("Activator endpoints:", slices) // Wait for the activator endpoints to equalize. if err := waitForActivatorEndpoints(ctx); err != nil { @@ -336,11 +333,11 @@ func TestFastScaleToZero(t *testing.T) { // of 20 runs (11s) + 4s of buffer for reliability. st := time.Now() if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, cfg.ScaleToZeroGracePeriod+15*time.Second, true, func(context.Context) (bool, error) { - eps, err := ctx.clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace).Get(context.Background(), epsN, metav1.GetOptions{}) + slices, err := test.EndpointSlicesForService(ctx.clients.KubeClient, test.ServingFlags.TestNamespace, epsN) if err != nil { return false, err } - return resources.ReadyAddressCount(eps) == 0, nil + return test.ReadyAddressCount(slices) == 0, nil }); err != nil { t.Fatalf("Did not observe %q to actually be emptied", epsN) } diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index dc1e4491d016..314b8767c60a 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -32,6 +32,7 @@ import ( // Mysteriously required to support GCP auth (required by k8s libs). // Apparently just importing it is enough. @_@ side effects @_@. // https://github.com/kubernetes/client-go/issues/242 + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" appsv1 "k8s.io/api/apps/v1" @@ -108,9 +109,7 @@ func waitForActivatorEndpoints(ctx *TestContext) error { ) if rerr := wait.PollUntilContextTimeout(context.Background(), 250*time.Millisecond, time.Minute, true, func(context.Context) (bool, error) { - // We need to fetch the activator endpoints at every check, since it can change. 
- actEps, err := ctx.clients.KubeClient.CoreV1().Endpoints( - system.Namespace()).Get(context.Background(), networking.ActivatorServiceName, metav1.GetOptions{}) + actEps, err := test.EndpointSlicesForService(ctx.clients.KubeClient, system.Namespace(), networking.ActivatorServiceName) if err != nil { return false, nil //nolint:nilerr } @@ -119,25 +118,22 @@ func waitForActivatorEndpoints(ctx *TestContext) error { return false, nil //nolint:nilerr } - svcEps, err := ctx.clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace).Get( - context.Background(), ctx.resources.Revision.Name, metav1.GetOptions{}) + svcEps, err := test.EndpointSlicesForService(ctx.clients.KubeClient, test.ServingFlags.TestNamespace, ctx.resources.Revision.Name) if err != nil { return false, err } wantAct = int(sks.Spec.NumActivators) aset = make(sets.Set[string], wantAct) - for _, ss := range actEps.Subsets { - for i := range len(ss.Addresses) { - aset.Insert(ss.Addresses[i].IP) - } + + for _, slice := range actEps { + aset.Insert(test.ReadyAddresses(slice)...) } svcSet = make(sets.Set[string], wantAct) - for _, ss := range svcEps.Subsets { - for i := range len(ss.Addresses) { - svcSet.Insert(ss.Addresses[i].IP) - } + for _, slice := range svcEps { + svcSet.Insert(test.ReadyAddresses(slice)...) } + // Subset wants this many activators, but there might not be as many, // so reduce the expectation. 
if aset.Len() < wantAct { diff --git a/test/e2e/minscale_readiness_test.go b/test/e2e/minscale_readiness_test.go index b652677ad39a..a95599e6c8aa 100644 --- a/test/e2e/minscale_readiness_test.go +++ b/test/e2e/minscale_readiness_test.go @@ -40,7 +40,6 @@ import ( "knative.dev/serving/pkg/apis/autoscaling" "knative.dev/serving/pkg/apis/serving" v1 "knative.dev/serving/pkg/apis/serving/v1" - "knative.dev/serving/pkg/resources" rtesting "knative.dev/serving/pkg/testing/v1" "knative.dev/serving/test" v1test "knative.dev/serving/test/v1" @@ -505,29 +504,25 @@ func latestRevisionName(t *testing.T, clients *test.Clients, configName, oldRevN // waitForDesiredScale returns the last observed number of pods and/or error if the cond // callback is never satisfied. func waitForDesiredScale(clients *test.Clients, serviceName string, cond func(int) bool) (latestReady int, err error) { - endpoints := clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace) - // See https://github.com/knative/serving/issues/7727#issuecomment-706772507 for context. 
return latestReady, wait.PollUntilContextTimeout(context.Background(), time.Second, 3*time.Minute, true, func(context.Context) (bool, error) { - endpoint, err := endpoints.Get(context.Background(), serviceName, metav1.GetOptions{}) + endpoints, err := test.EndpointSlicesForService(clients.KubeClient, test.ServingFlags.TestNamespace, serviceName) if err != nil { return false, nil //nolint:nilerr } - latestReady = resources.ReadyAddressCount(endpoint) + latestReady = test.ReadyAddressCount(endpoints) return cond(latestReady), nil }) } func ensureDesiredScale(clients *test.Clients, t *testing.T, serviceName string, cond func(int) bool) (latestReady int, observed bool) { - endpoints := clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace) - err := wait.PollUntilContextTimeout(context.Background(), time.Second, 30*time.Second, true, func(context.Context) (bool, error) { - endpoint, err := endpoints.Get(context.Background(), serviceName, metav1.GetOptions{}) + endpoints, err := test.EndpointSlicesForService(clients.KubeClient, test.ServingFlags.TestNamespace, serviceName) if err != nil { return false, nil //nolint:nilerr } - if latestReady = resources.ReadyAddressCount(endpoint); !cond(latestReady) { + if latestReady = test.ReadyAddressCount(endpoints); !cond(latestReady) { return false, fmt.Errorf("scale %d didn't meet condition", latestReady) } diff --git a/test/e2e/multicontainerprobing/multicontainer_readiness_test.go b/test/e2e/multicontainerprobing/multicontainer_readiness_test.go index 94ef552ce906..a86375a9b26c 100644 --- a/test/e2e/multicontainerprobing/multicontainer_readiness_test.go +++ b/test/e2e/multicontainerprobing/multicontainer_readiness_test.go @@ -26,14 +26,12 @@ import ( "time" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" pkgTest "knative.dev/pkg/test" "knative.dev/pkg/test/spoof" "knative.dev/serving/pkg/apis/autoscaling" v1 
"knative.dev/serving/pkg/apis/serving/v1" - "knative.dev/serving/pkg/resources" rtesting "knative.dev/serving/pkg/testing/v1" "knative.dev/serving/test" "knative.dev/serving/test/e2e" @@ -386,15 +384,14 @@ func TestMultiContainerProbeStartFailingAfterReady(t *testing.T) { t.Fatal("Unable to get revision: ", err) } privateSvcName := e2e.PrivateServiceName(t, clients, revName) - endpoints := clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace) var latestReady int if err := wait.PollUntilContextTimeout(context.Background(), time.Second, 60*time.Second, true, func(context.Context) (bool, error) { - endpoint, err := endpoints.Get(context.Background(), privateSvcName, metav1.GetOptions{}) + slices, err := test.EndpointSlicesForService(clients.KubeClient, test.ServingFlags.TestNamespace, privateSvcName) if err != nil { return false, nil //nolint:nilerr } - latestReady = resources.ReadyAddressCount(endpoint) + latestReady = test.ReadyAddressCount(slices) return latestReady == 0, nil }); err != nil { t.Fatalf("Service still has endpoints: %d", latestReady) diff --git a/test/ha/activator_test.go b/test/ha/activator_test.go index 800345bc00e8..df8b11862fb8 100644 --- a/test/ha/activator_test.go +++ b/test/ha/activator_test.go @@ -161,24 +161,27 @@ func gatherBackingActivators(ctx context.Context, client kubernetes.Interface, n pods := make(podIPs) for _, rev := range revs { - endpoints := client.CoreV1().Endpoints(namespace) - e, err := endpoints.Get(ctx, rev, metav1.GetOptions{}) + slices, err := test.EndpointSlicesForService(client, namespace, rev) if err != nil { return nil, fmt.Errorf("failed to gather %s endpoints: %w", rev, err) } - for _, subset := range e.Subsets { - for _, address := range subset.Addresses { - if address.TargetRef == nil { + for _, slice := range slices { + for _, ep := range slice.Endpoints { + if ep.TargetRef == nil { return nil, fmt.Errorf("%s service is not pointing to a pod", rev) } - name := address.TargetRef.Name + name := 
ep.TargetRef.Name if !strings.HasPrefix(name, activatorDeploymentName) { - return nil, fmt.Errorf("%s service is not pointing to an activator pod but: %s", rev, address.TargetRef.Name) + return nil, fmt.Errorf("%s service is not pointing to an activator pod but: %s", rev, ep.TargetRef.Name) } - pods[name] = address.IP + if len(ep.Addresses) == 0 { + return nil, fmt.Errorf("%s has no address: %s", rev, ep.TargetRef.Name) + } + + pods[name] = ep.Addresses[0] } } } diff --git a/test/ha/ha.go b/test/ha/ha.go index c8f279579a59..fd199e6d5f73 100644 --- a/test/ha/ha.go +++ b/test/ha/ha.go @@ -19,9 +19,11 @@ package ha import ( "context" "net/url" + "slices" "testing" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -80,29 +82,38 @@ func assertServiceEventuallyWorks(t *testing.T, clients *test.Clients, names tes } } -func waitForEndpointsState(client kubernetes.Interface, svcName, svcNamespace string, inState func(*corev1.Endpoints) (bool, error)) error { - endpointsService := client.CoreV1().Endpoints(svcNamespace) - - return wait.PollUntilContextTimeout(context.Background(), test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) { - endpoint, err := endpointsService.Get(context.Background(), svcName, metav1.GetOptions{}) - if err != nil { - return false, err - } +func waitForEndpointsState( + client kubernetes.Interface, + svcName, + svcNamespace string, + inState func([]discoveryv1.EndpointSlice) (bool, error), +) error { + return wait.PollUntilContextTimeout( + context.Background(), + test.PollInterval, + test.PollTimeout, + true, + func(context.Context) (bool, error) { + slices, err := test.EndpointSlicesForService(client, svcNamespace, svcName) + if err != nil { + return false, err + } - return inState(endpoint) - }) + return inState(slices) + }) } -func readyEndpointsDoNotContain(ip string) 
func(*corev1.Endpoints) (bool, error) { - return func(eps *corev1.Endpoints) (bool, error) { - for _, subset := range eps.Subsets { - for _, ready := range subset.Addresses { - if ready.IP == ip { - return false, nil +func readyEndpointsDoNotContain(ip string) func([]discoveryv1.EndpointSlice) (bool, error) { + return func(epSlices []discoveryv1.EndpointSlice) (bool, error) { + contains := false + for _, slice := range epSlices { + for _, eps := range slice.Endpoints { + if eps.Conditions.Ready == nil || *eps.Conditions.Ready { + contains = contains || slices.Contains(eps.Addresses, ip) } } } - return true, nil + return !contains, nil } }