diff --git a/src/pkg/api.go b/src/pkg/api.go
index 9f2596a..476062e 100644
--- a/src/pkg/api.go
+++ b/src/pkg/api.go
@@ -3,6 +3,7 @@ package pkg
 import (
     "fmt"
     "strings"
+    "sync"
     "time"

     "github.com/go-resty/resty/v2"
@@ -13,9 +14,11 @@ import (
 )

 var (
-    _version    string
-    _clientRest *resty.Client
-    _clientGQL  *opslevel.Client
+    _version        string
+    _clientRest     *resty.Client
+    _clientGQL      *opslevel.Client
+    _clientRestOnce sync.Once
+    _clientGQLOnce  sync.Once
 )

 func SetClientVersion(version string) {
@@ -23,16 +26,16 @@ func SetClientVersion(version string) {
 }

 func NewRestClient() *resty.Client {
-    if _clientRest == nil {
+    _clientRestOnce.Do(func() {
         _clientRest = opslevel.NewRestClient(opslevel.SetURL(viper.GetString("api-url")))
-    }
+    })
     return _clientRest
 }

 func NewGraphClient() *opslevel.Client {
-    if _clientGQL == nil {
+    _clientGQLOnce.Do(func() {
         _clientGQL = newGraphClient()
-    }
+    })
     return _clientGQL
 }

@@ -57,7 +60,6 @@ func newGraphClient() *opslevel.Client {
             cobra.CheckErr(clientErr)
         }
     }
-    cobra.CheckErr(clientErr)
     return client
 }

diff --git a/src/pkg/k8s.go b/src/pkg/k8s.go
index b856c04..20a4fd5 100644
--- a/src/pkg/k8s.go
+++ b/src/pkg/k8s.go
@@ -29,6 +29,11 @@ import (
     "github.com/spf13/viper"
 )

+const (
+    ContainerNameHelper = "helper"
+    ContainerNameJob    = "job"
+)
+
 var (
     ImageTagVersion string
     k8sValidated    bool
@@ -204,7 +209,7 @@ func (s *JobRunner) getPodObject(identifier string, labels map[string]string, jo
             NodeSelector: s.podConfig.NodeSelector,
             InitContainers: []corev1.Container{
                 {
-                    Name:            "helper",
+                    Name:            ContainerNameHelper,
                     Image:           "public.ecr.aws/opslevel/opslevel-runner:v2024.1.3", // TODO: fmt.Sprintf("public.ecr.aws/opslevel/opslevel-runner:v%s", ImageTagVersion),
                     ImagePullPolicy: s.podConfig.PullPolicy,
                     Command: []string{
@@ -223,7 +228,7 @@ func (s *JobRunner) getPodObject(identifier string, labels map[string]string, jo
             },
             Containers: []corev1.Container{
                 {
-                    Name:            "job",
+                    Name:            ContainerNameJob,
                     Image:           job.Image,
                     ImagePullPolicy: corev1.PullIfNotPresent,
                     Command: []string{
@@ -296,8 +301,8 @@ func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, std
         }
     }
     // TODO: manage pods based on image for re-use?
-    cfgMap, err := s.CreateConfigMap(s.getConfigMapObject(identifier, job))
-    defer s.DeleteConfigMap(cfgMap) // TODO: if we reuse pods then delete should not happen?
+    cfgMap, err := s.CreateConfigMap(ctx, s.getConfigMapObject(identifier, job))
+    defer s.DeleteConfigMap(context.Background(), cfgMap) // Use Background for cleanup to ensure it completes
     if err != nil {
         return JobOutcome{
             Message: fmt.Sprintf("failed to create configmap REASON: %s", err),
@@ -305,8 +310,8 @@ func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, std
         }
     }

-    pdb, err := s.CreatePDB(s.getPBDObject(identifier, labelSelector))
-    defer s.DeletePDB(pdb) // TODO: if we reuse pods then delete should not happen?
+    pdb, err := s.CreatePDB(ctx, s.getPBDObject(identifier, labelSelector))
+    defer s.DeletePDB(context.Background(), pdb) // Use Background for cleanup to ensure it completes
     if err != nil {
         return JobOutcome{
             Message: fmt.Sprintf("failed to create pod disruption budget REASON: %s", err),
@@ -314,8 +319,8 @@ func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, std
         }
     }

-    pod, err := s.CreatePod(s.getPodObject(identifier, labels, job))
-    defer s.DeletePod(pod) // TODO: if we reuse pods then delete should not happen
+    pod, err := s.CreatePod(ctx, s.getPodObject(identifier, labels, job))
+    defer s.DeletePod(context.Background(), pod) // Use Background for cleanup to ensure it completes
     if err != nil {
         return JobOutcome{
             Message: fmt.Sprintf("failed to create pod REASON: %s", err),
@@ -324,7 +329,7 @@ func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, std
         }
     }
     timeout := time.Second * time.Duration(viper.GetInt("job-pod-max-wait"))
-    waitErr := s.WaitForPod(pod, timeout)
+    waitErr := s.WaitForPod(ctx, pod, timeout)
     if waitErr != nil {
         // TODO: get pod status or status message?
         return JobOutcome{
@@ -410,24 +415,24 @@ func (s *JobRunner) Exec(ctx context.Context, stdout, stderr *SafeBuffer, pod *c
     })
 }

-func (s *JobRunner) CreateConfigMap(config *corev1.ConfigMap) (*corev1.ConfigMap, error) {
+func (s *JobRunner) CreateConfigMap(ctx context.Context, config *corev1.ConfigMap) (*corev1.ConfigMap, error) {
     s.logger.Trace().Msgf("Creating configmap %s/%s ...", config.Namespace, config.Name)
-    return s.clientset.CoreV1().ConfigMaps(config.Namespace).Create(context.TODO(), config, metav1.CreateOptions{})
+    return s.clientset.CoreV1().ConfigMaps(config.Namespace).Create(ctx, config, metav1.CreateOptions{})
 }

-func (s *JobRunner) CreatePDB(config *policyv1.PodDisruptionBudget) (*policyv1.PodDisruptionBudget, error) {
+func (s *JobRunner) CreatePDB(ctx context.Context, config *policyv1.PodDisruptionBudget) (*policyv1.PodDisruptionBudget, error) {
     s.logger.Trace().Msgf("Creating pod disruption budget %s/%s ...", config.Namespace, config.Name)
-    return s.clientset.PolicyV1().PodDisruptionBudgets(config.Namespace).Create(context.TODO(), config, metav1.CreateOptions{})
+    return s.clientset.PolicyV1().PodDisruptionBudgets(config.Namespace).Create(ctx, config, metav1.CreateOptions{})
 }

-func (s *JobRunner) CreatePod(config *corev1.Pod) (*corev1.Pod, error) {
+func (s *JobRunner) CreatePod(ctx context.Context, config *corev1.Pod) (*corev1.Pod, error) {
     s.logger.Trace().Msgf("Creating pod %s/%s ...", config.Namespace, config.Name)
-    return s.clientset.CoreV1().Pods(config.Namespace).Create(context.TODO(), config, metav1.CreateOptions{})
+    return s.clientset.CoreV1().Pods(config.Namespace).Create(ctx, config, metav1.CreateOptions{})
 }

 func (s *JobRunner) isPodInDesiredState(podConfig *corev1.Pod) wait.ConditionWithContextFunc {
-    return func(context.Context) (bool, error) {
-        pod, err := s.clientset.CoreV1().Pods(podConfig.Namespace).Get(context.TODO(), podConfig.Name, metav1.GetOptions{})
+    return func(ctx context.Context) (bool, error) {
+        pod, err := s.clientset.CoreV1().Pods(podConfig.Namespace).Get(ctx, podConfig.Name, metav1.GetOptions{})
         if err != nil {
             return false, err
         }
@@ -441,30 +446,32 @@ func (s *JobRunner) isPodInDesiredState(podConfig *corev1.Pod) wait.ConditionWit
     }
 }

-func (s *JobRunner) WaitForPod(podConfig *corev1.Pod, timeout time.Duration) error {
+func (s *JobRunner) WaitForPod(ctx context.Context, podConfig *corev1.Pod, timeout time.Duration) error {
     s.logger.Debug().Msgf("Waiting for pod %s/%s to be ready in %s ...", podConfig.Namespace, podConfig.Name, timeout)
-    return wait.PollUntilContextTimeout(context.Background(), time.Second, timeout, false, s.isPodInDesiredState(podConfig))
+    waitCtx, cancel := context.WithTimeout(ctx, timeout)
+    defer cancel()
+    return wait.PollUntilContextTimeout(waitCtx, time.Second, timeout, false, s.isPodInDesiredState(podConfig))
 }

-func (s *JobRunner) DeleteConfigMap(config *corev1.ConfigMap) {
+func (s *JobRunner) DeleteConfigMap(ctx context.Context, config *corev1.ConfigMap) {
     s.logger.Trace().Msgf("Deleting configmap %s/%s ...", config.Namespace, config.Name)
-    err := s.clientset.CoreV1().ConfigMaps(config.Namespace).Delete(context.TODO(), config.Name, metav1.DeleteOptions{})
+    err := s.clientset.CoreV1().ConfigMaps(config.Namespace).Delete(ctx, config.Name, metav1.DeleteOptions{})
     if err != nil {
         s.logger.Error().Err(err).Msgf("received error on ConfigMap deletion")
     }
 }

-func (s *JobRunner) DeletePDB(config *policyv1.PodDisruptionBudget) {
-    s.logger.Trace().Msgf("Deleting configmap %s/%s ...", config.Namespace, config.Name)
-    err := s.clientset.PolicyV1().PodDisruptionBudgets(config.Namespace).Delete(context.TODO(), config.Name, metav1.DeleteOptions{})
+func (s *JobRunner) DeletePDB(ctx context.Context, config *policyv1.PodDisruptionBudget) {
+    s.logger.Trace().Msgf("Deleting PDB %s/%s ...", config.Namespace, config.Name)
+    err := s.clientset.PolicyV1().PodDisruptionBudgets(config.Namespace).Delete(ctx, config.Name, metav1.DeleteOptions{})
     if err != nil {
         s.logger.Error().Err(err).Msgf("received error on PDB deletion")
     }
 }

-func (s *JobRunner) DeletePod(config *corev1.Pod) {
+func (s *JobRunner) DeletePod(ctx context.Context, config *corev1.Pod) {
     s.logger.Trace().Msgf("Deleting pod %s/%s ...", config.Namespace, config.Name)
-    err := s.clientset.CoreV1().Pods(config.Namespace).Delete(context.TODO(), config.Name, metav1.DeleteOptions{})
+    err := s.clientset.CoreV1().Pods(config.Namespace).Delete(ctx, config.Name, metav1.DeleteOptions{})
     if err != nil {
         s.logger.Error().Err(err).Msgf("received error on Pod deletion")
     }
diff --git a/src/pkg/leaderElection.go b/src/pkg/leaderElection.go
index 61a0d4e..0725160 100644
--- a/src/pkg/leaderElection.go
+++ b/src/pkg/leaderElection.go
@@ -48,40 +48,46 @@ func RunLeaderElection(ctx context.Context, runnerId opslevel.ID, lockName, lock
             isLeader = true
             logger.Info().Msgf("leader is %s", lockIdentity)
             deploymentsClient := client.AppsV1().Deployments(lockNamespace)

+            // Not allowing this sleep interval to be configurable for now
+            // to prevent it being set too low and causing thundering herd
+            ticker := time.NewTicker(60 * time.Second)
+            defer ticker.Stop()
             for {
-                // Not allowing this sleep interval to be configurable for now
-                // to prevent it being set too low and causing thundering herd
-                time.Sleep(60 * time.Second)
-                result, getErr := deploymentsClient.Get(context.TODO(), lockName, metav1.GetOptions{})
-                if getErr != nil {
-                    logger.Error().Err(getErr).Msg("Failed to get latest version of Deployment")
-                    continue
-                }
-                replicaCount, err := getReplicaCount(runnerId, int(*result.Spec.Replicas))
-                if err != nil {
-                    logger.Error().Err(err).Msg("Failed to get replica count")
-                    continue
-                }
-                logger.Info().Msgf("Ideal replica count is %v", replicaCount)
-                // Retry is being used below to prevent deployment update from overwriting a
-                // separate and unrelated update action per client-go's recommendation:
-                // https://github.com/kubernetes/client-go/blob/master/examples/create-update-delete-deployment/main.go#L117
-                retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-                    result, getErr := deploymentsClient.Get(context.TODO(), lockName, metav1.GetOptions{})
+                select {
+                case <-c.Done():
+                    logger.Info().Msg("Leader election context cancelled, stopping scaling loop")
+                    return
+                case <-ticker.C:
+                    result, getErr := deploymentsClient.Get(c, lockName, metav1.GetOptions{})
                     if getErr != nil {
                         logger.Error().Err(getErr).Msg("Failed to get latest version of Deployment")
-                        return getErr
+                        continue
+                    }
+                    replicaCount, err := getReplicaCount(runnerId, int(*result.Spec.Replicas))
+                    if err != nil {
+                        logger.Error().Err(err).Msg("Failed to get replica count")
+                        continue
                     }
-                    result.Spec.Replicas = &replicaCount
-                    _, updateErr := deploymentsClient.Update(context.TODO(), result, metav1.UpdateOptions{})
-                    return updateErr
-                })
-                if retryErr != nil {
-                    logger.Error().Err(retryErr).Msg("Failed to set replica count")
-                    continue
+                    logger.Info().Msgf("Ideal replica count is %v", replicaCount)
+                    // Retry is being used below to prevent deployment update from overwriting a
+                    // separate and unrelated update action per client-go's recommendation:
+                    // https://github.com/kubernetes/client-go/blob/master/examples/create-update-delete-deployment/main.go#L117
+                    retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+                        result, getErr := deploymentsClient.Get(c, lockName, metav1.GetOptions{})
+                        if getErr != nil {
+                            logger.Error().Err(getErr).Msg("Failed to get latest version of Deployment")
+                            return getErr
+                        }
+                        result.Spec.Replicas = &replicaCount
+                        _, updateErr := deploymentsClient.Update(c, result, metav1.UpdateOptions{})
+                        return updateErr
+                    })
+                    if retryErr != nil {
+                        logger.Error().Err(retryErr).Msg("Failed to set replica count")
+                        continue
+                    }
+                    logger.Info().Msgf("Successfully set replica count to %v", replicaCount)
                 }
-                logger.Info().Msgf("Successfully set replica count to %v", replicaCount)
-            }
         },
         OnStoppedLeading: func() {
diff --git a/src/pkg/logs.go b/src/pkg/logs.go
index 3efb5ee..52c16ff 100644
--- a/src/pkg/logs.go
+++ b/src/pkg/logs.go
@@ -52,6 +52,8 @@ func (s *LogStreamer) GetLogBuffer() []string {

 func (s *LogStreamer) Run(ctx context.Context) {
     s.logger.Trace().Msg("Starting log streamer ...")
+    ticker := time.NewTicker(50 * time.Millisecond)
+    defer ticker.Stop()
     for {
         select {
         case <-ctx.Done():
@@ -60,7 +62,7 @@
         case <-s.quit:
             s.logger.Trace().Msg("Shutting down log streamer ...")
             return
-        default:
+        case <-ticker.C:
             for len(s.Stderr.String()) > 0 {
                 line, err := s.Stderr.ReadString('\n')
                 if err == nil {
@@ -89,12 +91,19 @@ func (s *LogStreamer) Run(ctx context.Context) {

 func (s *LogStreamer) Flush(outcome JobOutcome) {
     s.logger.Trace().Msg("Starting log streamer flush ...")
-    for len(s.Stderr.String()) > 0 {
-        time.Sleep(200 * time.Millisecond)
-    }
-    for len(s.Stdout.String()) > 0 {
-        time.Sleep(200 * time.Millisecond)
+    ticker := time.NewTicker(200 * time.Millisecond)
+    defer ticker.Stop()
+    timeout := time.After(30 * time.Second)
+    for len(s.Stderr.String()) > 0 || len(s.Stdout.String()) > 0 {
+        select {
+        case <-ticker.C:
+            // Continue waiting
+        case <-timeout:
+            s.logger.Warn().Msg("Flush timeout reached, proceeding with remaining data")
+            goto done
+        }
     }
+done:
     s.logger.Trace().Msg("Finished log streamer flush ...")
     s.quit <- true
     time.Sleep(200 * time.Millisecond) // Allow 'Run' goroutine to quit
diff --git a/src/pkg/opslevelAppendLogProcessor.go b/src/pkg/opslevelAppendLogProcessor.go
index d74d3c9..4861f9e 100644
--- a/src/pkg/opslevelAppendLogProcessor.go
+++ b/src/pkg/opslevelAppendLogProcessor.go
@@ -97,6 +97,5 @@ func (s *OpsLevelAppendLogProcessor) submit() {
         }
     }
     s.logLinesBytesSize = 0
-    s.logLines = nil
-    s.logLines = []string{}
+    s.logLines = s.logLines[:0]
 }
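
Note: the api.go hunk above replaces the nil-check lazy initialization with sync.Once, which stays correct when several goroutines request the client at the same time. Below is a minimal, self-contained sketch of that pattern; the Client type, newClient-style constructor, and URL are placeholders for illustration only, not the repository's actual types.

package main

import (
    "fmt"
    "sync"
)

// Client is a stand-in for the real resty/opslevel clients; it exists only
// to demonstrate the sync.Once pattern and is not part of the repository.
type Client struct{ url string }

var (
    _client     *Client
    _clientOnce sync.Once
)

// NewClient builds the shared client exactly once, even under concurrent
// callers, unlike a plain "if _client == nil" guard.
func NewClient() *Client {
    _clientOnce.Do(func() {
        _client = &Client{url: "https://api.example.com"} // placeholder URL
    })
    return _client
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Every goroutine observes the same pointer.
            fmt.Printf("client at %p\n", NewClient())
        }()
    }
    wg.Wait()
}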