Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 100 additions & 59 deletions controllers/classifier_deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,30 +120,46 @@ func (r *ClassifierReconciler) deployClassifier(ctx context.Context, classifierS
logger.V(logs.LogDebug).Info("request to deploy")

var errorSeen error
allDeployed := true
clusterInfo := make([]libsveltosv1beta1.ClusterInfo, 0)
allProcessed := true

for i := range classifier.Status.ClusterInfo {
c := classifier.Status.ClusterInfo[i]
cInfo, err := r.processClassifier(ctx, classifierScope, r.ControlPlaneEndpoint, &c.Cluster, f, logger)
c := &classifier.Status.ClusterInfo[i]

l := logger.WithValues("cluster", fmt.Sprintf("%s:%s/%s",
c.Cluster.Kind, c.Cluster.Namespace, c.Cluster.Name))

clusterInfo, err := r.processClassifier(ctx, classifierScope, r.ControlPlaneEndpoint, &c.Cluster, f, l)
if err != nil {
errorSeen = err
}
if cInfo != nil {
clusterInfo = append(clusterInfo, *cInfo)
if cInfo.Status != libsveltosv1beta1.SveltosStatusProvisioned {
allDeployed = false
if clusterInfo != nil {
classifier.Status.ClusterInfo[i] = *clusterInfo
if clusterInfo.Status != libsveltosv1beta1.SveltosStatusProvisioned {
allProcessed = false
}
}
}

// Filter out entries whose Status is libsveltosv1beta1.SveltosStatusRemoved
n := 0
for i := range classifier.Status.ClusterInfo {
if classifier.Status.ClusterInfo[i].Status != libsveltosv1beta1.SveltosStatusRemoved {
classifier.Status.ClusterInfo[n] = classifier.Status.ClusterInfo[i]
n++
}
}
// Truncate the slice to the new length
classifier.Status.ClusterInfo = classifier.Status.ClusterInfo[:n]

// Update Classifier Status
classifierScope.SetClusterInfo(clusterInfo)
logger.V(logs.LogDebug).Info("set clusterInfo")
classifierScope.SetClusterInfo(classifier.Status.ClusterInfo)

if errorSeen != nil {
return errorSeen
}

if !allDeployed {
if !allProcessed {
return fmt.Errorf("request to deploy Classifier is still queued in one ore more clusters")
}

Expand Down Expand Up @@ -948,23 +964,35 @@ func (r *ClassifierReconciler) processClassifier(ctx context.Context, classifier
cpEndpoint string, cluster *corev1.ObjectReference, f feature, logger logr.Logger,
) (*libsveltosv1beta1.ClusterInfo, error) {

logger = logger.WithValues("cluster", fmt.Sprintf("%s:%s/%s", cluster.Kind, cluster.Namespace, cluster.Name))

// Get Classifier Spec hash (at this very precise moment)
currentHash, err := r.getCurrentHash(ctx, classifierScope, cpEndpoint, cluster, f, logger)
if err != nil {
return nil, err
}
classifier := classifierScope.Classifier

var proceed bool
proceed, err = r.canProceed(ctx, classifierScope, cluster, logger)
clusterInfo := &libsveltosv1beta1.ClusterInfo{
Cluster: *cluster,
Hash: currentHash,
FailureMessage: nil,
Status: libsveltosv1beta1.SveltosStatusProvisioning,
}

proceed, err := r.canProceed(ctx, classifierScope, cluster, logger)
if err != nil {
return nil, err
failureMessage := err.Error()
clusterInfo.FailureMessage = &failureMessage
return clusterInfo, err
} else if !proceed {
return nil, nil
failureMessage := "cannot proceed deploying. Either cluster is paused or not ready."
clusterInfo.FailureMessage = &failureMessage
return clusterInfo, nil
}

// Remove any queued entry to cleanup
r.Deployer.CleanupEntries(cluster.Namespace, cluster.Name, classifier.Name, f.id,
clusterproxy.GetClusterType(cluster), true)

// If undeploying the feature is in progress, wait for it to complete.
// Otherwise, if the feature is redeployed while it is still being cleaned up and two workers process
// those requests in parallel, some resources might end up missing.
Expand All @@ -981,14 +1009,17 @@ func (r *ClassifierReconciler) processClassifier(ctx context.Context, classifier
if !isConfigSame {
logger.V(logs.LogDebug).Info(fmt.Sprintf("Classifier has changed. Current hash %x. Previous hash %x",
currentHash, hash))
} else {
logger.V(logs.LogDebug).Info("Classifier has not changed")
}

isPullMode, err := clusterproxy.IsClusterInPullMode(ctx, r.Client, cluster.Namespace,
cluster.Name, clusterproxy.GetClusterType(cluster), logger)
if err != nil {
msg := fmt.Sprintf("failed to verify if Cluster is in pull mode: %v", err)
logger.V(logs.LogDebug).Info(msg)
return nil, err
clusterInfo.FailureMessage = &msg
return clusterInfo, err
}

return r.proceedProcessingClassifier(ctx, classifier, cluster, isPullMode, isConfigSame, currentHash, f, logger)
Expand All @@ -998,6 +1029,13 @@ func (r *ClassifierReconciler) proceedProcessingClassifier(ctx context.Context,
cluster *corev1.ObjectReference, isPullMode, isConfigSame bool, currentHash []byte, f feature, logger logr.Logger,
) (*libsveltosv1beta1.ClusterInfo, error) {

clusterInfo := &libsveltosv1beta1.ClusterInfo{
Cluster: *cluster,
Hash: currentHash,
FailureMessage: stringPtr(""),
Status: libsveltosv1beta1.SveltosStatusProvisioning,
}

_, currentStatus := r.getClassifierInClusterHashAndStatus(classifier, cluster)

var deployerStatus *libsveltosv1beta1.SveltosFeatureStatus
Expand All @@ -1011,16 +1049,13 @@ func (r *ClassifierReconciler) proceedProcessingClassifier(ctx context.Context,
}

if deployerStatus != nil {
logger.V(logs.LogDebug).Info("result is available. updating status.")
var errorMessage string
logger.V(logs.LogDebug).Info(fmt.Sprintf("result is available %q. updating status.", *deployerStatus))

clusterInfo.Status = *deployerStatus

if result.Err != nil {
errorMessage = result.Err.Error()
}
clusterInfo := &libsveltosv1beta1.ClusterInfo{
Cluster: *cluster,
Status: *deployerStatus,
Hash: currentHash,
FailureMessage: &errorMessage,
errorMessage := result.Err.Error()
clusterInfo.FailureMessage = &errorMessage
}

if *deployerStatus == libsveltosv1beta1.SveltosStatusProvisioned {
Expand All @@ -1037,12 +1072,11 @@ func (r *ClassifierReconciler) proceedProcessingClassifier(ctx context.Context,
}
} else if isConfigSame && currentStatus != nil && *currentStatus == libsveltosv1beta1.SveltosStatusProvisioned {
logger.V(logs.LogDebug).Info("already deployed")
s := libsveltosv1beta1.SveltosStatusProvisioned
deployerStatus = &s
clusterInfo.Status = libsveltosv1beta1.SveltosStatusProvisioned
clusterInfo.FailureMessage = nil
} else {
logger.V(logs.LogDebug).Info("no result is available. queue job and mark status as provisioning")
s := libsveltosv1beta1.SveltosStatusProvisioning
deployerStatus = &s
clusterInfo.Status = libsveltosv1beta1.SveltosStatusProvisioning

options := deployer.Options{HandlerOptions: make(map[string]any)}
options.HandlerOptions[configurationHash] = currentHash
Expand All @@ -1060,17 +1094,12 @@ func (r *ClassifierReconciler) proceedProcessingClassifier(ctx context.Context,
if err := r.Deployer.Deploy(ctx, cluster.Namespace, cluster.Name,
classifier.Name, f.id, clusterproxy.GetClusterType(cluster), false, handler,
programDuration, options); err != nil {
return nil, err
failureMessage := err.Error()
clusterInfo.FailureMessage = &failureMessage
return clusterInfo, err
}
}

clusterInfo := &libsveltosv1beta1.ClusterInfo{
Cluster: *cluster,
Status: *deployerStatus,
Hash: currentHash,
FailureMessage: nil,
}

if clusterInfo.Hash == nil {
panic(1)
}
Expand All @@ -1084,14 +1113,24 @@ func (r *ClassifierReconciler) proceedDeployingClassifierInPullMode(ctx context.

var pullmodeStatus *libsveltosv1beta1.FeatureStatus

clusterInfo := &libsveltosv1beta1.ClusterInfo{
Cluster: *cluster,
Hash: currentHash,
FailureMessage: nil,
Status: libsveltosv1beta1.SveltosStatusProvisioning,
}

if isConfigSame {
pullmodeHash, err := pullmode.GetRequestorHash(ctx, getManagementClusterClient(),
cluster.Namespace, cluster.Name, libsveltosv1beta1.ClassifierKind, classifier.Name, f.id, logger)
if err != nil {
if !apierrors.IsNotFound(err) {
msg := fmt.Sprintf("failed to get pull mode hash: %v", err)
logger.V(logs.LogDebug).Info(msg)
return nil, err
clusterInfo.FailureMessage = &msg
return clusterInfo, err
} else {
isConfigSame = false
}
} else {
isConfigSame = reflect.DeepEqual(pullmodeHash, currentHash)
Expand All @@ -1104,46 +1143,40 @@ func (r *ClassifierReconciler) proceedDeployingClassifierInPullMode(ctx context.
var err error
pullmodeStatus, err = r.proceesAgentDeploymentStatus(ctx, classifier, cluster, f, logger)
if err != nil {
return nil, err
failureMessage := err.Error()
clusterInfo.FailureMessage = &failureMessage
return clusterInfo, err
}
}

clusterInfo := &libsveltosv1beta1.ClusterInfo{
Cluster: *cluster,
Hash: currentHash,
FailureMessage: nil,
}

if pullmodeStatus != nil {
logger.V(logs.LogDebug).Info(fmt.Sprintf("agent result is available. updating status: %v", *pullmodeStatus))

switch *pullmodeStatus {
case libsveltosv1beta1.FeatureStatusProvisioned:
if err := pullmode.TerminateDeploymentTracking(ctx, r.Client, cluster.Namespace,
cluster.Name, libsveltosv1beta1.ClassifierKind, classifier.Name, f.id, logger); err != nil {
logger.V(logs.LogDebug).Info(fmt.Sprintf("failed to terminate tracking: %v", err))
return nil, err
failureMessage := fmt.Sprintf("failed to terminate tracking: %v", err)
logger.V(logs.LogDebug).Info(failureMessage)
clusterInfo.FailureMessage = &failureMessage
return clusterInfo, err
}
provisioned := libsveltosv1beta1.SveltosStatusProvisioned
clusterInfo.Status = provisioned
clusterInfo.Status = libsveltosv1beta1.SveltosStatusProvisioned
return clusterInfo, nil
case libsveltosv1beta1.FeatureStatusProvisioning:
msg := "agent is provisioning the content"
logger.V(logs.LogDebug).Info(msg)
provisioning := libsveltosv1beta1.SveltosStatusProvisioning
clusterInfo.Status = provisioning
clusterInfo.Status = libsveltosv1beta1.SveltosStatusProvisioning
return clusterInfo, nil
case libsveltosv1beta1.FeatureStatusFailed:
logger.V(logs.LogDebug).Info("agent failed provisioning the content")
failed := libsveltosv1beta1.SveltosStatusFailed
clusterInfo.Status = failed
clusterInfo.Status = libsveltosv1beta1.SveltosStatusFailed
case libsveltosv1beta1.FeatureStatusFailedNonRetriable, libsveltosv1beta1.FeatureStatusRemoving,
libsveltosv1beta1.FeatureStatusAgentRemoving, libsveltosv1beta1.FeatureStatusRemoved:
logger.V(logs.LogDebug).Info("proceed deploying")
}
} else {
provisioning := libsveltosv1beta1.SveltosStatusProvisioning
clusterInfo.Status = provisioning
clusterInfo.Status = libsveltosv1beta1.SveltosStatusProvisioning
}

// Getting here means either agent failed to deploy feature or configuration has changed.
Expand All @@ -1155,7 +1188,9 @@ func (r *ClassifierReconciler) proceedDeployingClassifierInPullMode(ctx context.
if err := r.Deployer.Deploy(ctx, cluster.Namespace, cluster.Name,
classifier.Name, f.id, clusterproxy.GetClusterType(cluster), false,
deployClassifierInCluster, programDuration, options); err != nil {
return nil, err
failureMessage := err.Error()
clusterInfo.FailureMessage = &failureMessage
return clusterInfo, err
}

return clusterInfo, fmt.Errorf("request to deploy queued")
Expand All @@ -1175,7 +1210,11 @@ func (r *ClassifierReconciler) proceesAgentDeploymentStatus(ctx context.Context,
if pullmode.IsProcessingMismatch(err) {
provisioning := libsveltosv1beta1.FeatureStatusProvisioning
return &provisioning, nil
} else if pullmode.IsActionNotSetToDeploy(err) {
_ = pullmode.TerminateDeploymentTracking(ctx, r.Client, cluster.Namespace,
cluster.Name, libsveltosv1beta1.ClassifierKind, classifier.Name, f.id, logger)
}
return nil, err
}

return status.DeploymentStatus, err
Expand Down Expand Up @@ -1698,7 +1737,7 @@ func deploySveltosApplierResources(ctx context.Context, clusterNamespace, cluste

var referencedUnstructured []*unstructured.Unstructured
if len(patches) > 0 {
logger.V(logs.LogInfo).Info("Patch sveltos-applier resources")
logger.V(logs.LogDebug).Info("Patch sveltos-applier resources")
p := &patcher.CustomPatchPostRenderer{Patches: patches}
referencedUnstructured, err = p.RunUnstructured(
[]*unstructured.Unstructured{policy},
Expand Down Expand Up @@ -2147,10 +2186,12 @@ func getPerClusterPatches(ctx context.Context, c client.Client,

patches, err := getSveltosApplierPatchesNew(ctx, c, configMapNamespace, configMapName, logger)
if err != nil {
logger.Error(err, "failed to get ConfigMap with drift-detection patches",
logger.Error(err, "failed to get ConfigMap with patches",
"configMapNamespace", configMapNamespace, "configMapName", configMapName)
return nil, err
}

logger.V(logs.LogDebug).Info(fmt.Sprintf("got patches from ConfigMap %s/%s",
configMapNamespace, configMapName))
return patches, nil
}
5 changes: 3 additions & 2 deletions controllers/classifier_report_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,10 @@ func collectClassifierReports(c client.Client, shardKey, capiOnboardAnnotation,

for i := range clusterList {
cluster := &clusterList[i]
err = collectClassifierReportsFromCluster(ctx, c, cluster, version, logger)
l := logger.WithValues("cluster", fmt.Sprintf("%s:%s/%s", cluster.Kind, cluster.Namespace, cluster.Name))
err = collectClassifierReportsFromCluster(ctx, c, cluster, version, l)
if err != nil {
logger.V(logs.LogInfo).Info(fmt.Sprintf("failed to collect ClassifierReports from cluster: %s/%s %v",
l.V(logs.LogInfo).Info(fmt.Sprintf("failed to collect ClassifierReports from cluster: %s/%s %v",
cluster.Namespace, cluster.Name, err))
}
}
Expand Down
4 changes: 4 additions & 0 deletions controllers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,7 @@ func deplAssociatedClusterExist(ctx context.Context, c client.Client, depl *apps

return true, "", "", ""
}

// stringPtr returns a pointer to a newly allocated string holding the value of s.
// Each call yields a distinct pointer, so modifying the pointed-to value never
// affects the caller's original string or the result of another call. It is a
// convenience for filling *string fields from literals, which cannot be
// addressed directly.
func stringPtr(s string) *string {
	p := new(string)
	*p = s
	return p
}