From 1f48f98c941b8d8d53e1377155c7d7d494467d94 Mon Sep 17 00:00:00 2001 From: Maksim Terekhin Date: Sun, 8 Mar 2026 11:27:38 +0530 Subject: [PATCH 1/6] api: add MaintenanceSpec type and maintenance upgrade phases Add MaintenanceSpec to PeerDBClusterSpec for configuring PeerDB's native maintenance mode during upgrades. This includes: - MaintenanceSpec type with image, backoffLimit, and resources fields - Two new UpgradePhase constants: StartMaintenance and EndMaintenance - ConditionMaintenanceMode condition type for tracking maintenance state - Reason constants: MaintenanceStarting, Active, Ending, Complete, Failed - Regenerated deepcopy methods and CRD manifests Amp-Thread-ID: https://ampcode.com/threads/T-019ccbea-b6d3-7583-8ac6-4f8a88c21dbd Co-authored-by: Amp --- api/v1alpha1/peerdbcluster_types.go | 52 +++++++++++-- api/v1alpha1/zz_generated.deepcopy.go | 35 +++++++++ .../peerdb.peerdb.io_peerdbclusters.yaml | 78 +++++++++++++++++++ 3 files changed, 157 insertions(+), 8 deletions(-) diff --git a/api/v1alpha1/peerdbcluster_types.go b/api/v1alpha1/peerdbcluster_types.go index 691f20d..06439d4 100644 --- a/api/v1alpha1/peerdbcluster_types.go +++ b/api/v1alpha1/peerdbcluster_types.go @@ -41,6 +41,8 @@ const ( ConditionUpgradeInProgress = "UpgradeInProgress" // ConditionBackupSafe indicates whether it is safe to take a backup (no rolling restarts or upgrades in progress). ConditionBackupSafe = "BackupSafe" + // ConditionMaintenanceMode indicates PeerDB maintenance mode is active. + ConditionMaintenanceMode = "MaintenanceMode" ) // Reason constants for status conditions. @@ -105,6 +107,16 @@ const ( ReasonBackupSafe = "BackupSafe" // ReasonBackupUnsafe indicates the cluster is not safe for backup (upgrade or rollout in progress). ReasonBackupUnsafe = "BackupUnsafe" + // ReasonMaintenanceStarting indicates maintenance mode is being activated. + ReasonMaintenanceStarting = "MaintenanceStarting" + // ReasonMaintenanceActive indicates maintenance mode is active. 
+ ReasonMaintenanceActive = "MaintenanceActive" + // ReasonMaintenanceEnding indicates maintenance mode is being deactivated. + ReasonMaintenanceEnding = "MaintenanceEnding" + // ReasonMaintenanceComplete indicates maintenance mode has been deactivated. + ReasonMaintenanceComplete = "MaintenanceComplete" + // ReasonMaintenanceFailed indicates a maintenance mode job failed. + ReasonMaintenanceFailed = "MaintenanceFailed" ) // Annotation constants for PeerDBCluster. @@ -344,6 +356,23 @@ type InitSpec struct { TemporalSearchAttributes *InitJobSpec `json:"temporalSearchAttributes,omitempty"` } +// MaintenanceSpec configures PeerDB maintenance mode for upgrades. +// When enabled, the operator triggers PeerDB's maintenance workflows +// to gracefully pause mirrors before upgrading and resume them after. +type MaintenanceSpec struct { + // image overrides the default flow-maintenance container image. + // +optional + Image *string `json:"image,omitempty"` + // backoffLimit is the number of retries before marking the maintenance job as failed. + // +kubebuilder:validation:Minimum=0 + // +kubebuilder:default=4 + // +optional + BackoffLimit *int32 `json:"backoffLimit,omitempty"` + // resources defines compute resource requirements for the maintenance job container. + // +optional + Resources *corev1.ResourceRequirements `json:"resources,omitempty"` +} + // UpgradePolicy controls whether the operator automatically performs version upgrades. 
// +kubebuilder:validation:Enum=Automatic;Manual type UpgradePolicy string @@ -373,14 +402,16 @@ type MaintenanceWindow struct { type UpgradePhase string const ( - UpgradePhaseComplete UpgradePhase = "Complete" - UpgradePhaseWaiting UpgradePhase = "Waiting" - UpgradePhaseBlocked UpgradePhase = "Blocked" - UpgradePhaseConfig UpgradePhase = "Config" - UpgradePhaseInitJobs UpgradePhase = "InitJobs" - UpgradePhaseFlowAPI UpgradePhase = "FlowAPI" - UpgradePhaseServer UpgradePhase = "PeerDBServer" - UpgradePhaseUI UpgradePhase = "UI" + UpgradePhaseComplete UpgradePhase = "Complete" + UpgradePhaseWaiting UpgradePhase = "Waiting" + UpgradePhaseBlocked UpgradePhase = "Blocked" + UpgradePhaseStartMaintenance UpgradePhase = "StartMaintenance" + UpgradePhaseConfig UpgradePhase = "Config" + UpgradePhaseInitJobs UpgradePhase = "InitJobs" + UpgradePhaseFlowAPI UpgradePhase = "FlowAPI" + UpgradePhaseServer UpgradePhase = "PeerDBServer" + UpgradePhaseUI UpgradePhase = "UI" + UpgradePhaseEndMaintenance UpgradePhase = "EndMaintenance" ) // UpgradeStatus tracks the progress of a version upgrade. @@ -446,6 +477,11 @@ type PeerDBClusterSpec struct { // Only used when upgradePolicy is Automatic. // +optional MaintenanceWindow *MaintenanceWindow `json:"maintenanceWindow,omitempty"` + // maintenance configures PeerDB maintenance mode for graceful upgrades. + // When configured, the operator runs maintenance workflows to pause mirrors + // before upgrading and resume them after. + // +optional + Maintenance *MaintenanceSpec `json:"maintenance,omitempty"` } // PeerDBClusterStatus defines the observed state of PeerDBCluster. 
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 2f4c72b..9abfbcd 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -281,6 +281,36 @@ func (in *InitSpec) DeepCopy() *InitSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MaintenanceSpec) DeepCopyInto(out *MaintenanceSpec) { + *out = *in + if in.Image != nil { + in, out := &in.Image, &out.Image + *out = new(string) + **out = **in + } + if in.BackoffLimit != nil { + in, out := &in.BackoffLimit, &out.BackoffLimit + *out = new(int32) + **out = **in + } + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = new(v1.ResourceRequirements) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MaintenanceSpec. +func (in *MaintenanceSpec) DeepCopy() *MaintenanceSpec { + if in == nil { + return nil + } + out := new(MaintenanceSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MaintenanceWindow) DeepCopyInto(out *MaintenanceWindow) { *out = *in @@ -394,6 +424,11 @@ func (in *PeerDBClusterSpec) DeepCopyInto(out *PeerDBClusterSpec) { *out = new(MaintenanceWindow) (*in).DeepCopyInto(*out) } + if in.Maintenance != nil { + in, out := &in.Maintenance, &out.Maintenance + *out = new(MaintenanceSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PeerDBClusterSpec. 
diff --git a/config/crd/bases/peerdb.peerdb.io_peerdbclusters.yaml b/config/crd/bases/peerdb.peerdb.io_peerdbclusters.yaml index 8000ffd..ad40647 100644 --- a/config/crd/bases/peerdb.peerdb.io_peerdbclusters.yaml +++ b/config/crd/bases/peerdb.peerdb.io_peerdbclusters.yaml @@ -812,6 +812,84 @@ spec: type: object type: object type: object + maintenance: + description: |- + maintenance configures PeerDB maintenance mode for graceful upgrades. + When configured, the operator runs maintenance workflows to pause mirrors + before upgrading and resume them after. + properties: + backoffLimit: + default: 4 + description: backoffLimit is the number of retries before marking + the maintenance job as failed. + format: int32 + minimum: 0 + type: integer + image: + description: image overrides the default flow-maintenance container + image. + type: string + resources: + description: resources defines compute resource requirements for + the maintenance job container. + properties: + claims: + description: |- + Claims lists the names of resources, defined in spec.resourceClaims, + that are used by this container. + + This field depends on the + DynamicResourceAllocation feature gate. + + This field is immutable. It can only be set for containers. + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: |- + Name must match the name of one entry in pod.spec.resourceClaims of + the Pod where this field is used. It makes that resource available + inside a container. + type: string + request: + description: |- + Request is the name chosen for a request in the referenced claim. + If empty, everything from the claim is made available, otherwise + only the result of this request. 
+ type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Limits describes the maximum amount of compute resources allowed. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Requests describes the minimum amount of compute resources required. + If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, + otherwise to an implementation-defined value. Requests cannot exceed Limits. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + type: object + type: object maintenanceWindow: description: |- maintenanceWindow defines a daily time window during which upgrades may start. From 0d5a4913925cbe2940b8f9162f5dc9702671d86b Mon Sep 17 00:00:00 2001 From: Maksim Terekhin Date: Sun, 8 Mar 2026 11:27:48 +0530 Subject: [PATCH 2/6] resources: add maintenance Job builders Add BuildStartMaintenanceJob and BuildEndMaintenanceJob that create Kubernetes Jobs using the ghcr.io/peerdb-io/flow-maintenance image. The Jobs run PeerDB's maintenance entrypoint with 'start' or 'end' subcommands to trigger the StartMaintenance/EndMaintenance Temporal workflows. Jobs inherit catalog connection config via the shared ConfigMap and password secret, following the same pattern as init jobs. 
Amp-Thread-ID: https://ampcode.com/threads/T-019ccbea-b6d3-7583-8ac6-4f8a88c21dbd Co-authored-by: Amp --- internal/resources/maintenance_jobs.go | 96 ++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 internal/resources/maintenance_jobs.go diff --git a/internal/resources/maintenance_jobs.go b/internal/resources/maintenance_jobs.go new file mode 100644 index 0000000..ca4644c --- /dev/null +++ b/internal/resources/maintenance_jobs.go @@ -0,0 +1,96 @@ +package resources + +import ( + "fmt" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/Neurostep/peerdb-operator/api/v1alpha1" +) + +// BuildStartMaintenanceJob creates a Job that triggers PeerDB's StartMaintenance workflow. +// This pauses all running mirrors and enables maintenance mode before an upgrade. +func BuildStartMaintenanceJob(cluster *v1alpha1.PeerDBCluster) *batchv1.Job { + return buildMaintenanceJob(cluster, "start") +} + +// BuildEndMaintenanceJob creates a Job that triggers PeerDB's EndMaintenance workflow. +// This resumes previously paused mirrors and disables maintenance mode after an upgrade. 
+func BuildEndMaintenanceJob(cluster *v1alpha1.PeerDBCluster) *batchv1.Job { + return buildMaintenanceJob(cluster, "end") +} + +func buildMaintenanceJob(cluster *v1alpha1.PeerDBCluster, action string) *batchv1.Job { + name := fmt.Sprintf("%s-maintenance-%s-%s", cluster.Name, action, SanitizeVersion(cluster.Spec.Version)) + component := fmt.Sprintf("maintenance-%s", action) + labels := CommonLabels(cluster.Name, component) + + image := fmt.Sprintf("ghcr.io/peerdb-io/flow-maintenance:stable-%s", cluster.Spec.Version) + backoffLimit := int32Ptr(4) + + if spec := cluster.Spec.Maintenance; spec != nil { + if spec.Image != nil { + image = *spec.Image + } + if spec.BackoffLimit != nil { + backoffLimit = spec.BackoffLimit + } + } + + catalogSecret := cluster.Spec.Dependencies.Catalog.PasswordSecretRef + + container := corev1.Container{ + Name: fmt.Sprintf("maintenance-%s", action), + Image: image, + Command: []string{"/root/peer-flow", "maintenance", action}, + EnvFrom: []corev1.EnvFromSource{ + { + ConfigMapRef: &corev1.ConfigMapEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: fmt.Sprintf("%s-config", cluster.Name), + }, + }, + }, + }, + Env: []corev1.EnvVar{ + { + Name: "PEERDB_CATALOG_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: catalogSecret.Name, + }, + Key: catalogSecret.Key, + }, + }, + }, + }, + } + + if spec := cluster.Spec.Maintenance; spec != nil && spec.Resources != nil { + container.Resources = *spec.Resources + } + + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: cluster.Namespace, + Labels: labels, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: backoffLimit, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyOnFailure, + ServiceAccountName: cluster.Name, + Containers: []corev1.Container{container}, + 
}, + }, + }, + } +} From 6de73b4332c07651d4cc29aa64c27f83b4831f27 Mon Sep 17 00:00:00 2001 From: Maksim Terekhin Date: Sun, 8 Mar 2026 11:28:00 +0530 Subject: [PATCH 3/6] controller: integrate maintenance mode into upgrade state machine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Insert StartMaintenance and EndMaintenance phases into the upgrade lifecycle. When spec.maintenance is configured: Waiting → StartMaintenance → Config → InitJobs → FlowAPI → Server → UI → EndMaintenance → Complete The new upgradePhaseMaintenanceJob method handles Job creation, completion polling, and failure retry (delete + recreate) for both phases. When spec.maintenance is nil, the upgrade flow is unchanged. Key behaviors: - StartMaintenance Job pauses mirrors before any component restarts - EndMaintenance Job resumes mirrors after all components are upgraded - Failed maintenance Jobs are auto-deleted for retry with Degraded condition - MaintenanceMode condition tracks whether mirrors are paused Amp-Thread-ID: https://ampcode.com/threads/T-019ccbea-b6d3-7583-8ac6-4f8a88c21dbd Co-authored-by: Amp --- .../controller/peerdbcluster_controller.go | 149 +++++++++++++++++- 1 file changed, 145 insertions(+), 4 deletions(-) diff --git a/internal/controller/peerdbcluster_controller.go b/internal/controller/peerdbcluster_controller.go index 134ef74..5808f0a 100644 --- a/internal/controller/peerdbcluster_controller.go +++ b/internal/controller/peerdbcluster_controller.go @@ -471,6 +471,13 @@ func (r *PeerDBClusterReconciler) reconcileUpgrade(ctx context.Context, cluster case peerdbv1alpha1.UpgradePhaseBlocked: return r.upgradePhaseBlocked(ctx, cluster) + case peerdbv1alpha1.UpgradePhaseStartMaintenance: + return r.upgradePhaseMaintenanceJob(ctx, cluster, + resources.BuildStartMaintenanceJob(cluster), + "StartMaintenance", + peerdbv1alpha1.UpgradePhaseConfig, + ) + case peerdbv1alpha1.UpgradePhaseConfig: // Config/secrets are always reconciled before we 
get here, so advance immediately. upgrade.Phase = peerdbv1alpha1.UpgradePhaseInitJobs @@ -498,10 +505,21 @@ func (r *PeerDBClusterReconciler) reconcileUpgrade(ctx context.Context, cluster ) case peerdbv1alpha1.UpgradePhaseUI: + nextPhase := peerdbv1alpha1.UpgradePhaseComplete + if cluster.Spec.Maintenance != nil { + nextPhase = peerdbv1alpha1.UpgradePhaseEndMaintenance + } return r.upgradePhaseComponent(ctx, cluster, resources.BuildUIDeployment(cluster, configHash), resources.BuildUIService(cluster), "UI", + nextPhase, + ) + + case peerdbv1alpha1.UpgradePhaseEndMaintenance: + return r.upgradePhaseMaintenanceJob(ctx, cluster, + resources.BuildEndMaintenanceJob(cluster), + "EndMaintenance", peerdbv1alpha1.UpgradePhaseComplete, ) } @@ -554,10 +572,16 @@ func (r *PeerDBClusterReconciler) upgradePhaseWaiting(ctx context.Context, clust return ctrl.Result{RequeueAfter: 30 * time.Second}, nil } - // Proceed to config phase. - upgrade.Phase = peerdbv1alpha1.UpgradePhaseConfig - upgrade.Message = "Reconciling configuration" - log.Info("Upgrade advancing to Config phase") + // Proceed to maintenance or config phase. + if cluster.Spec.Maintenance != nil { + upgrade.Phase = peerdbv1alpha1.UpgradePhaseStartMaintenance + upgrade.Message = "Starting maintenance mode" + log.Info("Upgrade advancing to StartMaintenance phase") + } else { + upgrade.Phase = peerdbv1alpha1.UpgradePhaseConfig + upgrade.Message = "Reconciling configuration" + log.Info("Upgrade advancing to Config phase") + } return ctrl.Result{Requeue: true}, nil } @@ -719,6 +743,123 @@ func (r *PeerDBClusterReconciler) upgradePhaseComponent( return ctrl.Result{Requeue: true}, nil } +// upgradePhaseMaintenanceJob reconciles a maintenance mode Job (start or end) +// and advances to the next upgrade phase once the Job completes. 
+func (r *PeerDBClusterReconciler) upgradePhaseMaintenanceJob( + ctx context.Context, + cluster *peerdbv1alpha1.PeerDBCluster, + job *batchv1.Job, + phaseName string, + nextPhase peerdbv1alpha1.UpgradePhase, +) (ctrl.Result, error) { + log := logf.FromContext(ctx) + upgrade := cluster.Status.Upgrade + + if err := controllerutil.SetControllerReference(cluster, job, r.Scheme); err != nil { + return ctrl.Result{}, err + } + + // Pick the appropriate reason depending on whether we are starting or ending maintenance. + creatingReason := peerdbv1alpha1.ReasonMaintenanceStarting + if nextPhase == peerdbv1alpha1.UpgradePhaseComplete { + creatingReason = peerdbv1alpha1.ReasonMaintenanceEnding + } + + existing := &batchv1.Job{} + err := r.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, existing) + if apierrors.IsNotFound(err) { + log.Info("Creating maintenance job", "job", job.Name, "phase", phaseName) + r.Recorder.Eventf(cluster, nil, corev1.EventTypeNormal, creatingReason, phaseName, + "Creating maintenance job %s", job.Name) + meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ + Type: peerdbv1alpha1.ConditionMaintenanceMode, + Status: metav1.ConditionTrue, + ObservedGeneration: cluster.Generation, + Reason: creatingReason, + Message: fmt.Sprintf("Maintenance job %s is running", phaseName), + }) + if createErr := r.Create(ctx, job); createErr != nil { + return ctrl.Result{}, createErr + } + upgrade.Message = fmt.Sprintf("Waiting for %s job to complete", phaseName) + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } + if err != nil { + return ctrl.Result{}, err + } + + // Check Job status. 
+ for _, c := range existing.Status.Conditions { + if c.Type == batchv1.JobComplete && c.Status == corev1.ConditionTrue { + log.Info("Maintenance job completed", "job", job.Name, "phase", phaseName) + r.Recorder.Eventf(cluster, nil, corev1.EventTypeNormal, peerdbv1alpha1.ReasonMaintenanceComplete, phaseName, + "Maintenance job %s completed", job.Name) + + if nextPhase == peerdbv1alpha1.UpgradePhaseComplete { + // EndMaintenance completed — clear the condition and finish upgrade. + meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ + Type: peerdbv1alpha1.ConditionMaintenanceMode, + Status: metav1.ConditionFalse, + ObservedGeneration: cluster.Generation, + Reason: peerdbv1alpha1.ReasonMaintenanceComplete, + Message: "Maintenance mode ended, mirrors resumed", + }) + upgrade.Phase = peerdbv1alpha1.UpgradePhaseComplete + upgrade.Message = fmt.Sprintf("Upgrade complete: %s → %s", upgrade.FromVersion, upgrade.ToVersion) + meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ + Type: peerdbv1alpha1.ConditionUpgradeInProgress, + Status: metav1.ConditionFalse, + ObservedGeneration: cluster.Generation, + Reason: peerdbv1alpha1.ReasonUpgradeComplete, + Message: upgrade.Message, + }) + r.Recorder.Eventf(cluster, nil, corev1.EventTypeNormal, peerdbv1alpha1.ReasonUpgradeComplete, "UpgradeComplete", + "Version upgrade complete: %s → %s", upgrade.FromVersion, upgrade.ToVersion) + log.Info("Upgrade complete", "from", upgrade.FromVersion, "to", upgrade.ToVersion) + return ctrl.Result{}, nil + } + + // StartMaintenance completed — mark active and advance. 
+ meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ + Type: peerdbv1alpha1.ConditionMaintenanceMode, + Status: metav1.ConditionTrue, + ObservedGeneration: cluster.Generation, + Reason: peerdbv1alpha1.ReasonMaintenanceActive, + Message: "Maintenance mode active, mirrors paused", + }) + upgrade.Phase = nextPhase + upgrade.Message = fmt.Sprintf("Advancing to %s", nextPhase) + log.Info("Upgrade advancing after maintenance job", "nextPhase", nextPhase) + return ctrl.Result{Requeue: true}, nil + } + + if c.Type == batchv1.JobFailed && c.Status == corev1.ConditionTrue { + log.Info("Maintenance job failed, retrying", "job", existing.Name, "phase", phaseName) + r.Recorder.Eventf(cluster, nil, corev1.EventTypeWarning, peerdbv1alpha1.ReasonMaintenanceFailed, phaseName, + "Maintenance job %s failed, deleting for retry", existing.Name) + meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ + Type: peerdbv1alpha1.ConditionDegraded, + Status: metav1.ConditionTrue, + ObservedGeneration: cluster.Generation, + Reason: peerdbv1alpha1.ReasonMaintenanceFailed, + Message: fmt.Sprintf("Maintenance job %s failed", phaseName), + }) + propagation := metav1.DeletePropagationBackground + if delErr := r.Delete(ctx, existing, &client.DeleteOptions{ + PropagationPolicy: &propagation, + }); delErr != nil && !apierrors.IsNotFound(delErr) { + return ctrl.Result{}, delErr + } + upgrade.Message = fmt.Sprintf("Maintenance job %s failed, retrying", phaseName) + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } + } + + // Job still running. 
+ upgrade.Message = fmt.Sprintf("Waiting for %s job to complete", phaseName) + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil +} + func (r *PeerDBClusterReconciler) isJobFailed(ctx context.Context, namespace, name string) (bool, error) { job := &batchv1.Job{} if err := r.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, job); err != nil { From 13165f4f12c6e037dfeeb77b0bb9e6bdf482ecb3 Mon Sep 17 00:00:00 2001 From: Maksim Terekhin Date: Sun, 8 Mar 2026 11:28:08 +0530 Subject: [PATCH 4/6] test: add unit tests for maintenance mode upgrade phases Add four test cases covering the maintenance mode upgrade lifecycle: - Job creation during upgrade when maintenance is configured - Advancing past StartMaintenance when job completes successfully - Skipping maintenance phases when spec.maintenance is not set - Failed maintenance job deletion, retry, and Degraded condition Amp-Thread-ID: https://ampcode.com/threads/T-019ccbea-b6d3-7583-8ac6-4f8a88c21dbd Co-authored-by: Amp --- .../peerdbcluster_controller_test.go | 498 ++++++++++++++++++ 1 file changed, 498 insertions(+) diff --git a/internal/controller/peerdbcluster_controller_test.go b/internal/controller/peerdbcluster_controller_test.go index a27ea03..ba5438a 100644 --- a/internal/controller/peerdbcluster_controller_test.go +++ b/internal/controller/peerdbcluster_controller_test.go @@ -456,4 +456,502 @@ var _ = Describe("PeerDBCluster Controller", func() { Expect(readyCond.Reason).To(Equal("Paused")) }) }) + + Context("When reconciling a cluster with maintenance mode during upgrade", func() { + const resourceName = "test-resource-maintenance" + + ctx := context.Background() + + typeNamespacedName := types.NamespacedName{ + Name: resourceName, + Namespace: "default", + } + + BeforeEach(func() { + By("creating the catalog password secret") + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "catalog-password-maintenance", + Namespace: "default", + }, + StringData: map[string]string{ + 
"password": "test-password", + }, + } + err := k8sClient.Create(ctx, secret) + if err != nil && !apierrors.IsAlreadyExists(err) { + Expect(err).NotTo(HaveOccurred()) + } + + By("creating a PeerDBCluster with maintenance mode") + resource := &peerdbv1alpha1.PeerDBCluster{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + if err != nil && apierrors.IsNotFound(err) { + resource = &peerdbv1alpha1.PeerDBCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + Namespace: "default", + }, + Spec: peerdbv1alpha1.PeerDBClusterSpec{ + Version: "v0.36.7", + Maintenance: &peerdbv1alpha1.MaintenanceSpec{}, + Dependencies: peerdbv1alpha1.DependenciesSpec{ + Catalog: peerdbv1alpha1.CatalogSpec{ + Host: "catalog.example.com", + Database: "peerdb", + User: "peerdb", + PasswordSecretRef: peerdbv1alpha1.SecretKeySelector{ + Name: "catalog-password-maintenance", + Key: "password", + }, + }, + Temporal: peerdbv1alpha1.TemporalSpec{ + Address: "temporal.example.com:7233", + Namespace: "peerdb", + }, + }, + }, + } + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + } + }) + + AfterEach(func() { + resource := &peerdbv1alpha1.PeerDBCluster{} + err := k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + By("Cleanup the maintenance PeerDBCluster") + Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) + }) + + It("should enter StartMaintenance phase and create maintenance job during upgrade", func() { + controllerReconciler := &PeerDBClusterReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: events.NewFakeRecorder(10), + } + + By("running initial reconciliation to create resources at v0.36.7") + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("updating version to trigger an upgrade") + cluster := &peerdbv1alpha1.PeerDBCluster{} + err = k8sClient.Get(ctx, typeNamespacedName, cluster) + 
Expect(err).NotTo(HaveOccurred()) + cluster.Spec.Version = "v0.36.8" + Expect(k8sClient.Update(ctx, cluster)).To(Succeed()) + + By("reconciling to detect version change — should enter Waiting phase") + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("reconciling again — should advance to StartMaintenance and create the job") + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("verifying the start maintenance job was created") + job := &batchv1.Job{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: resourceName + "-maintenance-start-v0-36-8", + Namespace: "default", + }, job) + Expect(err).NotTo(HaveOccurred()) + Expect(job.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(job.Spec.Template.Spec.Containers[0].Command).To(ContainElements("/root/peer-flow", "maintenance", "start")) + + By("verifying the upgrade status shows StartMaintenance phase") + cluster = &peerdbv1alpha1.PeerDBCluster{} + err = k8sClient.Get(ctx, typeNamespacedName, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(cluster.Status.Upgrade).NotTo(BeNil()) + Expect(cluster.Status.Upgrade.Phase).To(Equal(peerdbv1alpha1.UpgradePhaseStartMaintenance)) + + By("verifying MaintenanceMode condition is set") + maintCond := meta.FindStatusCondition(cluster.Status.Conditions, peerdbv1alpha1.ConditionMaintenanceMode) + Expect(maintCond).NotTo(BeNil()) + Expect(maintCond.Status).To(Equal(metav1.ConditionTrue)) + Expect(maintCond.Reason).To(Equal(peerdbv1alpha1.ReasonMaintenanceStarting)) + }) + + It("should advance past StartMaintenance when job completes", func() { + controllerReconciler := &PeerDBClusterReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: events.NewFakeRecorder(10), + } + + By("running initial reconciliation") + _, err := 
controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("triggering upgrade to v0.36.8") + cluster := &peerdbv1alpha1.PeerDBCluster{} + err = k8sClient.Get(ctx, typeNamespacedName, cluster) + Expect(err).NotTo(HaveOccurred()) + cluster.Spec.Version = "v0.36.8" + Expect(k8sClient.Update(ctx, cluster)).To(Succeed()) + + By("reconciling through Waiting → StartMaintenance") + for i := 0; i < 3; i++ { + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + } + + By("marking the start maintenance job as complete") + job := &batchv1.Job{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: resourceName + "-maintenance-start-v0-36-8", + Namespace: "default", + }, job) + Expect(err).NotTo(HaveOccurred()) + now := metav1.Now() + job.Status.StartTime = &now + job.Status.CompletionTime = &now + job.Status.Conditions = append(job.Status.Conditions, + batchv1.JobCondition{ + Type: batchv1.JobSuccessCriteriaMet, + Status: corev1.ConditionTrue, + }, + batchv1.JobCondition{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + }, + ) + Expect(k8sClient.Status().Update(ctx, job)).To(Succeed()) + + By("reconciling — should advance past StartMaintenance to Config") + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("verifying upgrade phase advanced past StartMaintenance") + cluster = &peerdbv1alpha1.PeerDBCluster{} + err = k8sClient.Get(ctx, typeNamespacedName, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(cluster.Status.Upgrade).NotTo(BeNil()) + // After StartMaintenance completes, it should advance to Config (then immediately to InitJobs) + phase := cluster.Status.Upgrade.Phase + Expect(phase).To(BeElementOf( + peerdbv1alpha1.UpgradePhaseConfig, + peerdbv1alpha1.UpgradePhaseInitJobs, + 
peerdbv1alpha1.UpgradePhaseFlowAPI, + )) + + By("verifying MaintenanceMode condition shows Active") + maintCond := meta.FindStatusCondition(cluster.Status.Conditions, peerdbv1alpha1.ConditionMaintenanceMode) + Expect(maintCond).NotTo(BeNil()) + Expect(maintCond.Status).To(Equal(metav1.ConditionTrue)) + Expect(maintCond.Reason).To(Equal(peerdbv1alpha1.ReasonMaintenanceActive)) + }) + + It("should create EndMaintenance job with MaintenanceEnding reason after all components upgrade", func() { + controllerReconciler := &PeerDBClusterReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: events.NewFakeRecorder(10), + } + + By("running initial reconciliation") + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("triggering upgrade to v0.36.8") + cluster := &peerdbv1alpha1.PeerDBCluster{} + err = k8sClient.Get(ctx, typeNamespacedName, cluster) + Expect(err).NotTo(HaveOccurred()) + cluster.Spec.Version = "v0.36.8" + Expect(k8sClient.Update(ctx, cluster)).To(Succeed()) + + By("reconciling through Waiting → StartMaintenance → job creation") + for i := 0; i < 3; i++ { + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + } + + By("completing the start maintenance job") + startJob := &batchv1.Job{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: resourceName + "-maintenance-start-v0-36-8", + Namespace: "default", + }, startJob) + Expect(err).NotTo(HaveOccurred()) + now := metav1.Now() + startJob.Status.StartTime = &now + startJob.Status.CompletionTime = &now + startJob.Status.Conditions = append(startJob.Status.Conditions, + batchv1.JobCondition{Type: batchv1.JobSuccessCriteriaMet, Status: corev1.ConditionTrue}, + batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}, + ) + Expect(k8sClient.Status().Update(ctx, 
startJob)).To(Succeed()) + + By("reconciling through Config → InitJobs → FlowAPI → Server → UI → EndMaintenance") + // Helper: simulate deployment rollout (envtest has no real pods). + simulateDeploymentRollout := func(name string) { + dep := &appsv1.Deployment{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: name, Namespace: "default"}, dep); err != nil { + return + } + replicas := int32(1) + if dep.Spec.Replicas != nil { + replicas = *dep.Spec.Replicas + } + dep.Status.ObservedGeneration = dep.Generation + dep.Status.Replicas = replicas + dep.Status.UpdatedReplicas = replicas + dep.Status.AvailableReplicas = replicas + dep.Status.ReadyReplicas = replicas + _ = k8sClient.Status().Update(ctx, dep) + } + // Helper: mark init job as complete. + completeJob := func(name string) { + j := &batchv1.Job{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: name, Namespace: "default"}, j); err != nil { + return + } + if len(j.Status.Conditions) > 0 { + return // already has conditions + } + t := metav1.Now() + j.Status.StartTime = &t + j.Status.CompletionTime = &t + j.Status.Conditions = []batchv1.JobCondition{ + {Type: batchv1.JobSuccessCriteriaMet, Status: corev1.ConditionTrue}, + {Type: batchv1.JobComplete, Status: corev1.ConditionTrue}, + } + _ = k8sClient.Status().Update(ctx, j) + } + + for i := 0; i < 20; i++ { + _, _ = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + completeJob(resourceName + "-temporal-ns-register-v0-36-8") + completeJob(resourceName + "-temporal-search-attr-v0-36-8") + simulateDeploymentRollout(resourceName + "-flow-api") + simulateDeploymentRollout(resourceName + "-server") + simulateDeploymentRollout(resourceName + "-ui") + } + + By("verifying the end maintenance job was created") + endJob := &batchv1.Job{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: resourceName + "-maintenance-end-v0-36-8", + Namespace: "default", + }, endJob) + Expect(err).NotTo(HaveOccurred()) 
+ Expect(endJob.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(endJob.Spec.Template.Spec.Containers[0].Command).To(ContainElements("/root/peer-flow", "maintenance", "end")) + + By("verifying MaintenanceMode condition shows MaintenanceEnding reason") + cluster = &peerdbv1alpha1.PeerDBCluster{} + err = k8sClient.Get(ctx, typeNamespacedName, cluster) + Expect(err).NotTo(HaveOccurred()) + maintCond := meta.FindStatusCondition(cluster.Status.Conditions, peerdbv1alpha1.ConditionMaintenanceMode) + Expect(maintCond).NotTo(BeNil()) + Expect(maintCond.Status).To(Equal(metav1.ConditionTrue)) + Expect(maintCond.Reason).To(Equal(peerdbv1alpha1.ReasonMaintenanceEnding)) + + By("completing the end maintenance job") + endJob.Status.StartTime = &now + endJob.Status.CompletionTime = &now + endJob.Status.Conditions = append(endJob.Status.Conditions, + batchv1.JobCondition{Type: batchv1.JobSuccessCriteriaMet, Status: corev1.ConditionTrue}, + batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}, + ) + Expect(k8sClient.Status().Update(ctx, endJob)).To(Succeed()) + + By("reconciling — should complete the upgrade") + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("verifying upgrade is complete") + cluster = &peerdbv1alpha1.PeerDBCluster{} + err = k8sClient.Get(ctx, typeNamespacedName, cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(cluster.Status.Upgrade).NotTo(BeNil()) + Expect(cluster.Status.Upgrade.Phase).To(Equal(peerdbv1alpha1.UpgradePhaseComplete)) + + By("verifying MaintenanceMode condition is False with MaintenanceComplete reason") + maintCond = meta.FindStatusCondition(cluster.Status.Conditions, peerdbv1alpha1.ConditionMaintenanceMode) + Expect(maintCond).NotTo(BeNil()) + Expect(maintCond.Status).To(Equal(metav1.ConditionFalse)) + Expect(maintCond.Reason).To(Equal(peerdbv1alpha1.ReasonMaintenanceComplete)) + }) + + It("should not enter 
maintenance phases when maintenance is not configured", func() { + By("creating a cluster without maintenance") + noMaintName := "test-no-maint" + noMaintNN := types.NamespacedName{Name: noMaintName, Namespace: "default"} + + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "catalog-password-no-maint", + Namespace: "default", + }, + StringData: map[string]string{ + "password": "test-password", + }, + } + err := k8sClient.Create(ctx, secret) + if err != nil && !apierrors.IsAlreadyExists(err) { + Expect(err).NotTo(HaveOccurred()) + } + + resource := &peerdbv1alpha1.PeerDBCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: noMaintName, + Namespace: "default", + }, + Spec: peerdbv1alpha1.PeerDBClusterSpec{ + Version: "v0.36.7", + Dependencies: peerdbv1alpha1.DependenciesSpec{ + Catalog: peerdbv1alpha1.CatalogSpec{ + Host: "catalog.example.com", + Database: "peerdb", + User: "peerdb", + PasswordSecretRef: peerdbv1alpha1.SecretKeySelector{ + Name: "catalog-password-no-maint", + Key: "password", + }, + }, + Temporal: peerdbv1alpha1.TemporalSpec{ + Address: "temporal.example.com:7233", + Namespace: "peerdb", + }, + }, + }, + } + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + + controllerReconciler := &PeerDBClusterReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: events.NewFakeRecorder(10), + } + + By("running initial reconciliation") + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: noMaintNN, + }) + Expect(err).NotTo(HaveOccurred()) + + By("triggering upgrade") + cluster := &peerdbv1alpha1.PeerDBCluster{} + err = k8sClient.Get(ctx, noMaintNN, cluster) + Expect(err).NotTo(HaveOccurred()) + cluster.Spec.Version = "v0.36.8" + Expect(k8sClient.Update(ctx, cluster)).To(Succeed()) + + By("reconciling through the upgrade") + for i := 0; i < 3; i++ { + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: noMaintNN, + }) + Expect(err).NotTo(HaveOccurred()) + } + + 
By("verifying no maintenance job was created") + job := &batchv1.Job{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: noMaintName + "-maintenance-start-v0-36-8", + Namespace: "default", + }, job) + Expect(apierrors.IsNotFound(err)).To(BeTrue(), "maintenance job should not exist without spec.maintenance") + + By("cleaning up") + Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) + }) + + It("should delete and retry failed maintenance jobs", func() { + controllerReconciler := &PeerDBClusterReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: events.NewFakeRecorder(10), + } + + By("running initial reconciliation") + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("triggering upgrade to v0.36.9") + cluster := &peerdbv1alpha1.PeerDBCluster{} + err = k8sClient.Get(ctx, typeNamespacedName, cluster) + Expect(err).NotTo(HaveOccurred()) + cluster.Spec.Version = "v0.36.9" + Expect(k8sClient.Update(ctx, cluster)).To(Succeed()) + + By("reconciling to create the maintenance job") + for i := 0; i < 3; i++ { + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + } + + By("marking the maintenance job as failed") + jobName := resourceName + "-maintenance-start-v0-36-9" + job := &batchv1.Job{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: jobName, + Namespace: "default", + }, job) + Expect(err).NotTo(HaveOccurred()) + + now := metav1.Now() + job.Status.StartTime = &now + job.Status.Conditions = append(job.Status.Conditions, + batchv1.JobCondition{ + Type: batchv1.JobFailureTarget, + Status: corev1.ConditionTrue, + }, + batchv1.JobCondition{ + Type: batchv1.JobFailed, + Status: corev1.ConditionTrue, + }, + ) + Expect(k8sClient.Status().Update(ctx, job)).To(Succeed()) + + By("reconciling — should delete the failed job") + _, err = 
controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: jobName, + Namespace: "default", + }, &batchv1.Job{}) + Expect(apierrors.IsNotFound(err)).To(BeTrue(), "expected failed maintenance job to be deleted") + + By("verifying Degraded condition is set") + cluster = &peerdbv1alpha1.PeerDBCluster{} + err = k8sClient.Get(ctx, typeNamespacedName, cluster) + Expect(err).NotTo(HaveOccurred()) + degradedCond := meta.FindStatusCondition(cluster.Status.Conditions, peerdbv1alpha1.ConditionDegraded) + Expect(degradedCond).NotTo(BeNil()) + Expect(degradedCond.Status).To(Equal(metav1.ConditionTrue)) + Expect(degradedCond.Reason).To(Equal(peerdbv1alpha1.ReasonMaintenanceFailed)) + }) + }) }) From a0f019aa8020d47929710f0a310b45471823a47a Mon Sep 17 00:00:00 2001 From: Maksim Terekhin Date: Sun, 8 Mar 2026 11:28:17 +0530 Subject: [PATCH 5/6] test(e2e): add e2e test for maintenance mode upgrade Add an e2e test that verifies the maintenance mode integration in a real cluster: - Creates a PeerDBCluster with spec.maintenance configured - Patches the version to trigger an upgrade - Verifies the start maintenance Job is created with correct command - Verifies the upgrade status shows StartMaintenance phase - Verifies ownerReferences point to PeerDBCluster for GC Amp-Thread-ID: https://ampcode.com/threads/T-019ccbea-b6d3-7583-8ac6-4f8a88c21dbd Co-authored-by: Amp --- test/e2e/e2e_test.go | 91 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 20ad820..fbedd8d 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -563,6 +563,97 @@ spec: }).Should(Succeed()) }) + It("should create maintenance jobs during upgrade when maintenance is configured", func() { + By("creating a PeerDBCluster with maintenance mode") + maintClusterYAML := `apiVersion: 
peerdb.peerdb.io/v1alpha1 +kind: PeerDBCluster +metadata: + name: e2e-maint-cluster + namespace: ` + testNs + ` +spec: + version: "v0.36.7" + maintenance: {} + dependencies: + catalog: + host: "catalog.example.com" + port: 5432 + database: "peerdb" + user: "peerdb" + passwordSecretRef: + name: e2e-catalog-password + key: password + sslMode: "disable" + temporal: + address: "temporal.example.com:7233" + namespace: "default" + init: + temporalNamespaceRegistration: + enabled: false + temporalSearchAttributes: + enabled: false` + cmd := exec.Command("kubectl", "apply", "-f", "-") + cmd.Stdin = strings.NewReader(maintClusterYAML) + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("waiting for the cluster to be reconciled") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "deployment", "e2e-maint-cluster-flow-api", + "-n", testNs, "--no-headers") + _, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + }).Should(Succeed()) + + By("triggering an upgrade by patching the version") + cmd = exec.Command("kubectl", "patch", "peerdbcluster", "e2e-maint-cluster", + "-n", testNs, "--type", "merge", + "-p", `{"spec":{"version":"v0.36.8"}}`) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("verifying the start maintenance job is created") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "job", + "e2e-maint-cluster-maintenance-start-v0-36-8", + "-n", testNs, "--no-headers") + _, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + }).Should(Succeed()) + + By("verifying the upgrade status shows StartMaintenance phase") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "peerdbcluster", "e2e-maint-cluster", + "-n", testNs, "-o", "jsonpath={.status.upgrade.phase}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).To(Equal("StartMaintenance")) + }).Should(Succeed()) + + By("verifying the maintenance job has correct container 
command") + cmd = exec.Command("kubectl", "get", "job", + "e2e-maint-cluster-maintenance-start-v0-36-8", + "-n", testNs, + "-o", "jsonpath={.spec.template.spec.containers[0].command}") + output, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + Expect(output).To(ContainSubstring("maintenance")) + Expect(output).To(ContainSubstring("start")) + + By("verifying the maintenance job has ownerReference to PeerDBCluster") + cmd = exec.Command("kubectl", "get", "job", + "e2e-maint-cluster-maintenance-start-v0-36-8", + "-n", testNs, + "-o", "jsonpath={.metadata.ownerReferences[0].kind}") + output, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + Expect(output).To(Equal("PeerDBCluster")) + + By("cleaning up the maintenance cluster") + cmd = exec.Command("kubectl", "delete", "peerdbcluster", "e2e-maint-cluster", + "-n", testNs, "--ignore-not-found") + _, _ = utils.Run(cmd) + }) + It("should clean up owned resources when PeerDBCluster is deleted", func() { By("deleting the PeerDBWorkerPool") cmd := exec.Command("kubectl", "delete", "peerdbworkerpool", "e2e-workers", From bbedb6042fb4a5d221d59fa6695ba9e0dbaba8dd Mon Sep 17 00:00:00 2001 From: Maksim Terekhin Date: Sun, 8 Mar 2026 11:28:28 +0530 Subject: [PATCH 6/6] docs: document maintenance mode feature Update documentation across all relevant files: - README: add Maintenance Mode Integration to features list - API reference: add MaintenanceSpec type, MaintenanceMode condition, StartMaintenance/EndMaintenance upgrade phases - Architecture: add Maintenance Jobs to diagram and reconciliation strategy, add maintenance_jobs.go to project structure - Safe upgrade runbook: add Maintenance Mode section with YAML examples, update upgrade order and phases table Amp-Thread-ID: https://ampcode.com/threads/T-019ccbea-b6d3-7583-8ac6-4f8a88c21dbd Co-authored-by: Amp --- README.md | 1 + docs/api-reference/v1alpha1.md | 16 +++++++++++- docs/architecture.md | 13 +++++++--- docs/runbooks/safe-upgrade.md | 47 
+++++++++++++++++++++++++++++++++- 4 files changed, 71 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index c8e6f25..f0b107a 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ It provides declarative cluster management through custom resources, enabling us - **Multiple Worker Pools**: Different sizing, node selectors, and tolerations per workload profile - **Scale-to-Zero**: Snapshot workers can scale to zero when no initial loads are running - **Automatic Lifecycle Management**: OwnerReferences enable automatic garbage collection on CR deletion +- **Maintenance Mode Integration**: Gracefully pauses mirrors before upgrades and resumes them after via PeerDB's maintenance workflows ## Getting Started diff --git a/docs/api-reference/v1alpha1.md b/docs/api-reference/v1alpha1.md index 49af211..5a77a87 100644 --- a/docs/api-reference/v1alpha1.md +++ b/docs/api-reference/v1alpha1.md @@ -32,6 +32,7 @@ This document describes all Custom Resource Definitions (CRDs) managed by the Pe | `paused` | `bool` | No | `false` | When true, the operator stops reconciling this cluster. | | `upgradePolicy` | [`UpgradePolicy`](#upgradepolicy) | No | `Automatic` | Controls how version upgrades are applied. Enum: `Automatic`, `Manual`. | | `maintenanceWindow` | [`MaintenanceWindow`](#maintenancewindow) | No | — | Time window for automatic upgrades. Only used when `upgradePolicy` is `Automatic`. | +| `maintenance` | [`MaintenanceSpec`](#maintenancespec) | No | — | Configures PeerDB maintenance mode for graceful upgrades. When set, the operator pauses mirrors before upgrading and resumes them after. | ### PeerDBClusterStatus @@ -205,6 +206,16 @@ Defines a time window during which automatic upgrades may be applied. | `end` | `string` | **Yes** | — | End time in 24-hour `HH:MM` format. | | `timeZone` | `*string` | No | `UTC` | IANA timezone name (e.g., `America/New_York`). | +### MaintenanceSpec + +Configuration for PeerDB maintenance mode during upgrades. 
When configured, the operator runs maintenance Jobs (`ghcr.io/peerdb-io/flow-maintenance`) to gracefully pause all mirrors before upgrading and resume them after. + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `image` | `*string` | No | `ghcr.io/peerdb-io/flow-maintenance:stable-{version}` | Container image override for the maintenance Job. | +| `backoffLimit` | `*int32` | No | `4` | Number of retries before marking the maintenance Job as failed (min: 0). | +| `resources` | `*ResourceRequirements` | No | — | CPU/memory resource requests and limits for the maintenance Job container. | + ### UpgradePolicy `string` enum controlling how version upgrades are applied. @@ -232,7 +243,7 @@ Tracks the state of a rolling version upgrade. |-------|------|-------------| | `fromVersion` | `string` | The version being upgraded from. | | `toVersion` | `string` | The version being upgraded to. | -| `phase` | `UpgradePhase` | Current upgrade phase. Values: `Complete`, `Waiting`, `Blocked`, `Config`, `InitJobs`, `FlowAPI`, `PeerDBServer`, `UI`. | +| `phase` | `UpgradePhase` | Current upgrade phase. Values: `Complete`, `Waiting`, `Blocked`, `StartMaintenance`, `Config`, `InitJobs`, `FlowAPI`, `PeerDBServer`, `UI`, `EndMaintenance`. | | `startedAt` | `*metav1.Time` | Timestamp when the upgrade started. | | `message` | `string` | Human-readable message about the upgrade state. | @@ -361,6 +372,7 @@ The following condition types are used in `PeerDBCluster` status: | `Degraded` | Set to `True` when one or more components are unhealthy but the cluster is partially operational. | | `UpgradeInProgress` | Set to `True` when a version upgrade is in progress. | | `BackupSafe` | Whether it is safe to take a backup. `True` when no upgrade or rolling restart is in progress. `False` with reason `BackupInProgress` when the `peerdb.io/backup-in-progress` annotation is set, or `BackupUnsafe` when an upgrade/rollout is active. 
| +| `MaintenanceMode` | Set to `True` when PeerDB maintenance mode is active (mirrors are paused for an upgrade). Set to `False` with reason `MaintenanceComplete` after mirrors are resumed. | ### Annotations @@ -383,9 +395,11 @@ The `UpgradeStatus.phase` field tracks progress through a rolling upgrade: |-------|-------------| | `Waiting` | Upgrade is pending (e.g., waiting for a maintenance window). | | `Blocked` | Upgrade is blocked (e.g., manual policy requires acknowledgement). | +| `StartMaintenance` | Running the StartMaintenance Job to pause mirrors before upgrade. | | `Config` | Updating shared ConfigMap and configuration. | | `InitJobs` | Re-running init jobs if needed. | | `FlowAPI` | Rolling out the Flow API Deployment. | | `PeerDBServer` | Rolling out the PeerDB Server Deployment. | | `UI` | Rolling out the PeerDB UI Deployment. | +| `EndMaintenance` | Running the EndMaintenance Job to resume mirrors after upgrade. | | `Complete` | Upgrade finished successfully. | diff --git a/docs/architecture.md b/docs/architecture.md index 4af1704..29b93aa 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -37,6 +37,7 @@ flowchart TB UISvc["PeerDB UI\nService :3000"] NSJob["Temporal NS\nRegister Job"] SAJob["Search Attr\nJob"] + MaintJob["Maintenance\nJobs"] end subgraph ManagedByWorker["Owned by PeerDBWorkerPool"] @@ -58,6 +59,7 @@ flowchart TB CC --> UISvc CC --> NSJob CC --> SAJob + CC --> MaintJob WC -->|"reads cluster config"| PeerDBCluster WC --> WorkerDep @@ -121,9 +123,11 @@ A single CRD would force all scaling decisions through one reconciler and one sp 1. **Dependency validation** — Check catalog password Secret exists before proceeding 2. **Shared infrastructure** — ServiceAccount → ConfigMap (connection config) -3. **Init jobs** — Idempotent Temporal setup jobs; cluster waits for completion -4. **Components** — Flow API → PeerDB Server → UI (Deployments + Services) -5. 
**Status rollup** — Individual conditions aggregate into overall `Ready` condition +3. **Maintenance mode** — If `spec.maintenance` is set, run StartMaintenance Job to pause mirrors (upgrade only) +4. **Init jobs** — Idempotent Temporal setup jobs; cluster waits for completion +5. **Components** — Flow API → PeerDB Server → UI (Deployments + Services) +6. **End maintenance** — If `spec.maintenance` is set, run EndMaintenance Job to resume mirrors (upgrade only) +7. **Status rollup** — Individual conditions aggregate into overall `Ready` condition All managed resources have **OwnerReferences** set to the parent CR, enabling automatic garbage collection on deletion without custom finalizers. @@ -154,7 +158,8 @@ internal/ ├── ui.go # PeerDB UI Deployment + Service ├── flow_worker.go # Flow Worker Deployment ├── snapshot_worker.go # Snapshot Worker StatefulSet + headless Service - └── init_jobs.go # Temporal init Jobs + ├── init_jobs.go # Temporal init Jobs + └── maintenance_jobs.go # Maintenance mode Jobs config/ ├── crd/bases/ # Generated CRD manifests diff --git a/docs/runbooks/safe-upgrade.md b/docs/runbooks/safe-upgrade.md index 3f1b6c1..8193447 100644 --- a/docs/runbooks/safe-upgrade.md +++ b/docs/runbooks/safe-upgrade.md @@ -70,10 +70,11 @@ For more control, use the manual upgrade policy: The controller enforces a specific rollout order to minimize disruption: ``` -ConfigMap/Secrets → Init Jobs → Flow API → PeerDB Server → UI +[StartMaintenance →] ConfigMap/Secrets → Init Jobs → Flow API → PeerDB Server → UI [→ EndMaintenance] ``` Each step must complete successfully before the next begins. This ensures: +- Mirrors are gracefully paused before any component restarts (when `spec.maintenance` is configured). - Configuration is propagated before any component restarts. - The Flow API (gRPC backend) is ready before the Server and UI that depend on it. - The UI is upgraded last since it's the least critical component. 
@@ -102,6 +103,48 @@ spec: - Remove or omit `maintenanceWindow` to allow upgrades at any time. - If `timeZone` is not specified, it defaults to UTC. +## Maintenance Mode + +PeerDB has a built-in maintenance mode that gracefully pauses all running mirrors before an upgrade and resumes them after. The operator integrates this via Kubernetes Jobs: + +```yaml +apiVersion: peerdb.peerdb.io/v1alpha1 +kind: PeerDBCluster +metadata: + name: peerdb +spec: + version: "v0.37.0" + maintenance: {} + # ... rest of spec +``` + +When `spec.maintenance` is set, the upgrade flow becomes: + +1. **StartMaintenance** — A Job runs using the `flow-maintenance` image with `start` command. This triggers PeerDB's `StartMaintenance` Temporal workflow, which waits for running snapshots, enables maintenance mode (`PEERDB_MAINTENANCE_MODE_ENABLED`), and pauses all running mirrors. +2. **Normal upgrade** — Config, init jobs, Flow API, Server, and UI are rolled out in order. +3. **EndMaintenance** — A Job runs with the `end` command, resuming all previously paused mirrors and disabling maintenance mode. + +While maintenance mode is active, mirrors cannot be created or mutated through PeerDB. + +### Customizing the Maintenance Job + +```yaml +spec: + maintenance: + image: "custom-registry/flow-maintenance:v1.0.0" # Override image + backoffLimit: 6 # Retry count + resources: + requests: + cpu: "100m" + memory: "128Mi" +``` + +If a maintenance Job fails, the operator deletes it and retries automatically. 
A `Degraded` condition is set so you can monitor failures via: 
+
+```bash
+kubectl get peerdbcluster -o jsonpath='{.status.conditions}' | jq '.[] | select(.type=="Degraded")'
+```
+
 ## Monitoring Upgrade Progress
 
 ### Quick Status
@@ -140,8 +183,10 @@ Example output:
 | `FlowAPI` | Rolling out Flow API Deployment |
 | `PeerDBServer` | Rolling out PeerDB Server Deployment |
 | `UI` | Rolling out UI Deployment |
+| `EndMaintenance` | Running EndMaintenance Job (resuming mirrors) |
 | `Complete` | Upgrade finished successfully |
 | `Blocked` | Upgrade blocked — dependencies are unhealthy |
+| `StartMaintenance` | Running StartMaintenance Job (pausing mirrors) |
 
 ### Watch Upgrade Events