From 39e6f01c04fa56e2c7e6249b9fe377828ac0daca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Reme=C5=A1?= Date: Thu, 8 Jun 2023 14:07:26 +0200 Subject: [PATCH 1/2] always initialize API config observer --- pkg/controller/operator.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/controller/operator.go b/pkg/controller/operator.go index fd10b150e..b22014716 100644 --- a/pkg/controller/operator.go +++ b/pkg/controller/operator.go @@ -89,6 +89,10 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller if envVersion, exists := os.LookupEnv("RELEASE_VERSION"); exists { desiredVersion = envVersion } + apiConfigObserver, err := configobserver.NewAPIConfigObserver(gatherKubeConfig, controller.EventRecorder, configInformers) + if err != nil { + return err + } // By default, this will exit(0) the process if the featuregates ever change to a different set of values. featureGateAccessor := featuregates.NewFeatureGateAccess( @@ -120,12 +124,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller return fmt.Errorf("can't create --path: %v", err) } } - var apiConfigObserver configobserver.APIConfigObserver if insightsConfigAPIEnabled { - apiConfigObserver, err = configobserver.NewAPIConfigObserver(gatherKubeConfig, controller.EventRecorder, configInformers) - if err != nil { - return err - } go apiConfigObserver.Run(ctx, 1) } From d29ea61b09e1223ae88d0cb61336a06b96061716 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Reme=C5=A1?= Date: Fri, 9 Jun 2023 13:42:28 +0200 Subject: [PATCH 2/2] run always, but try to shutdown --- pkg/anonymization/anonymizer.go | 8 +-- ...ver.go => insighgtsdatagather_observer.go} | 53 +++++++++++-------- pkg/controller/operator.go | 13 ++--- pkg/controller/periodic/periodic.go | 4 +- pkg/controller/status/controller.go | 4 +- .../insightsuploader/insightsuploader.go | 4 +- 6 files changed, 48 insertions(+), 38 deletions(-) rename pkg/config/configobserver/{apiconfigobserver.go => insighgtsdatagather_observer.go} (59%) diff --git a/pkg/anonymization/anonymizer.go b/pkg/anonymization/anonymizer.go index b8f5e5008..ace6e230f 100644 --- a/pkg/anonymization/anonymizer.go +++ b/pkg/anonymization/anonymizer.go @@ -88,7 +88,7 @@ type Anonymizer struct { ipNetworkRegex *regexp.Regexp secretsClient corev1client.SecretInterface secretConfigurator configobserver.Configurator - apiConfigurator configobserver.APIConfigObserver + apiConfigurator configobserver.InsightsDataGatherObserver configClient configv1client.ConfigV1Interface networkClient networkv1client.NetworkV1Interface gatherKubeClient kubernetes.Interface @@ -104,7 +104,7 @@ func NewAnonymizer(clusterBaseDomain string, networks []string, secretsClient corev1client.SecretInterface, secretConfigurator configobserver.Configurator, - apiConfigurator configobserver.APIConfigObserver) (*Anonymizer, error) { + apiConfigurator configobserver.InsightsDataGatherObserver) (*Anonymizer, error) { cidrs, err := k8snet.ParseCIDRs(networks) if err != nil { return nil, err @@ -138,7 +138,7 @@ func NewAnonymizerFromConfigClient( configClient configv1client.ConfigV1Interface, networkClient networkv1client.NetworkV1Interface, secretConfigurator configobserver.Configurator, - apiConfigurator configobserver.APIConfigObserver, + apiConfigurator configobserver.InsightsDataGatherObserver, ) (*Anonymizer, error) { baseDomain, err := utils.GetClusterBaseDomain(ctx, configClient) if err != nil { @@ -322,7 +322,7 @@ func NewAnonymizerFromConfig( gatherProtoKubeConfig *rest.Config, protoKubeConfig *rest.Config, secretConfigurator configobserver.Configurator, - apiConfigurator configobserver.APIConfigObserver, + apiConfigurator configobserver.InsightsDataGatherObserver, ) (*Anonymizer, error) { kubeClient, err := kubernetes.NewForConfig(protoKubeConfig) if err != nil { diff --git a/pkg/config/configobserver/apiconfigobserver.go b/pkg/config/configobserver/insighgtsdatagather_observer.go similarity index 59% rename from pkg/config/configobserver/apiconfigobserver.go rename to pkg/config/configobserver/insighgtsdatagather_observer.go index c4cf3274a..96c8bbd6b 100644 --- a/pkg/config/configobserver/apiconfigobserver.go +++ b/pkg/config/configobserver/insighgtsdatagather_observer.go @@ -15,32 +15,35 @@ import ( "k8s.io/klog/v2" ) -type APIConfigObserver interface { +type InsightsDataGatherObserver interface { factory.Controller GatherConfig() *v1alpha1.GatherConfig GatherDataPolicy() *v1alpha1.DataPolicy GatherDisabled() bool } -type APIConfigController struct { +type insightsDataGatherController struct { factory.Controller lock sync.Mutex listeners map[chan *v1alpha1.GatherConfig]struct{} configV1Alpha1Cli *configCliv1alpha1.ConfigV1alpha1Client gatherConfig *v1alpha1.GatherConfig + featureEnabled bool } -func NewAPIConfigObserver(kubeConfig *rest.Config, +func NewInsightsDataGatherObserver(kubeConfig *rest.Config, eventRecorder events.Recorder, - configInformer configinformers.SharedInformerFactory) (APIConfigObserver, error) { + configInformer configinformers.SharedInformerFactory, + featureEnabled bool) (InsightsDataGatherObserver, error) { inf := configInformer.Config().V1alpha1().InsightsDataGathers().Informer() configV1Alpha1Cli, err := configCliv1alpha1.NewForConfig(kubeConfig) if err != nil { return nil, err } - c := &APIConfigController{ + c := &insightsDataGatherController{ configV1Alpha1Cli: configV1Alpha1Cli, listeners: make(map[chan *v1alpha1.GatherConfig]struct{}), + featureEnabled: featureEnabled, } insightDataGatherConf, err := c.configV1Alpha1Cli.InsightsDataGathers().Get(context.Background(), "cluster", metav1.GetOptions{}) @@ -51,42 +54,48 @@ func NewAPIConfigObserver(kubeConfig *rest.Config, ctrl := factory.New().WithInformers(inf). WithSync(c.sync). - ToController("InsightConfigController", eventRecorder) + ToController("InsightsDataGatherObserver", eventRecorder) c.Controller = ctrl return c, nil } -func (a *APIConfigController) sync(ctx context.Context, _ factory.SyncContext) error { - insightDataGatherConf, err := a.configV1Alpha1Cli.InsightsDataGathers().Get(ctx, "cluster", metav1.GetOptions{}) +func (i *insightsDataGatherController) sync(ctx context.Context, scx factory.SyncContext) error { + if !i.featureEnabled { + klog.Infof("Shutting down the queue and the event recorder for %s", i.Name()) + scx.Queue().ShutDown() + scx.Recorder().Shutdown() + return nil + } + insightDataGatherConf, err := i.configV1Alpha1Cli.InsightsDataGathers().Get(ctx, "cluster", metav1.GetOptions{}) if err != nil { return err } - a.gatherConfig = &insightDataGatherConf.Spec.GatherConfig + i.gatherConfig = &insightDataGatherConf.Spec.GatherConfig return nil } // GatherConfig provides the complete gather config in a thread-safe way. -func (a *APIConfigController) GatherConfig() *v1alpha1.GatherConfig { - a.lock.Lock() - defer a.lock.Unlock() - return a.gatherConfig +func (i *insightsDataGatherController) GatherConfig() *v1alpha1.GatherConfig { + i.lock.Lock() + defer i.lock.Unlock() + return i.gatherConfig } // GatherDisabled tells whether data gathering is disabled or not -func (a *APIConfigController) GatherDisabled() bool { - a.lock.Lock() - defer a.lock.Unlock() +func (i *insightsDataGatherController) GatherDisabled() bool { + i.lock.Lock() + defer i.lock.Unlock() - if utils.StringInSlice("all", a.gatherConfig.DisabledGatherers) || - utils.StringInSlice("ALL", a.gatherConfig.DisabledGatherers) { + if utils.StringInSlice("all", i.gatherConfig.DisabledGatherers) || + utils.StringInSlice("ALL", i.gatherConfig.DisabledGatherers) { return true } return false } // GatherDataPolicy provides DataPolicy attribute value defined in the API -func (a *APIConfigController) GatherDataPolicy() *v1alpha1.DataPolicy { - a.lock.Lock() - defer a.lock.Unlock() - return &a.gatherConfig.DataPolicy +func (i *insightsDataGatherController) GatherDataPolicy() *v1alpha1.DataPolicy { + i.lock.Lock() + defer i.lock.Unlock() + return &i.gatherConfig.DataPolicy } diff --git a/pkg/controller/operator.go b/pkg/controller/operator.go index b22014716..8aba0942d 100644 --- a/pkg/controller/operator.go +++ b/pkg/controller/operator.go @@ -89,10 +89,6 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller if envVersion, exists := os.LookupEnv("RELEASE_VERSION"); exists { desiredVersion = envVersion } - apiConfigObserver, err := configobserver.NewAPIConfigObserver(gatherKubeConfig, controller.EventRecorder, configInformers) - if err != nil { - return err - } // By default, this will exit(0) the process if the featuregates ever change to a different set of values. featureGateAccessor := featuregates.NewFeatureGateAccess( @@ -124,10 +120,15 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller return fmt.Errorf("can't create --path: %v", err) } } - if insightsConfigAPIEnabled { - go apiConfigObserver.Run(ctx, 1) + apiConfigObserver, err := configobserver.NewInsightsDataGatherObserver(gatherKubeConfig, + controller.EventRecorder, configInformers, insightsConfigAPIEnabled) + if err != nil { + return err } + go apiConfigObserver.Run(ctx, 1) + go configInformers.Start(ctx.Done()) + // secretConfigObserver synthesizes all config into the status reporter controller secretConfigObserver := configobserver.New(s.Controller, kubeClient) go secretConfigObserver.Start(ctx) diff --git a/pkg/controller/periodic/periodic.go b/pkg/controller/periodic/periodic.go index 88fc1bf22..0b95c3a55 100644 --- a/pkg/controller/periodic/periodic.go +++ b/pkg/controller/periodic/periodic.go @@ -40,7 +40,7 @@ const ( // and flushes the recorder to create archives type Controller struct { secretConfigurator configobserver.Configurator - apiConfigurator configobserver.APIConfigObserver + apiConfigurator configobserver.InsightsDataGatherObserver recorder recorder.FlushInterface gatherers []gatherers.Interface statuses map[string]controllerstatus.StatusController @@ -56,7 +56,7 @@ func New( listGatherers []gatherers.Interface, anonymizer *anonymization.Anonymizer, insightsOperatorCLI operatorv1client.InsightsOperatorInterface, - apiConfigurator configobserver.APIConfigObserver, + apiConfigurator configobserver.InsightsDataGatherObserver, ) *Controller { statuses := make(map[string]controllerstatus.StatusController) diff --git a/pkg/controller/status/controller.go b/pkg/controller/status/controller.go index 566471634..3cf1b666e 100644 --- a/pkg/controller/status/controller.go +++ b/pkg/controller/status/controller.go @@ -56,7 +56,7 @@ type Controller struct { statusCh chan struct{} secretConfigurator configobserver.Configurator - apiConfigurator configobserver.APIConfigObserver + apiConfigurator configobserver.InsightsDataGatherObserver sources map[string]controllerstatus.StatusController reported Reported @@ -70,7 +70,7 @@ type Controller struct { // NewController creates a statusMessage controller, responsible for monitoring the operators statusMessage and updating its cluster statusMessage accordingly. func NewController(client configv1client.ConfigV1Interface, secretConfigurator configobserver.Configurator, - apiConfigurator configobserver.APIConfigObserver, + apiConfigurator configobserver.InsightsDataGatherObserver, namespace string) *Controller { c := &Controller{ name: "insights", diff --git a/pkg/insights/insightsuploader/insightsuploader.go b/pkg/insights/insightsuploader/insightsuploader.go index ba068e76a..6a372e2b3 100644 --- a/pkg/insights/insightsuploader/insightsuploader.go +++ b/pkg/insights/insightsuploader/insightsuploader.go @@ -36,7 +36,7 @@ type Controller struct { summarizer Summarizer client *insightsclient.Client secretConfigurator configobserver.Configurator - apiConfigurator configobserver.APIConfigObserver + apiConfigurator configobserver.InsightsDataGatherObserver reporter StatusReporter archiveUploaded chan struct{} initialDelay time.Duration @@ -45,7 +45,7 @@ type Controller struct { func New(summarizer Summarizer, client *insightsclient.Client, secretconfigurator configobserver.Configurator, - apiConfigurator configobserver.APIConfigObserver, + apiConfigurator configobserver.InsightsDataGatherObserver, statusReporter StatusReporter, initialDelay time.Duration) *Controller {