diff --git a/config/core/200-roles/clusterrole.yaml b/config/core/200-roles/clusterrole.yaml index 5ab0e53829ff..22081b446e33 100644 --- a/config/core/200-roles/clusterrole.yaml +++ b/config/core/200-roles/clusterrole.yaml @@ -30,6 +30,9 @@ rules: - apiGroups: [""] resources: ["namespaces/finalizers"] # finalizers are needed for the owner reference of the webhook verbs: ["update"] + - apiGroups: ["discovery.k8s.io"] + resources: ["endpointslices"] + verbs: ["get", "list", "create", "update", "delete", "patch", "watch"] - apiGroups: ["apps"] resources: ["deployments", "deployments/finalizers"] # finalizers are needed for the owner reference of the webhook verbs: ["get", "list", "create", "update", "delete", "patch", "watch"] diff --git a/pkg/autoscaler/bucket/bucket.go b/pkg/autoscaler/bucket/bucket.go index 37fb5f0b58f6..8a1ca70546dd 100644 --- a/pkg/autoscaler/bucket/bucket.go +++ b/pkg/autoscaler/bucket/bucket.go @@ -26,20 +26,20 @@ import ( "knative.dev/pkg/hash" ) -const prefix = "autoscaler-bucket" +const BucketPrefix = "autoscaler-bucket" // IsBucketHost returns true if the given host is a host of a K8S Service // of a bucket. func IsBucketHost(host string) bool { // Currently checking prefix is ok as only requests sent via bucket service // have host with the prefix. Maybe use regexp for improvement. - return strings.HasPrefix(host, prefix) + return strings.HasPrefix(host, BucketPrefix) } // AutoscalerBucketName returns the name of the Autoscaler bucket with given `ordinal` // and `total` bucket count. func AutoscalerBucketName(ordinal, total uint32) string { - return strings.ToLower(fmt.Sprintf("%s-%02d-of-%02d", prefix, ordinal, total)) + return strings.ToLower(fmt.Sprintf("%s-%02d-of-%02d", BucketPrefix, ordinal, total)) } // AutoscalerBucketSet returns a hash.BucketSet consisting of Autoscaler diff --git a/pkg/autoscaler/statforwarder/forwarder_test.go b/pkg/autoscaler/statforwarder/forwarder_test.go index 1a8bb8212509..39743fa4e63a 100644 --- a/pkg/autoscaler/statforwarder/forwarder_test.go +++ b/pkg/autoscaler/statforwarder/forwarder_test.go @@ -26,16 +26,18 @@ import ( "github.com/google/go-cmp/cmp" coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" ktesting "k8s.io/client-go/testing" + "k8s.io/utils/ptr" fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" fakeleaseinformer "knative.dev/pkg/client/injection/kube/informers/coordination/v1/lease/fake" - fakeendpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints/fake" fakeserviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" + fakeendpointsliceinformer "knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake" "knative.dev/pkg/hash" rtesting "knative.dev/pkg/reconciler/testing" "knative.dev/pkg/system" @@ -89,7 +91,7 @@ func must(t *testing.T, err error) { func TestForwarderReconcile(t *testing.T) { ctx, cancel, informers := rtesting.SetupFakeContextWithCancel(t) kubeClient := fakekubeclient.Get(ctx) - endpoints := fakeendpointsinformer.Get(ctx) + endpoints := fakeendpointsliceinformer.Get(ctx) service := fakeserviceinformer.Get(ctx) lease := fakeleaseinformer.Get(ctx) @@ -124,19 +126,32 @@ func TestForwarderReconcile(t *testing.T) { t.Fatal("Timeout to get the Service:", lastErr) } - wantSubsets := []corev1.EndpointSubset{{ - Addresses: []corev1.EndpointAddress{{ - IP: testIP1, + want := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: bucket1, + Namespace: testNs, + Labels: map[string]string{ + discoveryv1.LabelServiceName: bucket1, + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{testIP1}, + Conditions: discoveryv1.EndpointConditions{ + Ready: ptr.To(true), + Serving: ptr.To(true), + Terminating: ptr.To(false), + }, }}, - Ports: []corev1.EndpointPort{{ - Name: autoscalerPortName, - Port: autoscalerPort, - Protocol: corev1.ProtocolTCP, + Ports: []discoveryv1.EndpointPort{{ + Name: ptr.To(autoscalerPortName), + Port: ptr.To[int32](autoscalerPort), + Protocol: ptr.To(corev1.ProtocolTCP), }}, - }} + } // Check the endpoints got updated. - el := endpoints.Lister().Endpoints(testNs) + el := endpoints.Lister().EndpointSlices(testNs) if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) { got, err := el.Get(bucket1) if err != nil { @@ -144,13 +159,13 @@ func TestForwarderReconcile(t *testing.T) { return false, nil //nolint:nilerr } - if !cmp.Equal(wantSubsets, got.Subsets) { - lastErr = fmt.Errorf("Got Subsets = %v, want = %v", got.Subsets, wantSubsets) + if diff := cmp.Diff(want, got); diff != "" { + lastErr = fmt.Errorf("resulting EndpointSlice are different (-want, +got) %s", diff) return false, nil } return true, nil }); err != nil { - t.Fatal("Timeout to get the Endpoints:", lastErr) + t.Fatal("Timeout to get the EndpointSlice:", lastErr) } // Lease holder gets changed. @@ -160,7 +175,7 @@ func TestForwarderReconcile(t *testing.T) { lease.Informer().GetIndexer().Add(l) // Check that the endpoints got updated. - wantSubsets[0].Addresses[0].IP = testIP2 + want.Endpoints[0].Addresses[0] = testIP2 if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) { // Check the endpoints get updated. got, err := el.Get(bucket1) @@ -169,13 +184,13 @@ func TestForwarderReconcile(t *testing.T) { return false, nil //nolint:nilerr } - if !cmp.Equal(wantSubsets, got.Subsets) { - lastErr = fmt.Errorf("Got Subsets = %v, want = %v", got.Subsets, wantSubsets) + if diff := cmp.Diff(want, got); diff != "" { + lastErr = fmt.Errorf("Got (-want,+got): %s", diff) return false, nil } return true, nil }); err != nil { - t.Fatal("Timeout to get the Endpoints:", lastErr) + t.Fatal("Timeout to get the EndpointSlice:", lastErr) } } @@ -240,7 +255,7 @@ func TestForwarderRetryOnEndpointsCreationFailure(t *testing.T) { endpointsCreation := 0 retried := make(chan struct{}) - kubeClient.PrependReactor("create", "endpoints", + kubeClient.PrependReactor("create", "endpointslices", func(action ktesting.Action) (bool, runtime.Object, error) { endpointsCreation++ if endpointsCreation == 2 { @@ -257,14 +272,14 @@ func TestForwarderRetryOnEndpointsCreationFailure(t *testing.T) { select { case <-retried: case <-time.After(time.Second): - t.Error("Timeout waiting for Endpoints retry") + t.Error("Timeout waiting for EndpointSlice retry") } } func TestForwarderRetryOnEndpointsUpdateFailure(t *testing.T) { ctx, cancel, informers := rtesting.SetupFakeContextWithCancel(t) kubeClient := fakekubeclient.Get(ctx) - endpoints := fakeendpointsinformer.Get(ctx) + endpoints := fakeendpointsliceinformer.Get(ctx) lease := fakeleaseinformer.Get(ctx) waitInformers, err := rtesting.RunAndSyncInformers(ctx, informers...) @@ -282,7 +297,7 @@ func TestForwarderRetryOnEndpointsUpdateFailure(t *testing.T) { endpointsUpdate := 0 retried := make(chan struct{}) - kubeClient.PrependReactor("update", "endpoints", + kubeClient.PrependReactor("update", "endpointslices", func(action ktesting.Action) (bool, runtime.Object, error) { endpointsUpdate++ if endpointsUpdate == 2 { @@ -293,13 +308,13 @@ func TestForwarderRetryOnEndpointsUpdateFailure(t *testing.T) { }, ) - e := &corev1.Endpoints{ + e := &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Name: bucket1, Namespace: testNs, }, } - kubeClient.CoreV1().Endpoints(testNs).Create(ctx, e, metav1.CreateOptions{}) + kubeClient.DiscoveryV1().EndpointSlices(testNs).Create(ctx, e, metav1.CreateOptions{}) endpoints.Informer().GetIndexer().Add(e) kubeClient.CoordinationV1().Leases(testNs).Create(ctx, testLease, metav1.CreateOptions{}) lease.Informer().GetIndexer().Add(testLease) @@ -307,7 +322,7 @@ func TestForwarderRetryOnEndpointsUpdateFailure(t *testing.T) { select { case <-retried: case <-time.After(time.Second): - t.Error("Timeout waiting for Endpoints retry") + t.Error("Timeout waiting for EndpointSlice retry") } } @@ -336,10 +351,10 @@ func TestForwarderSkipReconciling(t *testing.T) { return false, nil, nil }, ) - endpointsCreated := make(chan struct{}) - kubeClient.PrependReactor("create", "endpoints", + endpointsliceCreated := make(chan struct{}) + kubeClient.PrependReactor("create", "endpointslices", func(action ktesting.Action) (bool, runtime.Object, error) { - endpointsCreated <- struct{}{} + endpointsliceCreated <- struct{}{} return false, nil, nil }, ) @@ -391,7 +406,7 @@ func TestForwarderSkipReconciling(t *testing.T) { case <-time.After(50 * time.Millisecond): } select { - case <-endpointsCreated: + case <-endpointsliceCreated: t.Error("Got Endpoints created, want no actions") case <-time.After(50 * time.Millisecond): } diff --git a/pkg/autoscaler/statforwarder/leases.go b/pkg/autoscaler/statforwarder/leases.go index 1806ffeae255..529b68f1a0c8 100644 --- a/pkg/autoscaler/statforwarder/leases.go +++ b/pkg/autoscaler/statforwarder/leases.go @@ -25,17 +25,19 @@ import ( "go.uber.org/zap" coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" - corev1listers "k8s.io/client-go/listers/core/v1" + discoveryv1listers "k8s.io/client-go/listers/discovery/v1" "k8s.io/client-go/tools/cache" + "k8s.io/utils/ptr" kubeclient "knative.dev/pkg/client/injection/kube/client" leaseinformer "knative.dev/pkg/client/injection/kube/informers/coordination/v1/lease" - endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints" + endpointsliceinformer "knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice" "knative.dev/pkg/controller" "knative.dev/pkg/hash" "knative.dev/pkg/logging" @@ -50,13 +52,12 @@ func LeaseBasedProcessor(ctx context.Context, f *Forwarder, accept statProcessor if err != nil { return err } - endpointsInformer := endpointsinformer.Get(ctx) lt := &leaseTracker{ logger: logging.FromContext(ctx), selfIP: selfIP, bs: f.bs, kc: kubeclient.Get(ctx), - endpointsLister: endpointsInformer.Lister(), + endpointsLister: endpointsliceinformer.Get(ctx).Lister(), id2ip: make(map[string]string), accept: accept, fwd: f, @@ -86,7 +87,7 @@ type leaseTracker struct { bs *hash.BucketSet kc kubernetes.Interface - endpointsLister corev1listers.EndpointsLister + endpointsLister discoveryv1listers.EndpointSliceLister // id2ip stores the IP extracted from the holder identity to avoid // string split each time. @@ -232,25 +233,38 @@ func (f *leaseTracker) createService(ctx context.Context, ns, n string) error { return nil } -// createOrUpdateEndpoints creates an Endpoints object with the given namespace and -// name, and the Forwarder.selfIP. If the Endpoints object already +// createOrUpdateEndpoints creates an EndpointSlice object with the given namespace and +// name, and the Forwarder.selfIP. If the EndpointSlice object already // exists, it will update the Endpoints with the Forwarder.selfIP. func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string) error { - wantSubsets := []corev1.EndpointSubset{{ - Addresses: []corev1.EndpointAddress{{ - IP: f.selfIP, + want := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: n, + Namespace: ns, + Labels: map[string]string{ + discoveryv1.LabelServiceName: n, + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{f.selfIP}, + Conditions: discoveryv1.EndpointConditions{ + Ready: ptr.To(true), + Serving: ptr.To(true), + Terminating: ptr.To(false), + }, }}, - Ports: []corev1.EndpointPort{{ - Name: autoscalerPortName, - Port: autoscalerPort, - Protocol: corev1.ProtocolTCP, + Ports: []discoveryv1.EndpointPort{{ + Name: ptr.To(autoscalerPortName), + Port: ptr.To[int32](autoscalerPort), + Protocol: ptr.To(corev1.ProtocolTCP), }}, - }} + } exists := true var lastErr error if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) { - e, err := f.endpointsLister.Endpoints(ns).Get(n) + got, err := f.endpointsLister.EndpointSlices(ns).Get(n) if apierrs.IsNotFound(err) { exists = false return true, nil @@ -262,13 +276,13 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string return false, nil //nolint:nilerr } - if equality.Semantic.DeepEqual(wantSubsets, e.Subsets) { + if equality.Semantic.DeepEqual(want.Endpoints, got.Endpoints) { return true, nil } - want := e.DeepCopy() - want.Subsets = wantSubsets - if _, lastErr = f.kc.CoreV1().Endpoints(ns).Update(ctx, want, metav1.UpdateOptions{}); lastErr != nil { + e := got.DeepCopy() + e.Endpoints = want.Endpoints + if _, lastErr = f.kc.DiscoveryV1().EndpointSlices(ns).Update(ctx, e, metav1.UpdateOptions{}); lastErr != nil { // Do not return the error to cause a retry. return false, nil //nolint:nilerr } @@ -284,13 +298,7 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string } if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) { - _, lastErr = f.kc.CoreV1().Endpoints(ns).Create(ctx, &corev1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: n, - Namespace: ns, - }, - Subsets: wantSubsets, - }, metav1.CreateOptions{}) + _, lastErr = f.kc.DiscoveryV1().EndpointSlices(ns).Create(ctx, want, metav1.CreateOptions{}) // Do not return the error to cause a retry. return lastErr == nil, nil }); err != nil { diff --git a/pkg/cleanup/cmd/cleanup/cleanup.go b/pkg/cleanup/cmd/cleanup/cleanup.go index b2d1a02052c5..af96f382b233 100644 --- a/pkg/cleanup/cmd/cleanup/cleanup.go +++ b/pkg/cleanup/cmd/cleanup/cleanup.go @@ -30,6 +30,7 @@ import ( "knative.dev/pkg/environment" "knative.dev/pkg/logging" "knative.dev/pkg/system" + autoscalerbucket "knative.dev/serving/pkg/autoscaler/bucket" ) const ( @@ -106,6 +107,21 @@ func main() { if err = client.RbacV1().ClusterRoles().Delete(context.Background(), "knative-serving-certmanager", metav1.DeleteOptions{}); err != nil && !apierrs.IsNotFound(err) { logger.Fatal("failed to delete clusterrole knative-serving-certmanager: ", err) } + + epList, err := client.CoreV1().Endpoints(system.Namespace()).List(context.Background(), metav1.ListOptions{}) + if err != nil { + logger.Fatal("failed to fetch endpoints: ", err) + } + + for _, eps := range epList.Items { + if strings.HasPrefix(eps.GetName(), autoscalerbucket.BucketPrefix) { + err := client.CoreV1().Endpoints(system.Namespace()).Delete(context.Background(), eps.GetName(), metav1.DeleteOptions{}) + if err != nil && !apierrs.IsNotFound(err) { + logger.Fatal("failed to delete autoscaler endpoints: ", err) + } + } + } + logger.Info("Old Serving resource deletion completed successfully") } diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/endpointslice.go b/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/endpointslice.go new file mode 100644 index 000000000000..443ea622b12b --- /dev/null +++ b/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/endpointslice.go @@ -0,0 +1,52 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package endpointslice + +import ( + context "context" + + v1 "k8s.io/client-go/informers/discovery/v1" + factory "knative.dev/pkg/client/injection/kube/informers/factory" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterInformer(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct{} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := factory.Get(ctx) + inf := f.Discovery().V1().EndpointSlices() + return context.WithValue(ctx, Key{}, inf), inf.Informer() +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context) v1.EndpointSliceInformer { + untyped := ctx.Value(Key{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch k8s.io/client-go/informers/discovery/v1.EndpointSliceInformer from context.") + } + return untyped.(v1.EndpointSliceInformer) +} diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake/fake.go b/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake/fake.go new file mode 100644 index 000000000000..b7b693a02a36 --- /dev/null +++ b/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake/fake.go @@ -0,0 +1,40 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package fake + +import ( + context "context" + + endpointslice "knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice" + fake "knative.dev/pkg/client/injection/kube/informers/factory/fake" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" +) + +var Get = endpointslice.Get + +func init() { + injection.Fake.RegisterInformer(withInformer) +} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := fake.Get(ctx) + inf := f.Discovery().V1().EndpointSlices() + return context.WithValue(ctx, endpointslice.Key{}, inf), inf.Informer() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 4d4fad9a8d9c..1df0ba113f62 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1578,6 +1578,8 @@ knative.dev/pkg/client/injection/kube/informers/core/v1/secret knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake knative.dev/pkg/client/injection/kube/informers/core/v1/service knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake +knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice +knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake knative.dev/pkg/client/injection/kube/informers/factory knative.dev/pkg/client/injection/kube/informers/factory/fake knative.dev/pkg/client/injection/kube/informers/factory/filtered