forked from argoproj/argo-workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
chore: Validate artifact drivers on server start. re #10219 #36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
JPZ13
merged 15 commits into
artifact-plugins
from
10219-ensure-grpc-servers-present-on-argo-server-startup
Sep 9, 2025
Merged
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
b0f2bb3
feat: initial commit for artifact plugins
Joibel c98ec98
chore: Validate artifact drivers on server start. re: #10219
JPZ13 05b8d73
fix server lint errors
JPZ13 eb760e6
use require instead of assert
JPZ13 c9ffd7d
remove generation
JPZ13 f6f938b
use t.setenv
JPZ13 10a5631
remove deployment in server test
JPZ13 de9a542
structure logs
JPZ13 a9da605
fix error overlap
JPZ13 7cccf0b
run codegen
JPZ13 8d57ccd
fix comment
JPZ13 0387c96
run codegen
JPZ13 fbd4458
refactor to use util fn for get pod name
JPZ13 3d00749
update tests
JPZ13 dec565d
switch back to assert
JPZ13 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -3,23 +3,30 @@ package apiserver | |||||
| import ( | ||||||
| "context" | ||||||
| "crypto/tls" | ||||||
| "errors" | ||||||
| "fmt" | ||||||
| "net" | ||||||
| "net/http" | ||||||
| "os" | ||||||
| "slices" | ||||||
| "strings" | ||||||
| "sync" | ||||||
| "time" | ||||||
|
|
||||||
| "github.com/gorilla/handlers" | ||||||
| grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||||||
| grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" | ||||||
| "github.com/grpc-ecosystem/grpc-gateway/runtime" | ||||||
| "github.com/prometheus/client_golang/prometheus/promhttp" | ||||||
| "github.com/sethvargo/go-limiter" | ||||||
| "github.com/sethvargo/go-limiter/httplimit" | ||||||
| "github.com/sethvargo/go-limiter/memorystore" | ||||||
| "google.golang.org/grpc" | ||||||
| "google.golang.org/grpc/credentials" | ||||||
| "google.golang.org/grpc/credentials/insecure" | ||||||
| "google.golang.org/grpc/metadata" | ||||||
| v1 "k8s.io/api/core/v1" | ||||||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
| "k8s.io/apimachinery/pkg/util/wait" | ||||||
| "k8s.io/client-go/kubernetes" | ||||||
| "k8s.io/client-go/rest" | ||||||
|
|
@@ -60,16 +67,14 @@ import ( | |||||
| grpcutil "github.com/argoproj/argo-workflows/v3/util/grpc" | ||||||
| "github.com/argoproj/argo-workflows/v3/util/instanceid" | ||||||
| "github.com/argoproj/argo-workflows/v3/util/json" | ||||||
| k8sutil "github.com/argoproj/argo-workflows/v3/util/k8s" | ||||||
| "github.com/argoproj/argo-workflows/v3/util/logging" | ||||||
| rbacutil "github.com/argoproj/argo-workflows/v3/util/rbac" | ||||||
| "github.com/argoproj/argo-workflows/v3/util/sqldb" | ||||||
| "github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories" | ||||||
| "github.com/argoproj/argo-workflows/v3/workflow/artifacts/plugin" | ||||||
| "github.com/argoproj/argo-workflows/v3/workflow/events" | ||||||
| "github.com/argoproj/argo-workflows/v3/workflow/hydrator" | ||||||
|
|
||||||
| "github.com/sethvargo/go-limiter" | ||||||
| "github.com/sethvargo/go-limiter/httplimit" | ||||||
| "github.com/sethvargo/go-limiter/memorystore" | ||||||
| ) | ||||||
|
|
||||||
| var MaxGRPCMessageSize int | ||||||
|
|
@@ -209,7 +214,19 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st | |||||
| if err != nil { | ||||||
| log.WithFatal().Error(ctx, err.Error()) | ||||||
| } | ||||||
|
|
||||||
| // Validate artifact driver images against server pod images | ||||||
| if err := as.validateArtifactDriverImages(ctx, config); err != nil { | ||||||
| log.WithFatal().WithError(err).Error(ctx, "failed to validate artifact driver images") | ||||||
| } | ||||||
|
|
||||||
| // Validate artifact driver connections | ||||||
| if err := as.validateArtifactDriverConnections(ctx, config); err != nil { | ||||||
| log.WithFatal().WithError(err).Error(ctx, "failed to validate artifact driver connections") | ||||||
| } | ||||||
|
|
||||||
| log.WithFields(argo.GetVersion().Fields()).WithField("instanceID", config.InstanceID).Info(ctx, "Starting Argo Server") | ||||||
|
|
||||||
| instanceIDService := instanceid.NewService(config.InstanceID) | ||||||
| offloadRepo := persist.ExplosiveOffloadNodeStatusRepo | ||||||
| wfArchive := persist.NullWorkflowArchive | ||||||
|
|
@@ -443,6 +460,111 @@ func (as *argoServer) newHTTPServer(ctx context.Context, port int, artifactServe | |||||
| return handler | ||||||
| } | ||||||
|
|
||||||
| // validateArtifactDriverConnections validates that all configured artifact drivers can be connected to | ||||||
| func (as *argoServer) validateArtifactDriverConnections(ctx context.Context, cfg *config.Config) error { | ||||||
| log := logging.RequireLoggerFromContext(ctx) | ||||||
| if len(cfg.ArtifactDrivers) == 0 { | ||||||
| log.Info(ctx, "No artifact drivers configured, skipping connection validation") | ||||||
| return nil | ||||||
| } | ||||||
|
|
||||||
| log.Info(ctx, "Validating artifact driver connections") | ||||||
|
|
||||||
| var wg sync.WaitGroup | ||||||
| errorChannel := make(chan error, len(cfg.ArtifactDrivers)) | ||||||
|
|
||||||
| // Validate each driver connection in parallel | ||||||
| for _, driver := range cfg.ArtifactDrivers { | ||||||
| wg.Add(1) | ||||||
| go func(driver config.ArtifactDriver) { | ||||||
| defer wg.Done() | ||||||
|
|
||||||
| // Create a new driver connection | ||||||
| pluginDriver, err := plugin.NewDriver(ctx, driver.Name, driver.Name.SocketPath(), 5) // replace with driver.ConnectionTimeoutSeconds once we have it | ||||||
| if err != nil { | ||||||
| errorChannel <- fmt.Errorf("failed to connect to artifact driver %s: %w", driver.Name, err) | ||||||
| return | ||||||
| } | ||||||
|
|
||||||
| // Close the connection after validation | ||||||
| defer func() { | ||||||
| if closeErr := pluginDriver.Close(); closeErr != nil { | ||||||
| log.WithError(closeErr).WithField("driver", driver.Name).Warn(ctx, "Failed to close connection to artifact driver") | ||||||
| } | ||||||
| }() | ||||||
|
|
||||||
| log.WithField("driver", driver.Name).Info(ctx, "Successfully validated connection to artifact driver") | ||||||
| }(driver) | ||||||
| } | ||||||
|
|
||||||
| // Wait for all validations to complete | ||||||
| wg.Wait() | ||||||
| close(errorChannel) | ||||||
|
|
||||||
| // Collect any errors | ||||||
| var connectionErrors []string | ||||||
| for err := range errorChannel { | ||||||
| connectionErrors = append(connectionErrors, err.Error()) | ||||||
| } | ||||||
|
|
||||||
| if len(connectionErrors) > 0 { | ||||||
| errorMsg := fmt.Sprintf("Artifact driver connection validation failed: %v", connectionErrors) | ||||||
| log.WithField("errors", connectionErrors).Error(ctx, errorMsg) | ||||||
| return errors.New(errorMsg) | ||||||
| } | ||||||
|
|
||||||
| log.WithField("driverCount", len(cfg.ArtifactDrivers)).Info(ctx, "Artifact driver connection validation passed: All configured artifact drivers are accessible") | ||||||
| return nil | ||||||
| } | ||||||
|
|
||||||
| // validateArtifactDriverImages validates that the artifact driver images are present in the server pod | ||||||
| func (as *argoServer) validateArtifactDriverImages(ctx context.Context, cfg *config.Config) error { | ||||||
| log := logging.RequireLoggerFromContext(ctx) | ||||||
| if len(cfg.ArtifactDrivers) == 0 { | ||||||
| log.Info(ctx, "No artifact drivers configured, skipping validation") | ||||||
| return nil | ||||||
| } | ||||||
|
|
||||||
| log.Info(ctx, "Validating artifact driver images against server pod") | ||||||
|
|
||||||
| // Get the current pod name using the standard Argo pattern | ||||||
| podName, err := k8sutil.GetCurrentPodName(ctx, as.clients.Kubernetes, as.namespace, "app=argo-server") | ||||||
| if err != nil { | ||||||
| log.WithError(err).Warn(ctx, "Failed to get current pod name, cannot validate artifact driver images") | ||||||
| return nil | ||||||
| } | ||||||
|
|
||||||
| log.WithField("podName", podName).Debug(ctx, "Found argo-server pod for validation") | ||||||
|
|
||||||
| // Get the current pod to check the available images | ||||||
| pod, err := as.clients.Kubernetes.CoreV1().Pods(as.namespace).Get(ctx, podName, metav1.GetOptions{}) | ||||||
| if err != nil { | ||||||
| log.WithError(err).WithField("podName", podName).Warn(ctx, "Failed to get current pod, cannot validate artifact driver images") | ||||||
| return nil | ||||||
| } | ||||||
|
|
||||||
| // Get missing images | ||||||
| images := make([]string, 0, len(cfg.ArtifactDrivers)) | ||||||
| for _, driver := range cfg.ArtifactDrivers { | ||||||
| images = append(images, driver.Image) | ||||||
| } | ||||||
|
|
||||||
| for _, container := range pod.Spec.Containers { | ||||||
| images = slices.DeleteFunc(images, func(image string) bool { | ||||||
| return image == container.Image | ||||||
| }) | ||||||
| } | ||||||
|
|
||||||
| if len(images) > 0 { | ||||||
| errorMsg := fmt.Sprintf("Artifact driver validation failed: The following artifact driver images are not present in the server pod: %v. Please ensure all artifact driver images are included in the argo-server pod.", images) | ||||||
|
||||||
| errorMsg := fmt.Sprintf("Artifact driver validation failed: The following artifact driver images are not present in the server pod: %v. Please ensure all artifact driver images are included in the argo-server pod.", images) | |
| errorMsg := fmt.Sprintf("Artifact driver validation failed: The following artifact driver images are not present in the current pod: %v. Please ensure all artifact driver images are included in the deployment template for the argo-server.", images) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.