Skip to content
130 changes: 126 additions & 4 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,30 @@ package apiserver
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"net/http"
"os"
"slices"
"strings"
"sync"
"time"

"github.com/gorilla/handlers"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sethvargo/go-limiter"
"github.com/sethvargo/go-limiter/httplimit"
"github.com/sethvargo/go-limiter/memorystore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -60,16 +67,14 @@ import (
grpcutil "github.com/argoproj/argo-workflows/v3/util/grpc"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
"github.com/argoproj/argo-workflows/v3/util/json"
k8sutil "github.com/argoproj/argo-workflows/v3/util/k8s"
"github.com/argoproj/argo-workflows/v3/util/logging"
rbacutil "github.com/argoproj/argo-workflows/v3/util/rbac"
"github.com/argoproj/argo-workflows/v3/util/sqldb"
"github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories"
"github.com/argoproj/argo-workflows/v3/workflow/artifacts/plugin"
"github.com/argoproj/argo-workflows/v3/workflow/events"
"github.com/argoproj/argo-workflows/v3/workflow/hydrator"

"github.com/sethvargo/go-limiter"
"github.com/sethvargo/go-limiter/httplimit"
"github.com/sethvargo/go-limiter/memorystore"
)

// MaxGRPCMessageSize is a package-level integer setting; it is exported, so
// other packages may assign it.
// NOTE(review): presumably the upper bound on gRPC message size — the unit
// and the place it is consumed are not visible here; confirm before relying on it.
var MaxGRPCMessageSize int
Expand Down Expand Up @@ -209,7 +214,19 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
if err != nil {
log.WithFatal().Error(ctx, err.Error())
}

// Validate artifact driver images against server pod images
if err := as.validateArtifactDriverImages(ctx, config); err != nil {
log.WithFatal().WithError(err).Error(ctx, "failed to validate artifact driver images")
}

// Validate artifact driver connections
if err := as.validateArtifactDriverConnections(ctx, config); err != nil {
log.WithFatal().WithError(err).Error(ctx, "failed to validate artifact driver connections")
}

log.WithFields(argo.GetVersion().Fields()).WithField("instanceID", config.InstanceID).Info(ctx, "Starting Argo Server")

instanceIDService := instanceid.NewService(config.InstanceID)
offloadRepo := persist.ExplosiveOffloadNodeStatusRepo
wfArchive := persist.NullWorkflowArchive
Expand Down Expand Up @@ -443,6 +460,111 @@ func (as *argoServer) newHTTPServer(ctx context.Context, port int, artifactServe
return handler
}

// validateArtifactDriverConnections verifies that a connection can be opened
// to every artifact driver listed in cfg.ArtifactDrivers.
//
// Each driver is probed concurrently via plugin.NewDriver using the socket
// path derived from the driver name; a successfully opened connection is
// closed again once the probe finishes (a failure to close is only logged as
// a warning). When no drivers are configured the check is skipped.
//
// It returns nil when every driver could be reached. Otherwise it logs and
// returns a single error whose message lists every failed driver, in the
// order the drivers appear in the configuration.
func (as *argoServer) validateArtifactDriverConnections(ctx context.Context, cfg *config.Config) error {
	log := logging.RequireLoggerFromContext(ctx)
	if len(cfg.ArtifactDrivers) == 0 {
		log.Info(ctx, "No artifact drivers configured, skipping connection validation")
		return nil
	}

	log.Info(ctx, "Validating artifact driver connections")

	// One result slot per driver. Every goroutine writes only to its own
	// index, so no locking is required, and the collected errors follow the
	// configuration order instead of goroutine scheduling order.
	probeErrs := make([]error, len(cfg.ArtifactDrivers))

	var wg sync.WaitGroup

	// Validate each driver connection in parallel
	for i, driver := range cfg.ArtifactDrivers {
		wg.Add(1)
		go func(i int, driver config.ArtifactDriver) {
			defer wg.Done()

			// Create a new driver connection
			pluginDriver, err := plugin.NewDriver(ctx, driver.Name, driver.Name.SocketPath(), 5) // replace with driver.ConnectionTimeoutSeconds once we have it
			if err != nil {
				probeErrs[i] = fmt.Errorf("failed to connect to artifact driver %s: %w", driver.Name, err)
				return
			}

			// Close the connection after validation
			defer func() {
				if closeErr := pluginDriver.Close(); closeErr != nil {
					log.WithError(closeErr).WithField("driver", driver.Name).Warn(ctx, "Failed to close connection to artifact driver")
				}
			}()

			log.WithField("driver", driver.Name).Info(ctx, "Successfully validated connection to artifact driver")
		}(i, driver)
	}

	// Wait for all validations to complete; after this no goroutine touches
	// probeErrs any more.
	wg.Wait()

	// Collect any errors
	var connectionErrors []string
	for _, err := range probeErrs {
		if err != nil {
			connectionErrors = append(connectionErrors, err.Error())
		}
	}

	if len(connectionErrors) > 0 {
		errorMsg := fmt.Sprintf("Artifact driver connection validation failed: %v", connectionErrors)
		log.WithField("errors", connectionErrors).Error(ctx, errorMsg)
		return errors.New(errorMsg)
	}

	log.WithField("driverCount", len(cfg.ArtifactDrivers)).Info(ctx, "Artifact driver connection validation passed: All configured artifact drivers are accessible")
	return nil
}

// validateArtifactDriverImages validates that the artifact driver images are present in the server pod
func (as *argoServer) validateArtifactDriverImages(ctx context.Context, cfg *config.Config) error {
log := logging.RequireLoggerFromContext(ctx)
if len(cfg.ArtifactDrivers) == 0 {
log.Info(ctx, "No artifact drivers configured, skipping validation")
return nil
}

log.Info(ctx, "Validating artifact driver images against server pod")

// Get the current pod name using the standard Argo pattern
podName, err := k8sutil.GetCurrentPodName(ctx, as.clients.Kubernetes, as.namespace, "app=argo-server")
if err != nil {
log.WithError(err).Warn(ctx, "Failed to get current pod name, cannot validate artifact driver images")
return nil
}

log.WithField("podName", podName).Debug(ctx, "Found argo-server pod for validation")

// Get the current pod to check the available images
pod, err := as.clients.Kubernetes.CoreV1().Pods(as.namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
log.WithError(err).WithField("podName", podName).Warn(ctx, "Failed to get current pod, cannot validate artifact driver images")
return nil
}

// Get missing images
images := make([]string, 0, len(cfg.ArtifactDrivers))
for _, driver := range cfg.ArtifactDrivers {
images = append(images, driver.Image)
}

for _, container := range pod.Spec.Containers {
images = slices.DeleteFunc(images, func(image string) bool {
return image == container.Image
})
}

if len(images) > 0 {
errorMsg := fmt.Sprintf("Artifact driver validation failed: The following artifact driver images are not present in the server pod: %v. Please ensure all artifact driver images are included in the argo-server pod.", images)
Copy link

Copilot AI Jul 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message mentions 'server pod' but should be more specific about checking the deployment, since the validation logic in the test suggests it should check the deployment template rather than just the current pod.

Suggested change
errorMsg := fmt.Sprintf("Artifact driver validation failed: The following artifact driver images are not present in the server pod: %v. Please ensure all artifact driver images are included in the argo-server pod.", images)
errorMsg := fmt.Sprintf("Artifact driver validation failed: The following artifact driver images are not present in the current pod: %v. Please ensure all artifact driver images are included in the deployment template for the argo-server.", images)

Copilot uses AI. Check for mistakes.
log.Error(ctx, errorMsg)
return errors.New(errorMsg)
}

log.WithField("driverCount", len(cfg.ArtifactDrivers)).Info(ctx, "Artifact driver validation passed: All configured artifact driver images are present in the server pod")
return nil
}

// registerFunc is the signature of a function that takes a context, a
// grpc-gateway ServeMux, an endpoint string and a set of gRPC dial options,
// and reports failure through its error return.
// NOTE(review): presumably these are the generated gateway registration
// functions passed to mustRegisterGWHandler — confirm against its callers.
type registerFunc func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error

// mustRegisterGWHandler is a convenience function to register a gateway handler
Expand Down
Loading