From a6d1a48d0235656d1c2cd9a140f4a022105b7d96 Mon Sep 17 00:00:00 2001 From: goshado Date: Thu, 9 Jan 2025 21:21:26 +0200 Subject: [PATCH 1/5] fix: event handler listen to ctx.Done --- pkg/event_handler/main.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/pkg/event_handler/main.go b/pkg/event_handler/main.go index e819677..b620b0b 100644 --- a/pkg/event_handler/main.go +++ b/pkg/event_handler/main.go @@ -27,14 +27,24 @@ func Start(ctx context.Context, stop context.CancelFunc, cfg *conf.GlobalConfig, Notifier: notifier, } go func() { - for event := range watcher.ResultChan() { - err2 := handler.Handle(ctx, &event) - if err2 != nil { - log.Printf("[event handler] failed to Handle workflow event %s", err2) // ERROR + for { + select { + case <-ctx.Done(): + log.Print("[event handler] context canceled, stopping watcher") + watcher.Stop() + return + case event, ok := <-watcher.ResultChan(): + if !ok { + log.Print("[event handler] result channel closed") + watcher.Stop() + return + } + if err2 := handler.Handle(ctx, &event); err2 != nil { + log.Printf("[event handler] failed to Handle workflow event: %v", err2) + } } + stop() } - log.Print("[event handler] stopped work, closing watcher") - watcher.Stop() - stop() + }() } From d869b7c004cc9d710cfc0d69bbb36edf477c121c Mon Sep 17 00:00:00 2001 From: goshado Date: Thu, 9 Jan 2025 21:25:27 +0200 Subject: [PATCH 2/5] fix: event handler listen to ctx.Done --- cmd/piper/piper.go | 2 +- pkg/event_handler/main.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cmd/piper/piper.go b/cmd/piper/piper.go index 30412a0..d9f378e 100644 --- a/cmd/piper/piper.go +++ b/cmd/piper/piper.go @@ -51,6 +51,6 @@ func main() { // Create context that listens for the interrupt signal from the OS. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() - event_handler.Start(ctx, stop, cfg, globalClients) + event_handler.Start(ctx, cfg, globalClients) server.Start(ctx, stop, cfg, globalClients) } diff --git a/pkg/event_handler/main.go b/pkg/event_handler/main.go index b620b0b..374604d 100644 --- a/pkg/event_handler/main.go +++ b/pkg/event_handler/main.go @@ -8,7 +8,7 @@ import ( "log" ) -func Start(ctx context.Context, stop context.CancelFunc, cfg *conf.GlobalConfig, clients *clients.Clients) { +func Start(ctx context.Context, cfg *conf.GlobalConfig, clients *clients.Clients) { labelSelector := &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ {Key: "piper.quickube.com/notified", @@ -43,7 +43,6 @@ func Start(ctx context.Context, stop context.CancelFunc, cfg *conf.GlobalConfig, log.Printf("[event handler] failed to Handle workflow event: %v", err2) } } - stop() } }() From 399a7e62b826276de11a5f4a514cda62c7db159c Mon Sep 17 00:00:00 2001 From: goshado Date: Sat, 11 Jan 2025 18:26:03 +0200 Subject: [PATCH 3/5] fix: close piper if watcher down --- cmd/piper/piper.go | 2 +- pkg/event_handler/main.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/piper/piper.go b/cmd/piper/piper.go index d9f378e..30412a0 100644 --- a/cmd/piper/piper.go +++ b/cmd/piper/piper.go @@ -51,6 +51,6 @@ func main() { // Create context that listens for the interrupt signal from the OS. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() - event_handler.Start(ctx, cfg, globalClients) + event_handler.Start(ctx, stop, cfg, globalClients) server.Start(ctx, stop, cfg, globalClients) } diff --git a/pkg/event_handler/main.go b/pkg/event_handler/main.go index 374604d..1fa94b4 100644 --- a/pkg/event_handler/main.go +++ b/pkg/event_handler/main.go @@ -8,7 +8,7 @@ import ( "log" ) -func Start(ctx context.Context, cfg *conf.GlobalConfig, clients *clients.Clients) { +func Start(ctx context.Context, stop context.CancelFunc, cfg *conf.GlobalConfig, clients *clients.Clients) { labelSelector := &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ {Key: "piper.quickube.com/notified", @@ -37,6 +37,7 @@ func Start(ctx context.Context, cfg *conf.GlobalConfig, clients *clients.Clients if !ok { log.Print("[event handler] result channel closed") watcher.Stop() + stop() return } if err2 := handler.Handle(ctx, &event); err2 != nil { From aa2b3d69052ebd3d4246738aae68e925c6e9abfd Mon Sep 17 00:00:00 2001 From: goshado Date: Sat, 11 Jan 2025 20:12:50 +0200 Subject: [PATCH 4/5] fix: reduntant use of watcher.stop --- pkg/event_handler/main.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/event_handler/main.go b/pkg/event_handler/main.go index 1fa94b4..51a4e4b 100644 --- a/pkg/event_handler/main.go +++ b/pkg/event_handler/main.go @@ -27,24 +27,25 @@ func Start(ctx context.Context, stop context.CancelFunc, cfg *conf.GlobalConfig, Notifier: notifier, } go func() { + defer func() { + log.Print("[event handler] shutting down, stopping watcher") + watcher.Stop() + }() + for { select { case <-ctx.Done(): - log.Print("[event handler] context canceled, stopping watcher") - watcher.Stop() + log.Print("[event handler] context canceled, exiting") return case event, ok := <-watcher.ResultChan(): if !ok { log.Print("[event handler] result channel closed") - watcher.Stop() - stop() return } if err2 := handler.Handle(ctx, &event); err2 != nil { - log.Printf("[event handler] failed to Handle workflow event: %v", err2) + log.Printf("[event handler] failed to handle workflow event: %v", err2) } } } - }() } From 219a5304932809f010a3b6f022c38ce9619c45f4 Mon Sep 17 00:00:00 2001 From: goshado Date: Mon, 13 Jan 2025 07:57:13 +0200 Subject: [PATCH 5/5] fix: update --- pkg/event_handler/main.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/event_handler/main.go b/pkg/event_handler/main.go index 51a4e4b..329a04d 100644 --- a/pkg/event_handler/main.go +++ b/pkg/event_handler/main.go @@ -27,19 +27,17 @@ func Start(ctx context.Context, stop context.CancelFunc, cfg *conf.GlobalConfig, Notifier: notifier, } go func() { - defer func() { - log.Print("[event handler] shutting down, stopping watcher") - watcher.Stop() - }() for { select { case <-ctx.Done(): log.Print("[event handler] context canceled, exiting") + watcher.Stop() return case event, ok := <-watcher.ResultChan(): if !ok { log.Print("[event handler] result channel closed") + stop() return } if err2 := handler.Handle(ctx, &event); err2 != nil {