Skip to content
202 changes: 149 additions & 53 deletions pkg/reconciler/route/reconcile_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"time"

"github.com/google/go-cmp/cmp"
Expand All @@ -43,69 +44,146 @@ import (
v1 "knative.dev/serving/pkg/apis/serving/v1"
"knative.dev/serving/pkg/reconciler/route/config"
"knative.dev/serving/pkg/reconciler/route/resources"
"knative.dev/serving/pkg/reconciler/route/resources/names"
resourcenames "knative.dev/serving/pkg/reconciler/route/resources/names"
"knative.dev/serving/pkg/reconciler/route/traffic"
)

func (c *Reconciler) reconcileIngress(
func (c *Reconciler) reconcileIngresses(
ctx context.Context, r *v1.Route, tc *traffic.Config,
tls []netv1alpha1.IngressTLS,
ingressClass string,
acmeChallenges ...netv1alpha1.HTTP01Challenge,
) (*netv1alpha1.Ingress, *traffic.Rollout, error) {
) ([]*netv1alpha1.Ingress, *traffic.Rollout, error) {
recorder := controller.GetEventRecorder(ctx)
var effectiveRO *traffic.Rollout

ingress, err := c.ingressLister.Ingresses(r.Namespace).Get(names.Ingress(r))
if apierrs.IsNotFound(err) {
desired, err := resources.MakeIngress(ctx, r, tc, tls, ingressClass, acmeChallenges...)
if err != nil {
return nil, nil, err
// Phase 1: Build the default ingress with rollout.
desiredNames := resources.DesiredIngressNames(r, tc)

// Collect existing ingresses and check readiness.
existingIngresses := map[string]*netv1alpha1.Ingress{}
allExistingReady := true
hasExisting := false

for name := range desiredNames {
existing, err := c.ingressLister.Ingresses(r.Namespace).Get(name)
if apierrs.IsNotFound(err) {
allExistingReady = false
continue
} else if err != nil {
return nil, nil, fmt.Errorf("failed to get Ingress %q: %w", name, err)
}
ingress, err = c.netclient.NetworkingV1alpha1().Ingresses(desired.Namespace).Create(ctx, desired, metav1.CreateOptions{})
if err != nil {
recorder.Eventf(r, corev1.EventTypeWarning, "CreationFailed", "Failed to create Ingress: %v", err)
return nil, nil, fmt.Errorf("failed to create Ingress: %w", err)
hasExisting = true
existingIngresses[name] = existing
if !existing.IsReady() {
allExistingReady = false
}
}

recorder.Eventf(r, corev1.EventTypeNormal, "Created", "Created Ingress %q", ingress.GetName())
return ingress, tc.BuildRollout(), nil
} else if err != nil {
return nil, nil, err
// Read previous rollout only from the default ingress.
// Rollout state is a concern of the default ingress only.
defaultName := resourcenames.TaggedIngress(r, traffic.DefaultTarget)
var prevRO *traffic.Rollout
if existingDefault, ok := existingIngresses[defaultName]; ok {
prevRO = deserializeRollout(ctx, existingDefault.Annotations[networking.RolloutAnnotationKey])
}

// Compute the effective rollout.
var effectiveRO *traffic.Rollout
if !hasExisting {
effectiveRO = tc.BuildRollout()
} else {
// Ingress exists. We need to compute the rollout spec diff.
effectiveRO = c.reconcileRollout(ctx, r, tc, ingress)
desired, err := resources.MakeIngressWithRollout(ctx, r, tc, effectiveRO,
tls, ingressClass, acmeChallenges...)
effectiveRO = c.reconcileRolloutFromIngresses(ctx, r, tc, prevRO, allExistingReady)
}

// Build default ingress with the effective rollout.
defaultIng, err := resources.MakeDefaultIngressWithRollout(ctx, r, tc, effectiveRO, tls, ingressClass, acmeChallenges...)
if err != nil {
return nil, nil, err
}
desired := []*netv1alpha1.Ingress{defaultIng}

// Phase 2: Build per-tag ingresses (no rollout annotation).
var tagNames []string
for tag := range tc.Targets {
if tag != traffic.DefaultTarget {
tagNames = append(tagNames, tag)
}
}
sort.Strings(tagNames)

for _, tag := range tagNames {
tagIng, err := resources.MakeRouteTagIngress(ctx, r, tc, tag, tls, ingressClass, acmeChallenges...)
if err != nil {
return nil, nil, err
}
desired = append(desired, tagIng)
}

if !equality.Semantic.DeepEqual(ingress.Spec, desired.Spec) ||
!equality.Semantic.DeepEqual(ingress.Annotations, desired.Annotations) ||
!equality.Semantic.DeepEqual(ingress.Labels, desired.Labels) {
// It is notable that one reason for differences here may be defaulting.
// When that is the case, the Update will end up being a nop because the
// webhook will bring them into alignment and no new reconciliation will occur.
// Also, compare annotation and label in case ingress.Class or parent route's labels
// is updated.

// Don't modify the informers copy.
origin := ingress.DeepCopy()
origin.Spec = desired.Spec
origin.Annotations = desired.Annotations
origin.Labels = desired.Labels

updated, err := c.netclient.NetworkingV1alpha1().Ingresses(origin.Namespace).Update(
ctx, origin, metav1.UpdateOptions{})
// Phase 3: Create or update each desired ingress, then clean up orphans.
var result []*netv1alpha1.Ingress
for _, d := range desired {
existing, ok := existingIngresses[d.Name]
if !ok {
created, err := c.netclient.NetworkingV1alpha1().Ingresses(d.Namespace).Create(ctx, d, metav1.CreateOptions{})
if err != nil {
return nil, nil, fmt.Errorf("failed to update Ingress: %w", err)
recorder.Eventf(r, corev1.EventTypeWarning, "CreationFailed", "Failed to create Ingress: %v", err)
return nil, nil, fmt.Errorf("failed to create Ingress: %w", err)
}
recorder.Eventf(r, corev1.EventTypeNormal, "Created", "Created Ingress %q (tag: %s)", created.GetName(), ingressTagForEvent(d.Labels))
result = append(result, created)
} else {
if !equality.Semantic.DeepEqual(existing.Spec, d.Spec) ||
!equality.Semantic.DeepEqual(existing.Annotations, d.Annotations) ||
!equality.Semantic.DeepEqual(existing.Labels, d.Labels) {
origin := existing.DeepCopy()
origin.Spec = d.Spec
origin.Annotations = d.Annotations
origin.Labels = d.Labels

updated, err := c.netclient.NetworkingV1alpha1().Ingresses(origin.Namespace).Update(
ctx, origin, metav1.UpdateOptions{})
if err != nil {
return nil, nil, fmt.Errorf("failed to update Ingress: %w", err)
}
result = append(result, updated)
} else {
result = append(result, existing)
}
return updated, effectiveRO, nil
}
}

return ingress, effectiveRO, err
// Delete orphaned ingresses (tags that no longer exist).
if err := c.deleteOrphanedIngresses(ctx, r, desiredNames); err != nil {
return nil, nil, err
}

return result, effectiveRO, nil
}

func (c *Reconciler) deleteOrphanedIngresses(ctx context.Context, r *v1.Route, desiredNames sets.Set[string]) error {
routeLabelSelector := labels.SelectorFromSet(labels.Set{serving.RouteLabelKey: r.Name})
allIngresses, err := c.ingressLister.Ingresses(r.Namespace).List(routeLabelSelector)
if err != nil {
return fmt.Errorf("failed to fetch existing ingresses: %w", err)
}

recorder := controller.GetEventRecorder(ctx)
for _, ing := range allIngresses {
if desiredNames.Has(ing.Name) {
continue
}
if !metav1.IsControlledBy(ing, r) {
continue
}
if err := c.netclient.NetworkingV1alpha1().Ingresses(r.Namespace).Delete(
ctx, ing.Name, metav1.DeleteOptions{}); err != nil && !apierrs.IsNotFound(err) {
recorder.Eventf(r, corev1.EventTypeWarning, "DeleteFailed",
"Failed to delete orphaned Ingress %q: %v", ing.Name, err)
return fmt.Errorf("failed to delete orphaned Ingress: %w", err)
}
recorder.Eventf(r, corev1.EventTypeNormal, "Deleted", "Deleted orphaned Ingress %q (tag: %s)", ing.Name, ingressTagForEvent(ing.Labels))
}
return nil
}

func (c *Reconciler) deleteOrphanedServices(ctx context.Context, r *v1.Route, activeServices []resources.ServicePair) error {
Expand Down Expand Up @@ -176,10 +254,10 @@ func (c *Reconciler) reconcilePlaceholderServices(ctx context.Context, route *v1
"Failed to create placeholder service %q: %v", desiredService.Name, err)
return nil, fmt.Errorf("failed to create placeholder service: %w", err)
}
logger.Info("Created service ", desiredService.Name)
logger.Infof("Created service %q", desiredService.Name)
recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created placeholder service %q", desiredService.Name)
} else if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get Service %q: %w", desiredService.Name, err)
} else if !metav1.IsControlledBy(service, route) {
// Surface an error in the route's status, and return an error.
route.Status.MarkServiceNotOwned(desiredService.Name)
Expand Down Expand Up @@ -215,13 +293,24 @@ func (c *Reconciler) reconcilePlaceholderServices(ctx context.Context, route *v1
return services, nil
}

func (c *Reconciler) updatePlaceholderServices(ctx context.Context, route *v1.Route, pairs []resources.ServicePair, ingress *netv1alpha1.Ingress) error {
func (c *Reconciler) updatePlaceholderServices(ctx context.Context, route *v1.Route, pairs []resources.ServicePair, ingresses []*netv1alpha1.Ingress) error {
logger := logging.FromContext(ctx)
ns := route.Namespace

ingressByTag := make(map[string]*netv1alpha1.Ingress, len(ingresses))
for _, ing := range ingresses {
tag := ing.Labels[networking.TagLabelKey]
ingressByTag[tag] = ing
}

eg, egCtx := errgroup.WithContext(ctx)
for _, from := range pairs {
eg.Go(func() error {
ingress, ok := ingressByTag[from.Tag]
if !ok {
logger.Warnw("No ingress found for tag, skipping placeholder update", zap.String("tag", from.Tag))
return nil
}
to, err := resources.MakeK8sService(egCtx, route, from.Tag, ingress, resources.IsClusterLocalService(from.Service))
if err != nil {
// Loadbalancer not ready, no need to update.
Expand Down Expand Up @@ -328,9 +417,16 @@ func deserializeRollout(ctx context.Context, ro string) *traffic.Rollout {
return r
}

func (c *Reconciler) reconcileRollout(
func ingressTagForEvent(labels map[string]string) string {
if tag := labels[networking.TagLabelKey]; tag != traffic.DefaultTarget {
return tag
}
return "default"
}

func (c *Reconciler) reconcileRolloutFromIngresses(
ctx context.Context, r *v1.Route, tc *traffic.Config,
ingress *netv1alpha1.Ingress,
prevRO *traffic.Rollout, allIngressesReady bool,
) *traffic.Rollout {
cfg := config.FromContext(ctx)

Expand All @@ -349,18 +445,18 @@ func (c *Reconciler) reconcileRollout(
logger := logging.FromContext(ctx).Desugar().With(
zap.Int("durationSecs", rd))
logger.Debug("Rollout is enabled. Stepping from previous state.")
// Get the previous rollout state from the annotation.
// If it's corrupt, inexistent, or otherwise incorrect,
// the prevRO will be just nil rollout.
prevRO := deserializeRollout(ctx,
ingress.Annotations[networking.RolloutAnnotationKey])

// prevRO was read from the default ingress annotation.
if prevRO == nil || len(prevRO.Configurations) == 0 {
prevRO = nil
}

// And recompute the rollout state.
now := c.clock.Now().UnixNano()

// Now check if the ingress status changed from not ready to ready.
// Now check if all ingresses transitioned from not ready to ready.
rtView := r.Status.GetCondition(v1.RouteConditionIngressReady)
if prevRO != nil && ingress.IsReady() && !rtView.IsTrue() {
if prevRO != nil && allIngressesReady && !rtView.IsTrue() {
logger.Debug("Observing Ingress not-ready to ready switch condition for rollout")
prevRO.ObserveReady(ctx, now, float64(rd))
}
Expand Down
Loading
Loading