From 3d594cc4b335e85c8ea754fc4613cdd6909e0b0f Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Wed, 10 Dec 2025 11:04:15 +1100 Subject: [PATCH 1/2] fix: always sort bindings in the scheduler to ensure deterministic decision list output (#361) --- pkg/scheduler/framework/framework.go | 19 +- pkg/scheduler/framework/framework_test.go | 180 +++++++++++++++++- pkg/utils/controller/binding_resolver.go | 15 +- pkg/utils/controller/binding_resolver_test.go | 86 +++++++++ 4 files changed, 295 insertions(+), 5 deletions(-) diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index f276d6f00..b75f5d7f2 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -607,7 +607,7 @@ type filteredClusterWithStatus struct { status *Status } -// helper type to pretty print a list of filteredClusterWithStatus +// filteredClusterWithStatusList is a list of filteredClusterWithStatus. type filteredClusterWithStatusList []*filteredClusterWithStatus func (cs filteredClusterWithStatusList) String() string { @@ -621,6 +621,15 @@ func (cs filteredClusterWithStatusList) String() string { return fmt.Sprintf("filteredClusters[%s]", strings.Join(filteredClusters, ", ")) } +// Implement sort.Interface for filteredClusterWithStatusList. +func (f filteredClusterWithStatusList) Len() int { return len(f) } +func (f filteredClusterWithStatusList) Less(i, j int) bool { + return f[i].cluster.Name < f[j].cluster.Name +} +func (f filteredClusterWithStatusList) Swap(i, j int) { + f[i], f[j] = f[j], f[i] +} + // runFilterPlugins runs filter plugins on clusters in parallel. func (f *framework) runFilterPlugins(ctx context.Context, state *CycleState, policy placementv1beta1.PolicySnapshotObj, clusters []clusterv1beta1.MemberCluster) (passed []*clusterv1beta1.MemberCluster, filtered filteredClusterWithStatusList, err error) { // Create a child context. @@ -787,6 +796,14 @@ func (f *framework) updatePolicySnapshotStatusFromBindings( return controller.NewUnexpectedBehaviorError(err) } + // Sort all filtered clusters. + // + // This step is needed to produce deterministic decision outputs. If there are enough slots, + // the scheduler will try to explain why some clusters are filtered out in the decision list; to ensure + // that the list will not change across scheduling cycles without actual scheduling policy + // refreshes, the filtered clusters need to be sorted. + sort.Sort(filteredClusterWithStatusList(filtered)) + // Prepare new scheduling decisions. newDecisions := newSchedulingDecisionsFromBindings(f.maxUnselectedClusterDecisionCount, notPicked, filtered, existing...) // Prepare new scheduling condition. 
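The framework.go change above makes the filtered-cluster portion of the decision list deterministic by giving filteredClusterWithStatusList a sort.Interface implementation and sorting it before the decisions are rebuilt. A minimal standalone sketch of the same idea follows; the type and field names here are illustrative only, not the scheduler's own:

package main

import (
	"fmt"
	"sort"
)

// clusterWithReason loosely mirrors the scheduler's filteredClusterWithStatus pair.
type clusterWithReason struct {
	name   string
	reason string
}

// clusterWithReasonList implements sort.Interface, ordering entries by cluster name
// so that repeated scheduling cycles emit the filtered clusters in the same order.
type clusterWithReasonList []clusterWithReason

func (l clusterWithReasonList) Len() int           { return len(l) }
func (l clusterWithReasonList) Less(i, j int) bool { return l[i].name < l[j].name }
func (l clusterWithReasonList) Swap(i, j int)      { l[i], l[j] = l[j], l[i] }

func main() {
	// Filter plugins run in parallel, so the order in which filtered clusters are
	// collected is not guaranteed; sorting restores a stable order before the
	// decision list is built.
	filtered := clusterWithReasonList{
		{name: "cluster-3", reason: "unschedulable"},
		{name: "cluster-1", reason: "unschedulable"},
		{name: "cluster-2", reason: "unschedulable"},
	}
	sort.Sort(filtered)
	fmt.Println(filtered)
	// Output: [{cluster-1 unschedulable} {cluster-2 unschedulable} {cluster-3 unschedulable}]
}
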
diff --git a/pkg/scheduler/framework/framework_test.go b/pkg/scheduler/framework/framework_test.go index 53c493eff..ec156568d 100644 --- a/pkg/scheduler/framework/framework_test.go +++ b/pkg/scheduler/framework/framework_test.go @@ -23,9 +23,11 @@ import ( "log" "os" "strings" + "sync/atomic" "testing" "time" + crossplanetest "github.com/crossplane/crossplane-runtime/v2/pkg/test" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -2734,7 +2736,7 @@ func TestUpdatePolicySnapshotStatusFromBindings(t *testing.T) { { cluster: &clusterv1beta1.MemberCluster{ ObjectMeta: metav1.ObjectMeta{ - Name: altClusterName, + Name: anotherClusterName, }, }, status: filteredStatus, @@ -2742,7 +2744,7 @@ func TestUpdatePolicySnapshotStatusFromBindings(t *testing.T) { { cluster: &clusterv1beta1.MemberCluster{ ObjectMeta: metav1.ObjectMeta{ - Name: anotherClusterName, + Name: altClusterName, }, }, status: filteredStatus, @@ -2760,7 +2762,7 @@ func TestUpdatePolicySnapshotStatusFromBindings(t *testing.T) { Reason: fmt.Sprintf(resourceScheduleSucceededWithScoreMessageFormat, clusterName, affinityScore1, topologySpreadScore1), }, { - ClusterName: altClusterName, + ClusterName: anotherClusterName, Selected: false, Reason: filteredStatus.String(), }, @@ -6536,3 +6538,175 @@ func TestUpdatePolicySnapshotStatusForPickFixedPlacementType(t *testing.T) { }) } } + +// TestRunSchedulingCycleForPickAllPlacementType_StableStatusOutputInLargeFleet tests the +// runSchedulingCycleForPickAllPlacementType method, specifically to ensure that the status output +// remains consistent when running the scheduling cycle in a large fleet (i.e., the scheduler +// will not constantly refresh the status across scheduling cycles). +func TestRunSchedulingCycleForPickAllPlacementType_StableStatusOutputInLargeFleet(t *testing.T) { + ctx := context.Background() + + // Set up the scheduler profile with a label-based dummy filter plugin. + profile := NewProfile("TestOnly") + + dummyLabelBasedFilterPluginName := fmt.Sprintf(dummyAllPurposePluginNameFormat, 0) + wantLabelKey := "pre-selected" + wantLabelValue := "true" + wantLabels := map[string]string{ + wantLabelKey: wantLabelValue, + } + dummyLabelBasedFilterPlugin := &DummyAllPurposePlugin{ + name: dummyLabelBasedFilterPluginName, + filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy placementv1beta1.PolicySnapshotObj, cluster *clusterv1beta1.MemberCluster) (status *Status) { + memberClusterLabels := cluster.GetLabels() + for wk, wv := range wantLabels { + if v, ok := memberClusterLabels[wk]; !ok || v != wv { + return NewNonErrorStatus(ClusterUnschedulable, dummyLabelBasedFilterPluginName) + } + } + return nil + }, + } + profile.WithFilterPlugin(dummyLabelBasedFilterPlugin) + + mockClientStatusUpdateCount := atomic.Int32{} + mockClient := crossplanetest.MockClient{ + MockCreate: func(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + return nil + }, + MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error { + _ = mockClientStatusUpdateCount.Add(1) + return nil + }, + } + + f := &framework{ + profile: profile, + client: &mockClient, + uncachedReader: &mockClient, + manager: nil, + eventRecorder: nil, + parallelizer: parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers), + maxUnselectedClusterDecisionCount: 3, + // The cluster eligibility checker is not invoked in this test spec. 
+ clusterEligibilityChecker: clustereligibilitychecker.New(), + } + // No need to set up plugins with the framework. + + clusters := []clusterv1beta1.MemberCluster{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(clusterNameTemplate, 1), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(clusterNameTemplate, 2), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(clusterNameTemplate, 3), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(clusterNameTemplate, 4), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(clusterNameTemplate, 5), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(clusterNameTemplate, 6), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(clusterNameTemplate, 7), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(clusterNameTemplate, 8), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(clusterNameTemplate, 9), + }, + }, + } + state := NewCycleState(clusters, nil, nil) + placementKey := queue.PlacementKey(crpName) + wantClusterUnschedulableReason := "ClusterUnschedulable" + policy := &placementv1beta1.ClusterSchedulingPolicySnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: policyName, + Annotations: map[string]string{ + placementv1beta1.CRPGenerationAnnotation: "0", + }, + }, + Spec: placementv1beta1.SchedulingPolicySnapshotSpec{ + Policy: &placementv1beta1.PlacementPolicy{ + PlacementType: placementv1beta1.PickAllPlacementType, + Affinity: &placementv1beta1.Affinity{ + ClusterAffinity: &placementv1beta1.ClusterAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &placementv1beta1.ClusterSelector{ + ClusterSelectorTerms: []placementv1beta1.ClusterSelectorTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + wantLabelKey: wantLabelValue, + }, + }, + }, + }, + }, + }, + }, + }, + }, + Status: placementv1beta1.SchedulingPolicySnapshotStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.PolicySnapshotScheduled), + Status: metav1.ConditionTrue, + Reason: FullyScheduledReason, + Message: fmt.Sprintf(fullyScheduledMessage, 1), + }, + }, + ObservedCRPGeneration: 0, + ClusterDecisions: []placementv1beta1.ClusterDecision{ + { + ClusterName: fmt.Sprintf(clusterNameTemplate, 1), + Reason: wantClusterUnschedulableReason, + }, + { + ClusterName: fmt.Sprintf(clusterNameTemplate, 2), + Reason: wantClusterUnschedulableReason, + }, + { + ClusterName: fmt.Sprintf(clusterNameTemplate, 3), + Reason: wantClusterUnschedulableReason, + }, + }, + }, + } + + // Simulate 100 consecutive scheduling cycles. + for i := 0; i < 100; i++ { + _, err := f.runSchedulingCycleForPickAllPlacementType(ctx, state, placementKey, policy, clusters, nil, nil, nil, nil) + if err != nil { + t.Fatalf("runSchedulingCycleForPickAllPlacementType() = %v, want no error", err) + } + } + + // Check if any status update was attempted; all should be skipped as there is no status change. 
+ if mockClientStatusUpdateCount.Load() != 0 { + t.Errorf("runSchedulingCycleForPickAllPlacementType() status update attempt count = %d, want 0", mockClientStatusUpdateCount.Load()) + } +} diff --git a/pkg/utils/controller/binding_resolver.go b/pkg/utils/controller/binding_resolver.go index 08c74af9c..1d4473ff7 100644 --- a/pkg/utils/controller/binding_resolver.go +++ b/pkg/utils/controller/binding_resolver.go @@ -18,6 +18,7 @@ package controller import ( "context" + "sort" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -67,7 +68,19 @@ func ListBindingsFromKey(ctx context.Context, c client.Reader, placementKey type return nil, NewAPIServerError(false, err) } - return bindingList.GetBindingObjs(), nil + bindingObjs := bindingList.GetBindingObjs() + + // Sort the list of bindings. + // + // This is needed to ensure deterministic decision output from the scheduler. + sort.Slice(bindingObjs, func(i, j int) bool { + A, B := bindingObjs[i], bindingObjs[j] + // Sort the bindings only by their names; for ClusterResourceBindings, their namespaces are always empty; + // for ResourceBindings, in this case they all come from the same namespace. + return A.GetName() < B.GetName() + }) + + return bindingObjs, nil } // ConvertCRBObjsToBindingObjs converts a slice of ClusterResourceBinding items to BindingObj array. diff --git a/pkg/utils/controller/binding_resolver_test.go b/pkg/utils/controller/binding_resolver_test.go index c97dee2e4..354d1d1ee 100644 --- a/pkg/utils/controller/binding_resolver_test.go +++ b/pkg/utils/controller/binding_resolver_test.go @@ -20,10 +20,13 @@ import ( "context" "errors" "fmt" + "sync/atomic" "testing" + crossplanetest "github.com/crossplane/crossplane-runtime/v2/pkg/test" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -33,6 +36,12 @@ import ( placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" ) +const ( + bindingName1 = "binding-1" + bindingName2 = "binding-2" + bindingName3 = "binding-3" +) + func TestListBindingsFromKey(t *testing.T) { ctx := context.Background() @@ -428,6 +437,83 @@ func TestListBindingsFromKey(t *testing.T) { } } +// TestListBindingsFromKey_Sorted verifies that the returned bindings are always sorted by their names. +func TestListBindingsFromKey_Sorted(t *testing.T) { + ctx := context.Background() + + // Set a mode variable to control the behavior of list ops. + mockMode := atomic.Int32{} + mockMode.Store(0) + + // Use the mock client from the crossplane package rather than the commonly used fake.Client to + // better manipulate the list op results. 
+ mockClient := crossplanetest.MockClient{ + MockList: func(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + mode := mockMode.Load() + switch mode { + case 0: + if err := meta.SetList(list, []runtime.Object{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName1, + }, + }, + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName2, + }, + }, + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName3, + }, + }, + }); err != nil { + return fmt.Errorf("cannot set list results: %w", err) + } + case 1: + if err := meta.SetList(list, []runtime.Object{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName3, + }, + }, + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName2, + }, + }, + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: bindingName1, + }, + }, + }); err != nil { + return fmt.Errorf("cannot set list results: %w", err) + } + default: + return fmt.Errorf("unexpected mock mode: %d", mode) + } + return nil + }, + } + + bindingsInMode0, err := ListBindingsFromKey(ctx, &mockClient, types.NamespacedName{Name: "placeholder"}) + if err != nil { + t.Fatalf("ListBindingsFromKey() in mode 0 returned error: %v", err) + } + + mockMode.Store(1) + bindingsInMode1, err := ListBindingsFromKey(ctx, &mockClient, types.NamespacedName{Name: "placeholder"}) + if err != nil { + t.Fatalf("ListBindingsFromKey() in mode 1 returned error: %v", err) + } + + if diff := cmp.Diff(bindingsInMode0, bindingsInMode1); diff != "" { + t.Errorf("ListBindingsFromKey() returned different results in different modes (-mode0, +mode1):\n%s", diff) + } +} + func TestListBindingsFromKey_ClientError(t *testing.T) { ctx := context.Background() From 0552f2ebf942f4bdc10479d191beac1de740bc47 Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Wed, 10 Dec 2025 11:05:16 +1100 Subject: [PATCH 2/2] feat: add status back-reporting support (1/, work applier side) (#327) --- .../workapplier/backoff_integration_test.go | 4 +- pkg/controllers/workapplier/controller.go | 3 + .../controller_integration_test.go | 456 ++++++++++++++++-- pkg/controllers/workapplier/status.go | 67 ++- pkg/controllers/workapplier/status_test.go | 168 +++++++ .../workapplier/waves_integration_test.go | 8 +- 6 files changed, 664 insertions(+), 42 deletions(-) diff --git a/pkg/controllers/workapplier/backoff_integration_test.go b/pkg/controllers/workapplier/backoff_integration_test.go index cdc857298..36571cc50 100644 --- a/pkg/controllers/workapplier/backoff_integration_test.go +++ b/pkg/controllers/workapplier/backoff_integration_test.go @@ -178,7 +178,7 @@ var _ = Describe("exponential backoff", func() { applyStrategy := &fleetv1beta1.ApplyStrategy{ WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeIfNoDiff, } - createWorkObject(workName, memberReservedNSName2, applyStrategy, regularNSJSON) + createWorkObject(workName, memberReservedNSName2, applyStrategy, nil, regularNSJSON) }) // For simplicity reasons, this test case will skip some of the regular apply op result verification @@ -395,7 +395,7 @@ var _ = Describe("exponential backoff", func() { WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeAlways, WhenToApply: fleetv1beta1.WhenToApplyTypeAlways, } - createWorkObject(workName, memberReservedNSName2, applyStrategy, regularNSJSON) + createWorkObject(workName, memberReservedNSName2, applyStrategy, nil, 
regularNSJSON) }) // For simplicity reasons, this test case will skip some of the regular apply op result verification diff --git a/pkg/controllers/workapplier/controller.go b/pkg/controllers/workapplier/controller.go index c4e4df323..a52fba102 100644 --- a/pkg/controllers/workapplier/controller.go +++ b/pkg/controllers/workapplier/controller.go @@ -505,6 +505,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu trackWorkAndManifestProcessingRequestMetrics(work) // Requeue the Work object with a delay based on the requeue rate limiter. + // + // Note (chenyu1): at this moment the work applier does not register changes on back-reported + // status as a trigger for resetting the rate limiter. requeueDelay := r.requeueRateLimiter.When(work, bundles) klog.V(2).InfoS("Requeue the Work object for re-processing", "work", workRef, "delaySeconds", requeueDelay.Seconds()) return ctrl.Result{RequeueAfter: requeueDelay}, nil diff --git a/pkg/controllers/workapplier/controller_integration_test.go b/pkg/controllers/workapplier/controller_integration_test.go index f742f8c02..1e38ac695 100644 --- a/pkg/controllers/workapplier/controller_integration_test.go +++ b/pkg/controllers/workapplier/controller_integration_test.go @@ -19,6 +19,7 @@ package workapplier import ( "crypto/rand" "encoding/base64" + "encoding/json" "fmt" "time" @@ -61,6 +62,7 @@ var ( ignoreFieldConditionLTTMsg = cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "Message") ignoreDriftDetailsObsTime = cmpopts.IgnoreFields(fleetv1beta1.DriftDetails{}, "ObservationTime", "FirstDriftedObservedTime") ignoreDiffDetailsObsTime = cmpopts.IgnoreFields(fleetv1beta1.DiffDetails{}, "ObservationTime", "FirstDiffedObservedTime") + ignoreBackReportedStatus = cmpopts.IgnoreFields(fleetv1beta1.ManifestCondition{}, "BackReportedStatus") lessFuncPatchDetail = func(a, b fleetv1beta1.PatchDetail) bool { return a.Path < b.Path @@ -77,7 +79,12 @@ var ( ) // createWorkObject creates a new Work object with the given name, manifests, and apply strategy. -func createWorkObject(workName, memberClusterReservedNSName string, applyStrategy *fleetv1beta1.ApplyStrategy, rawManifestJSON ...[]byte) { +func createWorkObject( + workName, memberClusterReservedNSName string, + applyStrategy *fleetv1beta1.ApplyStrategy, + reportBackStrategy *fleetv1beta1.ReportBackStrategy, + rawManifestJSON ...[]byte, +) { manifests := make([]fleetv1beta1.Manifest, len(rawManifestJSON)) for idx := range rawManifestJSON { manifests[idx] = fleetv1beta1.Manifest{ @@ -96,7 +103,8 @@ func createWorkObject(workName, memberClusterReservedNSName string, applyStrateg Workload: fleetv1beta1.WorkloadTemplate{ Manifests: manifests, }, - ApplyStrategy: applyStrategy, + ApplyStrategy: applyStrategy, + ReportBackStrategy: reportBackStrategy, }, } Expect(hubClient.Create(ctx, work)).To(Succeed()) @@ -504,6 +512,9 @@ func workStatusUpdated( work.Status, wantWorkStatus, ignoreFieldConditionLTTMsg, ignoreDiffDetailsObsTime, ignoreDriftDetailsObsTime, + // Back-reported status must be checked separately, as the serialization/deserialization process + // does not guarantee key order in objects. + ignoreBackReportedStatus, cmpopts.SortSlices(lessFuncPatchDetail), ); diff != "" { return fmt.Errorf("work status diff (-got, +want):\n%s", diff) @@ -781,7 +792,7 @@ var _ = Describe("applying manifests", func() { regularDeployJSON := marshalK8sObjJSON(regularDeploy) // Create a new Work object with all the manifest JSONs. 
- createWorkObject(workName, memberReservedNSName1, nil, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, nil, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -963,7 +974,7 @@ var _ = Describe("applying manifests", func() { regularDeployJSON := marshalK8sObjJSON(regularDeploy) // Create a new Work object with all the manifest JSONs. - createWorkObject(workName, memberReservedNSName1, nil, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, nil, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -1229,7 +1240,7 @@ var _ = Describe("applying manifests", func() { regularDeployJSON := marshalK8sObjJSON(regularDeploy) // Create a new Work object with all the manifest JSONs. - createWorkObject(workName, memberReservedNSName1, nil, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, nil, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -1429,7 +1440,7 @@ var _ = Describe("applying manifests", func() { regularConfigMapJSON := marshalK8sObjJSON(regularConfigMap) // Create a new Work object with all the manifest JSONs. - createWorkObject(workName, memberReservedNSName1, nil, regularNSJSON, decodingErredDeployJSON, regularConfigMapJSON) + createWorkObject(workName, memberReservedNSName1, nil, nil, regularNSJSON, decodingErredDeployJSON, regularConfigMapJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -1617,7 +1628,7 @@ var _ = Describe("applying manifests", func() { malformedConfigMapJSON := marshalK8sObjJSON(malformedConfigMap) // Create a new Work object with all the manifest JSONs and proper apply strategy. - createWorkObject(workName, memberReservedNSName1, nil, regularNSJSON, malformedConfigMapJSON) + createWorkObject(workName, memberReservedNSName1, nil, nil, regularNSJSON, malformedConfigMapJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -1778,7 +1789,7 @@ var _ = Describe("work applier garbage collection", func() { regularDeployJSON := marshalK8sObjJSON(regularDeploy) // Create a new Work object with all the manifest JSONs. - createWorkObject(workName, memberReservedNSName1, &fleetv1beta1.ApplyStrategy{AllowCoOwnership: true}, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, &fleetv1beta1.ApplyStrategy{AllowCoOwnership: true}, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -2043,7 +2054,7 @@ var _ = Describe("work applier garbage collection", func() { regularClusterRoleJSON := marshalK8sObjJSON(regularClusterRole) // Create a new Work object with all the manifest JSONs. - createWorkObject(workName, memberReservedNSName1, &fleetv1beta1.ApplyStrategy{AllowCoOwnership: true}, regularNSJSON, regularDeployJSON, regularClusterRoleJSON) + createWorkObject(workName, memberReservedNSName1, &fleetv1beta1.ApplyStrategy{AllowCoOwnership: true}, nil, regularNSJSON, regularDeployJSON, regularClusterRoleJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -2362,7 +2373,7 @@ var _ = Describe("work applier garbage collection", func() { regularClusterRoleJSON := marshalK8sObjJSON(regularClusterRole) // Create a new Work object with all the manifest JSONs. 
- createWorkObject(workName, memberReservedNSName1, &fleetv1beta1.ApplyStrategy{AllowCoOwnership: true}, regularNSJSON, regularDeployJSON, regularClusterRoleJSON) + createWorkObject(workName, memberReservedNSName1, &fleetv1beta1.ApplyStrategy{AllowCoOwnership: true}, nil, regularNSJSON, regularDeployJSON, regularClusterRoleJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -2685,7 +2696,7 @@ var _ = Describe("drift detection and takeover", func() { ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeIfNoDiff, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -2885,7 +2896,7 @@ var _ = Describe("drift detection and takeover", func() { ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeIfNoDiff, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -3150,7 +3161,7 @@ var _ = Describe("drift detection and takeover", func() { ComparisonOption: fleetv1beta1.ComparisonOptionTypeFullComparison, WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeIfNoDiff, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -3413,7 +3424,7 @@ var _ = Describe("drift detection and takeover", func() { ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, WhenToApply: fleetv1beta1.WhenToApplyTypeIfNotDrifted, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -3840,7 +3851,7 @@ var _ = Describe("drift detection and takeover", func() { WhenToApply: fleetv1beta1.WhenToApplyTypeIfNotDrifted, WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeAlways, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularJobJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularJobJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -4223,7 +4234,7 @@ var _ = Describe("drift detection and takeover", func() { ComparisonOption: fleetv1beta1.ComparisonOptionTypeFullComparison, WhenToApply: fleetv1beta1.WhenToApplyTypeIfNotDrifted, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -4463,7 +4474,7 @@ var _ = Describe("drift detection and takeover", func() { ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, WhenToApply: fleetv1beta1.WhenToApplyTypeAlways, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON) + createWorkObject(workName, memberReservedNSName1, 
applyStrategy, nil, regularNSJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -4720,7 +4731,7 @@ var _ = Describe("drift detection and takeover", func() { ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, WhenToApply: fleetv1beta1.WhenToApplyTypeIfNotDrifted, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, marshalK8sObjJSON(regularNS)) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, marshalK8sObjJSON(regularNS)) }) It("should add cleanup finalizer to the Work object", func() { @@ -5081,7 +5092,7 @@ var _ = Describe("drift detection and takeover", func() { ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, WhenToApply: fleetv1beta1.WhenToApplyTypeIfNotDrifted, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, marshalK8sObjJSON(regularNS)) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, marshalK8sObjJSON(regularNS)) }) It("should add cleanup finalizer to the Work object", func() { @@ -5366,7 +5377,7 @@ var _ = Describe("drift detection and takeover", func() { applyStrategy := &fleetv1beta1.ApplyStrategy{ WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeNever, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, marshalK8sObjJSON(regularDeploy)) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, marshalK8sObjJSON(regularDeploy)) }) It("should add cleanup finalizer to the Work object", func() { @@ -5552,7 +5563,7 @@ var _ = Describe("drift detection and takeover", func() { WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeNever, ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularCMJSON, regularSecretJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularCMJSON, regularSecretJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -6126,7 +6137,7 @@ var _ = Describe("report diff", func() { applyStrategy := &fleetv1beta1.ApplyStrategy{ Type: fleetv1beta1.ApplyStrategyTypeReportDiff, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -6245,7 +6256,7 @@ var _ = Describe("report diff", func() { ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, Type: fleetv1beta1.ApplyStrategyTypeReportDiff, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -6573,7 +6584,7 @@ var _ = Describe("report diff", func() { Type: fleetv1beta1.ApplyStrategyTypeReportDiff, WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeNever, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -6782,7 +6793,7 @@ var _ = Describe("report diff", func() { applyStrategy := &fleetv1beta1.ApplyStrategy{ Type: fleetv1beta1.ApplyStrategyTypeReportDiff, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, 
regularNSJSON, malformedConfigMapJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, malformedConfigMapJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -6935,7 +6946,7 @@ var _ = Describe("report diff", func() { Type: fleetv1beta1.ApplyStrategyTypeReportDiff, WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeNever, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, updatedJSONJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, updatedJSONJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -7201,7 +7212,7 @@ var _ = Describe("report diff", func() { WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeNever, ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularCMJSON, regularSecretJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularCMJSON, regularSecretJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -7653,7 +7664,7 @@ var _ = Describe("handling different apply strategies", func() { ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, Type: fleetv1beta1.ApplyStrategyTypeReportDiff, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -8010,7 +8021,7 @@ var _ = Describe("handling different apply strategies", func() { ComparisonOption: fleetv1beta1.ComparisonOptionTypePartialComparison, Type: fleetv1beta1.ApplyStrategyTypeServerSideApply, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -8279,7 +8290,7 @@ var _ = Describe("handling different apply strategies", func() { Type: fleetv1beta1.ApplyStrategyTypeClientSideApply, WhenToTakeOver: fleetv1beta1.WhenToTakeOverTypeNever, } - createWorkObject(workName, memberReservedNSName1, applyStrategy, regularNSJSON, regularDeployJSON) + createWorkObject(workName, memberReservedNSName1, applyStrategy, nil, regularNSJSON, regularDeployJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -8678,7 +8689,7 @@ var _ = Describe("handling different apply strategies", func() { oversizedCMJSON := marshalK8sObjJSON(oversizedCM) // Create a new Work object with all the manifest JSONs and proper apply strategy. - createWorkObject(workName, memberReservedNSName1, nil, regularNSJSON, oversizedCMJSON) + createWorkObject(workName, memberReservedNSName1, nil, nil, regularNSJSON, oversizedCMJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -8886,7 +8897,7 @@ var _ = Describe("negative cases", func() { malformedConfigMapJSON := marshalK8sObjJSON(malformedConfigMap) // Create a Work object with all the manifest JSONs. 
- createWorkObject(workName, memberReservedNSName1, nil, regularNSJSON, malformedConfigMapJSON, regularConfigMapJson) + createWorkObject(workName, memberReservedNSName1, nil, nil, regularNSJSON, malformedConfigMapJSON, regularConfigMapJson) }) It("should add cleanup finalizer to the Work object", func() { @@ -9079,7 +9090,7 @@ var _ = Describe("negative cases", func() { regularConfigMapJSON := marshalK8sObjJSON(regularConfigMap) // Create a Work object with all the manifest JSONs. - createWorkObject(workName, memberReservedNSName1, nil, regularNSJSON, regularConfigMapJSON) + createWorkObject(workName, memberReservedNSName1, nil, nil, regularNSJSON, regularConfigMapJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -9230,7 +9241,7 @@ var _ = Describe("negative cases", func() { duplicatedConfigMap.Data[dummyLabelKey] = dummyLabelValue2 // Create a Work object with all the manifest JSONs. - createWorkObject(workName, memberReservedNSName1, nil, regularNSJSON, regularConfigMapJSON, duplicatedConfigMapJSON) + createWorkObject(workName, memberReservedNSName1, nil, nil, regularNSJSON, regularConfigMapJSON, duplicatedConfigMapJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -9425,7 +9436,7 @@ var _ = Describe("negative cases", func() { duplicatedConfigMapJSON := marshalK8sObjJSON(duplicatedConfigMap) // Create a Work object with all the manifest JSONs. - createWorkObject(workName, memberReservedNSName1, nil, regularNSJSON, regularConfigMapJSON, malformedConfigMapJSON, configMapWithGenerateNameJSON, duplicatedConfigMapJSON) + createWorkObject(workName, memberReservedNSName1, nil, nil, regularNSJSON, regularConfigMapJSON, malformedConfigMapJSON, configMapWithGenerateNameJSON, duplicatedConfigMapJSON) }) It("should add cleanup finalizer to the Work object", func() { @@ -9631,3 +9642,378 @@ var _ = Describe("negative cases", func() { }) }) }) + +var _ = Describe("status back-reporting", func() { + deploymentKind := "Deployment" + deployStatusBackReportedActual := func(workName, nsName, deployName string, beforeTimestamp metav1.Time) func() error { + return func() error { + workObj := &fleetv1beta1.Work{} + if err := hubClient.Get(ctx, client.ObjectKey{Namespace: memberReservedNSName1, Name: workName}, workObj); err != nil { + return fmt.Errorf("failed to retrieve the Work object: %w", err) + } + + var backReportedDeployStatusWrapper []byte + var backReportedDeployStatusObservedTime metav1.Time + for idx := range workObj.Status.ManifestConditions { + manifestCond := &workObj.Status.ManifestConditions[idx] + + if manifestCond.Identifier.Kind == deploymentKind && manifestCond.Identifier.Name == deployName && manifestCond.Identifier.Namespace == nsName { + backReportedDeployStatusWrapper = manifestCond.BackReportedStatus.ObservedStatus.Raw + backReportedDeployStatusObservedTime = manifestCond.BackReportedStatus.ObservationTime + break + } + } + + if len(backReportedDeployStatusWrapper) == 0 { + return fmt.Errorf("no status back-reported for deployment") + } + if backReportedDeployStatusObservedTime.Before(&beforeTimestamp) { + return fmt.Errorf("back-reported deployment status observation time, want after %v, got %v", beforeTimestamp, backReportedDeployStatusObservedTime) + } + + deployWithBackReportedStatus := &appsv1.Deployment{} + if err := json.Unmarshal(backReportedDeployStatusWrapper, deployWithBackReportedStatus); err != nil { + return fmt.Errorf("failed to unmarshal wrapped back-reported deployment status: %w", err) + } + currentDeployWithStatus := 
&appsv1.Deployment{} + if err := memberClient1.Get(ctx, client.ObjectKey{Namespace: nsName, Name: deployName}, currentDeployWithStatus); err != nil { + return fmt.Errorf("failed to retrieve Deployment object from member cluster side: %w", err) + } + + if diff := cmp.Diff(deployWithBackReportedStatus.Status, currentDeployWithStatus.Status); diff != "" { + return fmt.Errorf("back-reported deployment status mismatch (-got, +want):\n%s", diff) + } + return nil + } + } + + Context("can handle both object with status and object with no status", Ordered, func() { + workName := fmt.Sprintf(workNameTemplate, utils.RandStr()) + // The environment prepared by the envtest package does not support namespace + // deletion; each test case would use a new namespace. + nsName := fmt.Sprintf(nsNameTemplate, utils.RandStr()) + + var appliedWorkOwnerRef *metav1.OwnerReference + // Note: namespaces and deployments have status subresources; config maps do not. + var regularNS *corev1.Namespace + var regularDeploy *appsv1.Deployment + var regularCM *corev1.ConfigMap + + beforeTimestamp := metav1.Now() + + BeforeAll(func() { + // Prepare a NS object. + regularNS = ns.DeepCopy() + regularNS.Name = nsName + regularNSJSON := marshalK8sObjJSON(regularNS) + + // Prepare a Deployment object. + regularDeploy = deploy.DeepCopy() + regularDeploy.Namespace = nsName + regularDeploy.Name = deployName + regularDeployJSON := marshalK8sObjJSON(regularDeploy) + + // Prepare a ConfigMap object. + regularCM = configMap.DeepCopy() + regularCM.Namespace = nsName + regularCMJSON := marshalK8sObjJSON(regularCM) + + // Create a new Work object with all the manifest JSONs. + reportBackStrategy := &fleetv1beta1.ReportBackStrategy{ + Type: fleetv1beta1.ReportBackStrategyTypeMirror, + Destination: ptr.To(fleetv1beta1.ReportBackDestinationWorkAPI), + } + createWorkObject(workName, memberReservedNSName1, nil, reportBackStrategy, regularNSJSON, regularDeployJSON, regularCMJSON) + }) + + It("should add cleanup finalizer to the Work object", func() { + finalizerAddedActual := workFinalizerAddedActual(workName) + Eventually(finalizerAddedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to add cleanup finalizer to the Work object") + }) + + It("should prepare an AppliedWork object", func() { + appliedWorkCreatedActual := appliedWorkCreatedActual(workName) + Eventually(appliedWorkCreatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to prepare an AppliedWork object") + + appliedWorkOwnerRef = prepareAppliedWorkOwnerRef(workName) + }) + + It("can mark the deployment as available", func() { + markDeploymentAsAvailable(nsName, deployName) + }) + + It("should update the Work object status", func() { + // Prepare the status information. 
+ workConds := []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: condition.WorkAllManifestsAppliedReason, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: condition.WorkAllManifestsAvailableReason, + }, + } + manifestConds := []fleetv1beta1.ManifestCondition{ + { + Identifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + Group: "", + Version: "v1", + Kind: "Namespace", + Resource: "namespaces", + Name: nsName, + }, + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: string(ApplyOrReportDiffResTypeApplied), + ObservedGeneration: 0, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: string(AvailabilityResultTypeAvailable), + ObservedGeneration: 0, + }, + }, + }, + { + Identifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + Group: "apps", + Version: "v1", + Kind: "Deployment", + Resource: "deployments", + Name: deployName, + Namespace: nsName, + }, + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: string(ApplyOrReportDiffResTypeApplied), + ObservedGeneration: 1, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: string(AvailabilityResultTypeAvailable), + ObservedGeneration: 1, + }, + }, + }, + { + Identifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 2, + Group: "", + Version: "v1", + Kind: "ConfigMap", + Resource: "configmaps", + Name: configMapName, + Namespace: nsName, + }, + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: string(ApplyOrReportDiffResTypeApplied), + ObservedGeneration: 0, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: string(AvailabilityResultTypeAvailable), + ObservedGeneration: 0, + }, + }, + }, + } + + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) + Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") + }) + + It("should apply the manifests", func() { + // Ensure that the NS object has been applied as expected. + regularNSObjectAppliedActual := regularNSObjectAppliedActual(nsName, appliedWorkOwnerRef) + Eventually(regularNSObjectAppliedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to apply the namespace object") + + Expect(memberClient1.Get(ctx, client.ObjectKey{Name: nsName}, regularNS)).To(Succeed(), "Failed to retrieve the NS object") + + // Ensure that the Deployment object has been applied as expected. + regularDeploymentObjectAppliedActual := regularDeploymentObjectAppliedActual(nsName, deployName, appliedWorkOwnerRef) + Eventually(regularDeploymentObjectAppliedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to apply the deployment object") + + Expect(memberClient1.Get(ctx, client.ObjectKey{Namespace: nsName, Name: deployName}, regularDeploy)).To(Succeed(), "Failed to retrieve the Deployment object") + + // Ensure that the ConfigMap object has been applied as expected. 
+ regularCMObjectAppliedActual := regularConfigMapObjectAppliedActual(nsName, configMapName, appliedWorkOwnerRef) + Eventually(regularCMObjectAppliedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to apply the config map object") + + Expect(memberClient1.Get(ctx, client.ObjectKey{Namespace: nsName, Name: configMapName}, regularCM)).To(Succeed(), "Failed to retrieve the ConfigMap object") + }) + + It("should back-report deployment status to the Work object", func() { + Eventually(deployStatusBackReportedActual(workName, nsName, deployName, beforeTimestamp), eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to back-report deployment status to the Work object") + }) + + It("should handle objects with no status gracefully", func() { + Eventually(func() error { + workObj := &fleetv1beta1.Work{} + if err := hubClient.Get(ctx, client.ObjectKey{Namespace: memberReservedNSName1, Name: workName}, workObj); err != nil { + return fmt.Errorf("failed to retrieve the Work object: %w", err) + } + + for idx := range workObj.Status.ManifestConditions { + manifestCond := &workObj.Status.ManifestConditions[idx] + + if manifestCond.Identifier.Kind == "ConfigMap" && manifestCond.Identifier.Name == configMapName && manifestCond.Identifier.Namespace == nsName { + if manifestCond.BackReportedStatus != nil { + return fmt.Errorf("back-reported status for configMap object, want empty, got %s", string(manifestCond.BackReportedStatus.ObservedStatus.Raw)) + } + return nil + } + } + return fmt.Errorf("configMap object not found") + }, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to handle objects with no status gracefully") + }) + + It("can refresh deployment status", func() { + // Retrieve the Deployment object and update its replica count. + // + // Use an Eventually block to reduce flakiness. + Eventually(func() error { + deploy := &appsv1.Deployment{} + if err := memberClient1.Get(ctx, client.ObjectKey{Namespace: nsName, Name: deployName}, deploy); err != nil { + return fmt.Errorf("failed to retrieve the Deployment object: %w", err) + } + + deploy.Spec.Replicas = ptr.To(int32(10)) + if err := memberClient1.Update(ctx, deploy); err != nil { + return fmt.Errorf("failed to update the Deployment object: %w", err) + } + return nil + }, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to retrieve and update the Deployment object") + + // Refresh the status of the Deployment. + // + // Note that the Deployment object now becomes unavailable. 
+ Eventually(func() error { + deploy := &appsv1.Deployment{} + if err := memberClient1.Get(ctx, client.ObjectKey{Namespace: nsName, Name: deployName}, deploy); err != nil { + return fmt.Errorf("failed to retrieve the Deployment object: %w", err) + } + + now := metav1.Now() + deploy.Status = appsv1.DeploymentStatus{ + ObservedGeneration: deploy.Generation, + Replicas: 10, + UpdatedReplicas: 2, + ReadyReplicas: 8, + AvailableReplicas: 8, + UnavailableReplicas: 2, + Conditions: []appsv1.DeploymentCondition{ + { + Type: appsv1.DeploymentAvailable, + Status: corev1.ConditionFalse, + Reason: "MarkedAsUnavailable", + Message: "Deployment has been marked as unavailable", + LastUpdateTime: now, + LastTransitionTime: now, + }, + { + Type: appsv1.DeploymentProgressing, + Status: corev1.ConditionTrue, + Reason: "MarkedAsProgressing", + Message: "Deployment has been marked as progressing", + LastUpdateTime: now, + LastTransitionTime: now, + }, + }, + } + if err := memberClient1.Status().Update(ctx, deploy); err != nil { + return fmt.Errorf("failed to update the Deployment status: %w", err) + } + return nil + }, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to refresh the status of the Deployment") + }) + + It("should back-report refreshed deployment status to the Work object", func() { + Eventually(deployStatusBackReportedActual(workName, nsName, deployName, beforeTimestamp), eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to back-report deployment status to the Work object") + }) + + It("should update the AppliedWork object status", func() { + // Prepare the status information. + appliedResourceMeta := []fleetv1beta1.AppliedResourceMeta{ + { + WorkResourceIdentifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + Group: "", + Version: "v1", + Kind: "Namespace", + Resource: "namespaces", + Name: nsName, + }, + UID: regularNS.UID, + }, + { + WorkResourceIdentifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + Group: "apps", + Version: "v1", + Kind: "Deployment", + Resource: "deployments", + Name: deployName, + Namespace: nsName, + }, + UID: regularDeploy.UID, + }, + { + WorkResourceIdentifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 2, + Group: "", + Version: "v1", + Kind: "ConfigMap", + Resource: "configmaps", + Name: configMapName, + Namespace: nsName, + }, + UID: regularCM.UID, + }, + } + + appliedWorkStatusUpdatedActual := appliedWorkStatusUpdated(workName, appliedResourceMeta) + Eventually(appliedWorkStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update appliedWork status") + }) + + AfterAll(func() { + // Delete the Work object and related resources. + deleteWorkObject(workName, memberReservedNSName1) + + // Ensure applied manifest has been removed. + regularDeployRemovedActual := regularDeployRemovedActual(nsName, deployName) + Eventually(regularDeployRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the deployment object") + + regularCMRemovedActual := regularConfigMapRemovedActual(nsName, configMapName) + Eventually(regularCMRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the configMap object") + + // Kubebuilder suggests that in a testing environment like this, to check for the existence of the AppliedWork object + // OwnerReference in the Namespace object (https://book.kubebuilder.io/reference/envtest.html#testing-considerations). 
+			checkNSOwnerReferences(workName, nsName)
+
+			// Ensure that the AppliedWork object has been removed.
+			appliedWorkRemovedActual := appliedWorkRemovedActual(workName, nsName)
+			Eventually(appliedWorkRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the AppliedWork object")
+
+			workRemovedActual := workRemovedActual(workName)
+			Eventually(workRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the Work object")
+
+			// The environment prepared by the envtest package does not support namespace
+			// deletion; consequently this test suite does not attempt to verify its deletion.
+		})
+	})
+})
diff --git a/pkg/controllers/workapplier/status.go b/pkg/controllers/workapplier/status.go
index 8a9fe2667..dd8a428a1 100644
--- a/pkg/controllers/workapplier/status.go
+++ b/pkg/controllers/workapplier/status.go
@@ -18,10 +18,13 @@ package workapplier
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/ptr"
 
@@ -94,7 +97,9 @@ func (r *Reconciler) refreshWorkStatus(
 		}
 	}
 
+	// Set the two flags here as they are per-work-object settings.
 	isReportDiffModeOn := work.Spec.ApplyStrategy != nil && work.Spec.ApplyStrategy.Type == fleetv1beta1.ApplyStrategyTypeReportDiff
+	isStatusBackReportingOn := work.Spec.ReportBackStrategy != nil && work.Spec.ReportBackStrategy.Type == fleetv1beta1.ReportBackStrategyTypeMirror
 
 	for idx := range bundles {
 		bundle := bundles[idx]
@@ -159,9 +164,18 @@ func (r *Reconciler) refreshWorkStatus(
 			}
 		}
 
-		// Tally the stats.
+		// Tally the stats, and perform status back-reporting if applicable.
 		if isManifestObjectApplied(bundle.applyOrReportDiffResTyp) {
 			appliedManifestsCount++
+
+			if isStatusBackReportingOn {
+				// Back-report the status from the member cluster side, if applicable.
+				//
+				// Back-reporting is only performed when:
+				// a) the ReportBackStrategy is of the type Mirror; and
+				// b) the manifest object has been applied successfully.
+				backReportStatus(bundle.inMemberClusterObj, manifestCond, now, klog.KObj(work))
+			}
 		}
 		if isAppliedObjectAvailable(bundle.availabilityResTyp) {
 			availableAppliedObjectsCount++
@@ -693,6 +707,57 @@ func prepareRebuiltManifestCondQIdx(bundles []*manifestProcessingBundle) map[str
 	return rebuiltManifestCondQIdx
 }
 
+// backReportStatus writes the status field of an object applied on the member cluster side to
+// the status of the Work object.
+func backReportStatus(
+	inMemberClusterObj *unstructured.Unstructured,
+	manifestCond *fleetv1beta1.ManifestCondition,
+	now metav1.Time,
+	workRef klog.ObjectRef,
+) {
+	if inMemberClusterObj == nil || inMemberClusterObj.Object == nil {
+		// Do a sanity check; normally this will never occur (as status back-reporting
+		// only applies to objects that have been successfully applied).
+		//
+		// Should this unexpected situation occur, the work applier does not register
+		// it as an error; the object shall be ignored for the status back-reporting
+		// part of the reconciliation loop.
+ wrapperErr := fmt.Errorf("attempted to back-report status for a manifest that has not been applied yet or cannot be found on the member cluster side") + _ = controller.NewUnexpectedBehaviorError(wrapperErr) + klog.ErrorS(wrapperErr, "Failed to back-report status", "work", workRef, "resourceIdentifier", manifestCond.Identifier) + return + } + if _, ok := inMemberClusterObj.Object["status"]; !ok { + // The object from the member cluster side does not have a status subresource; this + // is not considered as an error. + klog.V(2).InfoS("cannot back-report status as the applied resource on the member cluster side does not have a status subresource", "work", workRef, "resourceIdentifier", manifestCond.Identifier) + return + } + + statusBackReportingWrapper := make(map[string]interface{}) + // The TypeMeta fields must be added in the wrapper, otherwise the client libraries would + // have trouble serializing/deserializing the wrapper object when it's written/read to/from + // the API server. + statusBackReportingWrapper["apiVersion"] = inMemberClusterObj.GetAPIVersion() + statusBackReportingWrapper["kind"] = inMemberClusterObj.GetKind() + statusBackReportingWrapper["status"] = inMemberClusterObj.Object["status"] + statusData, err := json.Marshal(statusBackReportingWrapper) + if err != nil { + // This normally should never occur. + wrappedErr := fmt.Errorf("failed to marshal wrapped back-reported status: %w", err) + _ = controller.NewUnexpectedBehaviorError(wrappedErr) + klog.ErrorS(wrappedErr, "Failed to prepare status wrapper", "work", workRef, "resourceIdentifier", manifestCond.Identifier) + return + } + + manifestCond.BackReportedStatus = &fleetv1beta1.BackReportedStatus{ + ObservedStatus: runtime.RawExtension{ + Raw: statusData, + }, + ObservationTime: now, + } +} + // trimWorkStatusDataWhenOversized trims some data from the Work object status when the object // reaches its size limit. func trimWorkStatusDataWhenOversized(work *fleetv1beta1.Work) { diff --git a/pkg/controllers/workapplier/status_test.go b/pkg/controllers/workapplier/status_test.go index 6fca9b7fe..b2f8d9ce4 100644 --- a/pkg/controllers/workapplier/status_test.go +++ b/pkg/controllers/workapplier/status_test.go @@ -18,18 +18,23 @@ package workapplier import ( "context" + "encoding/json" "fmt" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" @@ -2032,6 +2037,169 @@ func TestSetWorkDiffReportedCondition(t *testing.T) { } } +// TestBackReportStatus tests the backReportStatus method. 
+func TestBackReportStatus(t *testing.T) { + workRef := klog.ObjectRef{ + Name: workName, + Namespace: memberReservedNSName1, + } + now := metav1.Now() + + deployWithStatus := deploy.DeepCopy() + deployWithStatus.Status = appsv1.DeploymentStatus{ + ObservedGeneration: 2, + Replicas: 5, + UpdatedReplicas: 5, + ReadyReplicas: 5, + AvailableReplicas: 5, + UnavailableReplicas: 0, + Conditions: []appsv1.DeploymentCondition{ + { + Type: appsv1.DeploymentAvailable, + Status: corev1.ConditionTrue, + }, + }, + } + deployStatusWrapperMap := map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "status": deployWithStatus.Status, + } + deployStatusWrapperBytes, _ := json.Marshal(deployStatusWrapperMap) + + deployWithStatusBfr := deploy.DeepCopy() + deployWithStatusBfr.Status = appsv1.DeploymentStatus{ + ObservedGeneration: 1, + Replicas: 4, + UpdatedReplicas: 1, + ReadyReplicas: 1, + AvailableReplicas: 1, + UnavailableReplicas: 3, + Conditions: []appsv1.DeploymentCondition{ + { + Type: appsv1.DeploymentAvailable, + Status: corev1.ConditionFalse, + }, + { + Type: appsv1.DeploymentProgressing, + Status: corev1.ConditionTrue, + }, + }, + } + deployStatusWrapperMapBfr := map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "status": deployWithStatusBfr.Status, + } + deployStatusWrapperBytesBfr, _ := json.Marshal(deployStatusWrapperMapBfr) + + testCases := []struct { + name string + manifestCond *fleetv1beta1.ManifestCondition + inMemberClusterObj *unstructured.Unstructured + // The placeholder is added here to help verify the integrity of backported + // status by unmarshalling the data into its original data structure (e.g., + // a Kubernetes Deployment). + objPlaceholder client.Object + wantManifestCond *fleetv1beta1.ManifestCondition + wantIgnored bool + }{ + { + name: "object with status", + manifestCond: &fleetv1beta1.ManifestCondition{}, + inMemberClusterObj: toUnstructured(t, deployWithStatus), + objPlaceholder: deploy.DeepCopy(), + wantManifestCond: &fleetv1beta1.ManifestCondition{ + BackReportedStatus: &fleetv1beta1.BackReportedStatus{ + ObservedStatus: runtime.RawExtension{ + Raw: deployStatusWrapperBytes, + }, + ObservationTime: now, + }, + }, + }, + { + name: "object with status, overwriting previous back-reported status", + manifestCond: &fleetv1beta1.ManifestCondition{ + BackReportedStatus: &fleetv1beta1.BackReportedStatus{ + ObservedStatus: runtime.RawExtension{ + Raw: deployStatusWrapperBytesBfr, + }, + ObservationTime: metav1.Time{ + Time: now.Add(-1 * time.Minute), + }, + }, + }, + inMemberClusterObj: toUnstructured(t, deployWithStatus), + objPlaceholder: deploy.DeepCopy(), + wantManifestCond: &fleetv1beta1.ManifestCondition{ + BackReportedStatus: &fleetv1beta1.BackReportedStatus{ + ObservedStatus: runtime.RawExtension{ + Raw: deployStatusWrapperBytes, + }, + ObservationTime: now, + }, + }, + }, + { + name: "object with no status", + manifestCond: &fleetv1beta1.ManifestCondition{}, + inMemberClusterObj: toUnstructured(t, configMap.DeepCopy()), + wantManifestCond: &fleetv1beta1.ManifestCondition{}, + wantIgnored: true, + }, + // Normally this case will never occur. + { + name: "no object found on the member cluster side", + manifestCond: &fleetv1beta1.ManifestCondition{}, + wantManifestCond: &fleetv1beta1.ManifestCondition{}, + wantIgnored: true, + }, + // Normally this case will never occur. 
+ { + name: "object found on the member cluster side but has no data", + manifestCond: &fleetv1beta1.ManifestCondition{}, + inMemberClusterObj: &unstructured.Unstructured{}, + wantManifestCond: &fleetv1beta1.ManifestCondition{}, + wantIgnored: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + backReportStatus(tc.inMemberClusterObj, tc.manifestCond, now, workRef) + + if tc.wantIgnored { + if tc.manifestCond.BackReportedStatus != nil { + t.Fatalf("backReportStatus() reported status data unexpectedly") + return + } + return + } + + // The test spec here attempts to re-build the status instead of directly + // comparing the Raw bytes, as the JSON marshalling ops are not guaranteed + // to produce deterministic results (e.g., the order of object keys might vary + // on different unmarshalling attempts, even though the data remains the same). + backReportedStatusBytes := tc.manifestCond.BackReportedStatus.ObservedStatus.Raw + if err := json.Unmarshal(backReportedStatusBytes, tc.objPlaceholder); err != nil { + t.Fatalf("back reported data unmarshalling err: %v", err) + } + backReportedStatusUnstructured := toUnstructured(t, tc.objPlaceholder) + // The test spec here does not verify the API version and Kind info as they + // are tracked just for structural integrity reasons; the information is not + // actually in use. + if diff := cmp.Diff(backReportedStatusUnstructured.Object["status"], tc.inMemberClusterObj.Object["status"]); diff != "" { + t.Errorf("backReportStatus() manifestCond diffs (-got, +want):\n%s", diff) + } + + if !cmp.Equal(tc.manifestCond.BackReportedStatus.ObservationTime, now) { + t.Errorf("backReportStatus() observed timestamp not equal, got %v, want %v", tc.manifestCond.BackReportedStatus.ObservationTime, now) + } + }) + } +} + // TestTrimWorkStatusDataWhenOversized tests the trimWorkStatusDataWhenOversized function. func TestTrimWorkStatusDataWhenOversized(t *testing.T) { now := metav1.Now() diff --git a/pkg/controllers/workapplier/waves_integration_test.go b/pkg/controllers/workapplier/waves_integration_test.go index f9aebc46c..9dd5a83bb 100644 --- a/pkg/controllers/workapplier/waves_integration_test.go +++ b/pkg/controllers/workapplier/waves_integration_test.go @@ -86,7 +86,7 @@ var _ = Describe("parallel processing with waves", func() { regularPCJSON := marshalK8sObjJSON(regularPC) // Create a new Work object with all the manifest JSONs. - createWorkObject(workName, memberReservedNSName3, nil, regularNSJSON, regularPCJSON) + createWorkObject(workName, memberReservedNSName3, nil, nil, regularNSJSON, regularPCJSON) }) // For simplicity reasons, this test case will skip some of the regular apply op result verification @@ -224,7 +224,7 @@ var _ = Describe("parallel processing with waves", func() { regularCMJSON := marshalK8sObjJSON(regularCM) // Create a new Work object with all the manifest JSONs. - createWorkObject(workName, memberReservedNSName3, nil, regularNSJSON, regularCMJSON) + createWorkObject(workName, memberReservedNSName3, nil, nil, regularNSJSON, regularCMJSON) }) // For simplicity reasons, this test case will skip some of the regular apply op result verification @@ -368,7 +368,7 @@ var _ = Describe("parallel processing with waves", func() { regularRoleJSON := marshalK8sObjJSON(regularRole) // Create a new Work object with all the manifest JSONs. 
- createWorkObject(workName, memberReservedNSName3, nil, regularNSJSON, regularRoleJSON) + createWorkObject(workName, memberReservedNSName3, nil, nil, regularNSJSON, regularRoleJSON) }) // For simplicity reasons, this test case will skip some of the regular apply op result verification @@ -1114,7 +1114,7 @@ var _ = Describe("parallel processing with waves", func() { }) // Create a new Work object with all the manifest JSONs. - createWorkObject(workName, memberReservedNSName3, nil, allManifestJSONByteArrs...) + createWorkObject(workName, memberReservedNSName3, nil, nil, allManifestJSONByteArrs...) }) // For simplicity reasons, this test case will skip some of the regular apply op result verification
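For reference, here is a sketch of how a consumer might decode the back-reported data added by the second patch. backReportStatus stores a wrapper carrying only apiVersion, kind, and status in ManifestCondition.BackReportedStatus.ObservedStatus.Raw; unmarshalling it into a typed object populates only the Status field. The readBackReportedDeploymentStatus helper is hypothetical and assumes the wrapped object is a Deployment; it is not part of this patch:

package main

import (
	"encoding/json"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
)

// readBackReportedDeploymentStatus unmarshals the back-reported status wrapper into a
// typed Deployment; only the Status field is populated, since the wrapper carries no
// metadata or spec.
func readBackReportedDeploymentStatus(raw []byte) (*appsv1.DeploymentStatus, error) {
	deploy := &appsv1.Deployment{}
	if err := json.Unmarshal(raw, deploy); err != nil {
		return nil, fmt.Errorf("failed to unmarshal back-reported status wrapper: %w", err)
	}
	return &deploy.Status, nil
}

func main() {
	// A wrapper of the shape produced by backReportStatus (the numbers are made up).
	raw := []byte(`{"apiVersion":"apps/v1","kind":"Deployment","status":{"replicas":5,"readyReplicas":5,"availableReplicas":5}}`)
	status, err := readBackReportedDeploymentStatus(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("ready=%d available=%d\n", status.ReadyReplicas, status.AvailableReplicas)
}
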