Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/core/200-roles/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ rules:
- apiGroups: [""]
resources: ["namespaces/finalizers"] # finalizers are needed for the owner reference of the webhook
verbs: ["update"]
- apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"]
verbs: ["get", "list", "create", "update", "delete", "patch", "watch"]
- apiGroups: ["apps"]
resources: ["deployments", "deployments/finalizers"] # finalizers are needed for the owner reference of the webhook
verbs: ["get", "list", "create", "update", "delete", "patch", "watch"]
Expand Down
6 changes: 3 additions & 3 deletions pkg/autoscaler/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ import (
"knative.dev/pkg/hash"
)

// BucketPrefix is the name prefix shared by every Autoscaler bucket
// K8S Service (and its EndpointSlice), e.g. "autoscaler-bucket-00-of-02".
const BucketPrefix = "autoscaler-bucket"

// IsBucketHost returns true if the given host is a host of a K8S Service
// of a bucket.
func IsBucketHost(host string) bool {
	// Currently checking prefix is ok as only requests sent via bucket service
	// have host with the prefix. Maybe use regexp for improvement.
	return strings.HasPrefix(host, BucketPrefix)
}

// AutoscalerBucketName returns the name of the Autoscaler bucket with given `ordinal`
// and `total` bucket count.
func AutoscalerBucketName(ordinal, total uint32) string {
	return strings.ToLower(fmt.Sprintf("%s-%02d-of-%02d", BucketPrefix, ordinal, total))
}

// AutoscalerBucketSet returns a hash.BucketSet consisting of Autoscaler
Expand Down
73 changes: 44 additions & 29 deletions pkg/autoscaler/statforwarder/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ import (
"github.com/google/go-cmp/cmp"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
ktesting "k8s.io/client-go/testing"
"k8s.io/utils/ptr"
fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake"
fakeleaseinformer "knative.dev/pkg/client/injection/kube/informers/coordination/v1/lease/fake"
fakeendpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints/fake"
fakeserviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake"
fakeendpointsliceinformer "knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake"
"knative.dev/pkg/hash"
rtesting "knative.dev/pkg/reconciler/testing"
"knative.dev/pkg/system"
Expand Down Expand Up @@ -89,7 +91,7 @@ func must(t *testing.T, err error) {
func TestForwarderReconcile(t *testing.T) {
ctx, cancel, informers := rtesting.SetupFakeContextWithCancel(t)
kubeClient := fakekubeclient.Get(ctx)
endpoints := fakeendpointsinformer.Get(ctx)
endpoints := fakeendpointsliceinformer.Get(ctx)
service := fakeserviceinformer.Get(ctx)
lease := fakeleaseinformer.Get(ctx)

Expand Down Expand Up @@ -124,33 +126,46 @@ func TestForwarderReconcile(t *testing.T) {
t.Fatal("Timeout to get the Service:", lastErr)
}

wantSubsets := []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{
IP: testIP1,
want := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: bucket1,
Namespace: testNs,
Labels: map[string]string{
discoveryv1.LabelServiceName: bucket1,
},
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{testIP1},
Conditions: discoveryv1.EndpointConditions{
Ready: ptr.To(true),
Serving: ptr.To(true),
Terminating: ptr.To(false),
},
}},
Ports: []corev1.EndpointPort{{
Name: autoscalerPortName,
Port: autoscalerPort,
Protocol: corev1.ProtocolTCP,
Ports: []discoveryv1.EndpointPort{{
Name: ptr.To(autoscalerPortName),
Port: ptr.To[int32](autoscalerPort),
Protocol: ptr.To(corev1.ProtocolTCP),
}},
}}
}

// Check the endpoints got updated.
el := endpoints.Lister().Endpoints(testNs)
el := endpoints.Lister().EndpointSlices(testNs)
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
got, err := el.Get(bucket1)
if err != nil {
lastErr = err
return false, nil //nolint:nilerr
}

if !cmp.Equal(wantSubsets, got.Subsets) {
lastErr = fmt.Errorf("Got Subsets = %v, want = %v", got.Subsets, wantSubsets)
if diff := cmp.Diff(want, got); diff != "" {
lastErr = fmt.Errorf("resulting EndpointSlice are different (-want, +got) %s", diff)
return false, nil
}
return true, nil
}); err != nil {
t.Fatal("Timeout to get the Endpoints:", lastErr)
t.Fatal("Timeout to get the EndpointSlice:", lastErr)
}

// Lease holder gets changed.
Expand All @@ -160,7 +175,7 @@ func TestForwarderReconcile(t *testing.T) {
lease.Informer().GetIndexer().Add(l)

// Check that the endpoints got updated.
wantSubsets[0].Addresses[0].IP = testIP2
want.Endpoints[0].Addresses[0] = testIP2
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) {
// Check the endpoints get updated.
got, err := el.Get(bucket1)
Expand All @@ -169,13 +184,13 @@ func TestForwarderReconcile(t *testing.T) {
return false, nil //nolint:nilerr
}

if !cmp.Equal(wantSubsets, got.Subsets) {
lastErr = fmt.Errorf("Got Subsets = %v, want = %v", got.Subsets, wantSubsets)
if diff := cmp.Diff(want, got); diff != "" {
lastErr = fmt.Errorf("Got (-want,+got): %s", diff)
return false, nil
}
return true, nil
}); err != nil {
t.Fatal("Timeout to get the Endpoints:", lastErr)
t.Fatal("Timeout to get the EndpointSlice:", lastErr)
}
}

Expand Down Expand Up @@ -240,7 +255,7 @@ func TestForwarderRetryOnEndpointsCreationFailure(t *testing.T) {

endpointsCreation := 0
retried := make(chan struct{})
kubeClient.PrependReactor("create", "endpoints",
kubeClient.PrependReactor("create", "endpointslices",
func(action ktesting.Action) (bool, runtime.Object, error) {
endpointsCreation++
if endpointsCreation == 2 {
Expand All @@ -257,14 +272,14 @@ func TestForwarderRetryOnEndpointsCreationFailure(t *testing.T) {
select {
case <-retried:
case <-time.After(time.Second):
t.Error("Timeout waiting for Endpoints retry")
t.Error("Timeout waiting for EndpointSlice retry")
}
}

func TestForwarderRetryOnEndpointsUpdateFailure(t *testing.T) {
ctx, cancel, informers := rtesting.SetupFakeContextWithCancel(t)
kubeClient := fakekubeclient.Get(ctx)
endpoints := fakeendpointsinformer.Get(ctx)
endpoints := fakeendpointsliceinformer.Get(ctx)
lease := fakeleaseinformer.Get(ctx)

waitInformers, err := rtesting.RunAndSyncInformers(ctx, informers...)
Expand All @@ -282,7 +297,7 @@ func TestForwarderRetryOnEndpointsUpdateFailure(t *testing.T) {

endpointsUpdate := 0
retried := make(chan struct{})
kubeClient.PrependReactor("update", "endpoints",
kubeClient.PrependReactor("update", "endpointslices",
func(action ktesting.Action) (bool, runtime.Object, error) {
endpointsUpdate++
if endpointsUpdate == 2 {
Expand All @@ -293,21 +308,21 @@ func TestForwarderRetryOnEndpointsUpdateFailure(t *testing.T) {
},
)

e := &corev1.Endpoints{
e := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: bucket1,
Namespace: testNs,
},
}
kubeClient.CoreV1().Endpoints(testNs).Create(ctx, e, metav1.CreateOptions{})
kubeClient.DiscoveryV1().EndpointSlices(testNs).Create(ctx, e, metav1.CreateOptions{})
endpoints.Informer().GetIndexer().Add(e)
kubeClient.CoordinationV1().Leases(testNs).Create(ctx, testLease, metav1.CreateOptions{})
lease.Informer().GetIndexer().Add(testLease)

select {
case <-retried:
case <-time.After(time.Second):
t.Error("Timeout waiting for Endpoints retry")
t.Error("Timeout waiting for EndpointSlice retry")
}
}

Expand Down Expand Up @@ -336,10 +351,10 @@ func TestForwarderSkipReconciling(t *testing.T) {
return false, nil, nil
},
)
endpointsCreated := make(chan struct{})
kubeClient.PrependReactor("create", "endpoints",
endpointsliceCreated := make(chan struct{})
kubeClient.PrependReactor("create", "endpointslices",
func(action ktesting.Action) (bool, runtime.Object, error) {
endpointsCreated <- struct{}{}
endpointsliceCreated <- struct{}{}
return false, nil, nil
},
)
Expand Down Expand Up @@ -391,7 +406,7 @@ func TestForwarderSkipReconciling(t *testing.T) {
case <-time.After(50 * time.Millisecond):
}
select {
case <-endpointsCreated:
case <-endpointsliceCreated:
t.Error("Got Endpoints created, want no actions")
case <-time.After(50 * time.Millisecond):
}
Expand Down
62 changes: 35 additions & 27 deletions pkg/autoscaler/statforwarder/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ import (
"go.uber.org/zap"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1"
discoveryv1listers "k8s.io/client-go/listers/discovery/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
kubeclient "knative.dev/pkg/client/injection/kube/client"
leaseinformer "knative.dev/pkg/client/injection/kube/informers/coordination/v1/lease"
endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
endpointsliceinformer "knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice"
"knative.dev/pkg/controller"
"knative.dev/pkg/hash"
"knative.dev/pkg/logging"
Expand All @@ -50,13 +52,12 @@ func LeaseBasedProcessor(ctx context.Context, f *Forwarder, accept statProcessor
if err != nil {
return err
}
endpointsInformer := endpointsinformer.Get(ctx)
lt := &leaseTracker{
logger: logging.FromContext(ctx),
selfIP: selfIP,
bs: f.bs,
kc: kubeclient.Get(ctx),
endpointsLister: endpointsInformer.Lister(),
endpointsLister: endpointsliceinformer.Get(ctx).Lister(),
id2ip: make(map[string]string),
accept: accept,
fwd: f,
Expand Down Expand Up @@ -86,7 +87,7 @@ type leaseTracker struct {
bs *hash.BucketSet

kc kubernetes.Interface
endpointsLister corev1listers.EndpointsLister
endpointsLister discoveryv1listers.EndpointSliceLister

// id2ip stores the IP extracted from the holder identity to avoid
// string split each time.
Expand Down Expand Up @@ -232,25 +233,38 @@ func (f *leaseTracker) createService(ctx context.Context, ns, n string) error {
return nil
}

// createOrUpdateEndpoints creates an Endpoints object with the given namespace and
// name, and the Forwarder.selfIP. If the Endpoints object already
// createOrUpdateEndpoints creates an EndpointSlice object with the given namespace and
// name, and the Forwarder.selfIP. If the EndpointSlice object already
// exists, it will update the EndpointSlice with the Forwarder.selfIP.
func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string) error {
wantSubsets := []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{
IP: f.selfIP,
want := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: n,
Namespace: ns,
Labels: map[string]string{
discoveryv1.LabelServiceName: n,
},
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{f.selfIP},
Conditions: discoveryv1.EndpointConditions{
Ready: ptr.To(true),
Serving: ptr.To(true),
Terminating: ptr.To(false),
},
}},
Ports: []corev1.EndpointPort{{
Name: autoscalerPortName,
Port: autoscalerPort,
Protocol: corev1.ProtocolTCP,
Ports: []discoveryv1.EndpointPort{{
Name: ptr.To(autoscalerPortName),
Port: ptr.To[int32](autoscalerPort),
Protocol: ptr.To(corev1.ProtocolTCP),
}},
}}
}

exists := true
var lastErr error
if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) {
e, err := f.endpointsLister.Endpoints(ns).Get(n)
got, err := f.endpointsLister.EndpointSlices(ns).Get(n)
if apierrs.IsNotFound(err) {
exists = false
return true, nil
Expand All @@ -262,13 +276,13 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string
return false, nil //nolint:nilerr
}

if equality.Semantic.DeepEqual(wantSubsets, e.Subsets) {
if equality.Semantic.DeepEqual(want.Endpoints, got.Endpoints) {
return true, nil
}

want := e.DeepCopy()
want.Subsets = wantSubsets
if _, lastErr = f.kc.CoreV1().Endpoints(ns).Update(ctx, want, metav1.UpdateOptions{}); lastErr != nil {
e := got.DeepCopy()
e.Endpoints = want.Endpoints
if _, lastErr = f.kc.DiscoveryV1().EndpointSlices(ns).Update(ctx, e, metav1.UpdateOptions{}); lastErr != nil {
// Do not return the error to cause a retry.
return false, nil //nolint:nilerr
}
Expand All @@ -284,13 +298,7 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string
}

if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) {
_, lastErr = f.kc.CoreV1().Endpoints(ns).Create(ctx, &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: n,
Namespace: ns,
},
Subsets: wantSubsets,
}, metav1.CreateOptions{})
_, lastErr = f.kc.DiscoveryV1().EndpointSlices(ns).Create(ctx, want, metav1.CreateOptions{})
// Do not return the error to cause a retry.
return lastErr == nil, nil
}); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions pkg/cleanup/cmd/cleanup/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"knative.dev/pkg/environment"
"knative.dev/pkg/logging"
"knative.dev/pkg/system"
autoscalerbucket "knative.dev/serving/pkg/autoscaler/bucket"
)

const (
Expand Down Expand Up @@ -106,6 +107,21 @@ func main() {
if err = client.RbacV1().ClusterRoles().Delete(context.Background(), "knative-serving-certmanager", metav1.DeleteOptions{}); err != nil && !apierrs.IsNotFound(err) {
logger.Fatal("failed to delete clusterrole knative-serving-certmanager: ", err)
}

epList, err := client.CoreV1().Endpoints(system.Namespace()).List(context.Background(), metav1.ListOptions{})
if err != nil {
logger.Fatal("failed to fetch endpoints: ", err)
}

for _, eps := range epList.Items {
if strings.HasPrefix(eps.GetName(), autoscalerbucket.BucketPrefix) {
err := client.CoreV1().Endpoints(system.Namespace()).Delete(context.Background(), eps.GetName(), metav1.DeleteOptions{})
if err != nil && !apierrs.IsNotFound(err) {
logger.Fatal("failed to delete autoscaler endpoints: ", err)
}
}
}

logger.Info("Old Serving resource deletion completed successfully")
}

Expand Down
Loading
Loading