Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions test/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2025 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package test

import (
"context"
"fmt"

discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
)

func ReadyAddressCount(slices []discoveryv1.EndpointSlice) int {
count := 0
for _, slice := range slices {
count += len(ReadyAddresses(slice))
}
return count
}

func EndpointSlicesForService(k kubernetes.Interface, namespace, name string) ([]discoveryv1.EndpointSlice, error) {
listOpts := metav1.ListOptions{
LabelSelector: labels.Set{
discoveryv1.LabelServiceName: name,
}.String(),
}

slicesClient := k.DiscoveryV1().EndpointSlices(namespace)
slices, err := slicesClient.List(context.Background(), listOpts)
if err != nil {
return nil, fmt.Errorf(`error: getting endpoint slices for service "%s/%s": %w`, namespace, name, err)
}

return slices.Items, nil
}

// ReadyAddresses returns an iterator over ready endpoint addresses.
func ReadyAddresses(slice discoveryv1.EndpointSlice) []string {
var res []string
for _, ep := range slice.Endpoints {
if ep.Conditions.Ready != nil && !(*ep.Conditions.Ready) {
continue
}
res = append(res, ep.Addresses...)
}
return res
}
19 changes: 8 additions & 11 deletions test/e2e/autoscale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"knative.dev/serving/pkg/networking"
revnames "knative.dev/serving/pkg/reconciler/revision/resources/names"
"knative.dev/serving/pkg/reconciler/serverlessservice/resources/names"
"knative.dev/serving/pkg/resources"
rtesting "knative.dev/serving/pkg/testing/v1"
"knative.dev/serving/test"
testv1 "knative.dev/serving/test/v1"
Expand Down Expand Up @@ -219,14 +218,12 @@ func TestTargetBurstCapacity(t *testing.T) {
// We poll, since network programming takes times, but the timeout is set for
// uniformness with one above.
if err := wait.PollUntilContextTimeout(context.Background(), 250*time.Millisecond, 2*cfg.StableWindow, true, func(context.Context) (bool, error) {
svcEps, err := ctx.clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace).Get(
context.Background(), ctx.resources.Revision.Name, /* revision service name is equal to revision name*/
metav1.GetOptions{})
slices, err := test.EndpointSlicesForService(ctx.clients.KubeClient, test.ServingFlags.TestNamespace, ctx.resources.Revision.Name)
if err != nil {
return false, err
}
t.Log("resources.ReadyAddressCount(svcEps) =", resources.ReadyAddressCount(svcEps))
return resources.ReadyAddressCount(svcEps) == 2, nil
t.Log("resources.ReadyAddressCount(svcEps) =", test.ReadyAddressCount(slices))
return test.ReadyAddressCount(slices) == 2, nil
}); err != nil {
t.Error("Never achieved subset of size 2:", err)
}
Expand All @@ -252,12 +249,12 @@ func TestTargetBurstCapacityMinusOne(t *testing.T) {
if err != nil {
t.Fatal("Error retrieving autoscaler configmap:", err)
}
aeps, err := ctx.clients.KubeClient.CoreV1().Endpoints(
system.Namespace()).Get(context.Background(), networking.ActivatorServiceName, metav1.GetOptions{})

slices, err := test.EndpointSlicesForService(ctx.clients.KubeClient, system.Namespace(), networking.ActivatorServiceName)
if err != nil {
t.Fatal("Error getting activator endpoints:", err)
}
t.Log("Activator endpoints:", aeps)
t.Log("Activator endpoints:", slices)

// Wait for the activator endpoints to equalize.
if err := waitForActivatorEndpoints(ctx); err != nil {
Expand Down Expand Up @@ -336,11 +333,11 @@ func TestFastScaleToZero(t *testing.T) {
// of 20 runs (11s) + 4s of buffer for reliability.
st := time.Now()
if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, cfg.ScaleToZeroGracePeriod+15*time.Second, true, func(context.Context) (bool, error) {
eps, err := ctx.clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace).Get(context.Background(), epsN, metav1.GetOptions{})
slices, err := test.EndpointSlicesForService(ctx.clients.KubeClient, test.ServingFlags.TestNamespace, epsN)
if err != nil {
return false, err
}
return resources.ReadyAddressCount(eps) == 0, nil
return test.ReadyAddressCount(slices) == 0, nil
}); err != nil {
t.Fatalf("Did not observe %q to actually be emptied", epsN)
}
Expand Down
22 changes: 9 additions & 13 deletions test/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
// Mysteriously required to support GCP auth (required by k8s libs).
// Apparently just importing it is enough. @_@ side effects @_@.
// https://github.com/kubernetes/client-go/issues/242

_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -108,9 +109,7 @@ func waitForActivatorEndpoints(ctx *TestContext) error {
)

if rerr := wait.PollUntilContextTimeout(context.Background(), 250*time.Millisecond, time.Minute, true, func(context.Context) (bool, error) {
// We need to fetch the activator endpoints at every check, since it can change.
actEps, err := ctx.clients.KubeClient.CoreV1().Endpoints(
system.Namespace()).Get(context.Background(), networking.ActivatorServiceName, metav1.GetOptions{})
actEps, err := test.EndpointSlicesForService(ctx.clients.KubeClient, system.Namespace(), networking.ActivatorServiceName)
if err != nil {
return false, nil //nolint:nilerr
}
Expand All @@ -119,25 +118,22 @@ func waitForActivatorEndpoints(ctx *TestContext) error {
return false, nil //nolint:nilerr
}

svcEps, err := ctx.clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace).Get(
context.Background(), ctx.resources.Revision.Name, metav1.GetOptions{})
svcEps, err := test.EndpointSlicesForService(ctx.clients.KubeClient, test.ServingFlags.TestNamespace, ctx.resources.Revision.Name)
if err != nil {
return false, err
}

wantAct = int(sks.Spec.NumActivators)
aset = make(sets.Set[string], wantAct)
for _, ss := range actEps.Subsets {
for i := range len(ss.Addresses) {
aset.Insert(ss.Addresses[i].IP)
}

for _, slice := range actEps {
aset.Insert(test.ReadyAddresses(slice)...)
}
svcSet = make(sets.Set[string], wantAct)
for _, ss := range svcEps.Subsets {
for i := range len(ss.Addresses) {
svcSet.Insert(ss.Addresses[i].IP)
}
for _, slice := range svcEps {
svcSet.Insert(test.ReadyAddresses(slice)...)
}

// Subset wants this many activators, but there might not be as many,
// so reduce the expectation.
if aset.Len() < wantAct {
Expand Down
13 changes: 4 additions & 9 deletions test/e2e/minscale_readiness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"knative.dev/serving/pkg/apis/autoscaling"
"knative.dev/serving/pkg/apis/serving"
v1 "knative.dev/serving/pkg/apis/serving/v1"
"knative.dev/serving/pkg/resources"
rtesting "knative.dev/serving/pkg/testing/v1"
"knative.dev/serving/test"
v1test "knative.dev/serving/test/v1"
Expand Down Expand Up @@ -505,29 +504,25 @@ func latestRevisionName(t *testing.T, clients *test.Clients, configName, oldRevN
// waitForDesiredScale returns the last observed number of pods and/or error if the cond
// callback is never satisfied.
func waitForDesiredScale(clients *test.Clients, serviceName string, cond func(int) bool) (latestReady int, err error) {
endpoints := clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace)

// See https://github.com/knative/serving/issues/7727#issuecomment-706772507 for context.
return latestReady, wait.PollUntilContextTimeout(context.Background(), time.Second, 3*time.Minute, true, func(context.Context) (bool, error) {
endpoint, err := endpoints.Get(context.Background(), serviceName, metav1.GetOptions{})
endpoints, err := test.EndpointSlicesForService(clients.KubeClient, test.ServingFlags.TestNamespace, serviceName)
if err != nil {
return false, nil //nolint:nilerr
}
latestReady = resources.ReadyAddressCount(endpoint)
latestReady = test.ReadyAddressCount(endpoints)
return cond(latestReady), nil
})
}

func ensureDesiredScale(clients *test.Clients, t *testing.T, serviceName string, cond func(int) bool) (latestReady int, observed bool) {
endpoints := clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace)

err := wait.PollUntilContextTimeout(context.Background(), time.Second, 30*time.Second, true, func(context.Context) (bool, error) {
endpoint, err := endpoints.Get(context.Background(), serviceName, metav1.GetOptions{})
endpoints, err := test.EndpointSlicesForService(clients.KubeClient, test.ServingFlags.TestNamespace, serviceName)
if err != nil {
return false, nil //nolint:nilerr
}

if latestReady = resources.ReadyAddressCount(endpoint); !cond(latestReady) {
if latestReady = test.ReadyAddressCount(endpoints); !cond(latestReady) {
return false, fmt.Errorf("scale %d didn't meet condition", latestReady)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
pkgTest "knative.dev/pkg/test"
"knative.dev/pkg/test/spoof"
"knative.dev/serving/pkg/apis/autoscaling"
v1 "knative.dev/serving/pkg/apis/serving/v1"
"knative.dev/serving/pkg/resources"
rtesting "knative.dev/serving/pkg/testing/v1"
"knative.dev/serving/test"
"knative.dev/serving/test/e2e"
Expand Down Expand Up @@ -386,15 +384,14 @@ func TestMultiContainerProbeStartFailingAfterReady(t *testing.T) {
t.Fatal("Unable to get revision: ", err)
}
privateSvcName := e2e.PrivateServiceName(t, clients, revName)
endpoints := clients.KubeClient.CoreV1().Endpoints(test.ServingFlags.TestNamespace)

var latestReady int
if err := wait.PollUntilContextTimeout(context.Background(), time.Second, 60*time.Second, true, func(context.Context) (bool, error) {
endpoint, err := endpoints.Get(context.Background(), privateSvcName, metav1.GetOptions{})
slices, err := test.EndpointSlicesForService(clients.KubeClient, test.ServingFlags.TestNamespace, privateSvcName)
if err != nil {
return false, nil //nolint:nilerr
}
latestReady = resources.ReadyAddressCount(endpoint)
latestReady = test.ReadyAddressCount(slices)
return latestReady == 0, nil
}); err != nil {
t.Fatalf("Service still has endpoints: %d", latestReady)
Expand Down
19 changes: 11 additions & 8 deletions test/ha/activator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,24 +161,27 @@ func gatherBackingActivators(ctx context.Context, client kubernetes.Interface, n
pods := make(podIPs)

for _, rev := range revs {
endpoints := client.CoreV1().Endpoints(namespace)
e, err := endpoints.Get(ctx, rev, metav1.GetOptions{})
slices, err := test.EndpointSlicesForService(client, namespace, rev)
if err != nil {
return nil, fmt.Errorf("failed to gather %s endpoints: %w", rev, err)
}

for _, subset := range e.Subsets {
for _, address := range subset.Addresses {
if address.TargetRef == nil {
for _, slice := range slices {
for _, ep := range slice.Endpoints {
if ep.TargetRef == nil {
return nil, fmt.Errorf("%s service is not pointing to a pod", rev)
}

name := address.TargetRef.Name
name := ep.TargetRef.Name
if !strings.HasPrefix(name, activatorDeploymentName) {
return nil, fmt.Errorf("%s service is not pointing to an activator pod but: %s", rev, address.TargetRef.Name)
return nil, fmt.Errorf("%s service is not pointing to an activator pod but: %s", rev, ep.TargetRef.Name)
}

pods[name] = address.IP
if len(ep.Addresses) == 0 {
return nil, fmt.Errorf("%s has no address: %s", rev, ep.TargetRef.Name)
}

pods[name] = ep.Addresses[0]
}
}
}
Expand Down
45 changes: 28 additions & 17 deletions test/ha/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package ha
import (
"context"
"net/url"
"slices"
"testing"

corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -80,29 +82,38 @@ func assertServiceEventuallyWorks(t *testing.T, clients *test.Clients, names tes
}
}

func waitForEndpointsState(client kubernetes.Interface, svcName, svcNamespace string, inState func(*corev1.Endpoints) (bool, error)) error {
endpointsService := client.CoreV1().Endpoints(svcNamespace)

return wait.PollUntilContextTimeout(context.Background(), test.PollInterval, test.PollTimeout, true, func(context.Context) (bool, error) {
endpoint, err := endpointsService.Get(context.Background(), svcName, metav1.GetOptions{})
if err != nil {
return false, err
}
func waitForEndpointsState(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few var/func names that still use endpoint or endpoints, we could update those to slices?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's worth it - in the end we are looking at endpoints - whether that comes from corev1.Endpoints vs. discoveryv1.EndpointSlices shouldn't matter for these assertions

client kubernetes.Interface,
svcName,
svcNamespace string,
inState func([]discoveryv1.EndpointSlice) (bool, error),
) error {
return wait.PollUntilContextTimeout(
context.Background(),
test.PollInterval,
test.PollTimeout,
true,
func(context.Context) (bool, error) {
slices, err := test.EndpointSlicesForService(client, svcNamespace, svcName)
if err != nil {
return false, err
}

return inState(endpoint)
})
return inState(slices)
})
}

func readyEndpointsDoNotContain(ip string) func(*corev1.Endpoints) (bool, error) {
return func(eps *corev1.Endpoints) (bool, error) {
for _, subset := range eps.Subsets {
for _, ready := range subset.Addresses {
if ready.IP == ip {
return false, nil
func readyEndpointsDoNotContain(ip string) func([]discoveryv1.EndpointSlice) (bool, error) {
return func(epSlices []discoveryv1.EndpointSlice) (bool, error) {
contains := false
for _, slice := range epSlices {
for _, eps := range slice.Endpoints {
if eps.Conditions.Ready == nil || *eps.Conditions.Ready {
contains = contains || slices.Contains(eps.Addresses, ip)
}
}
}
return true, nil
return !contains, nil
}
}

Expand Down
Loading