From 602e7b741dcb1000671ed0e098c1c8c5d071fa30 Mon Sep 17 00:00:00 2001 From: Remy Marronnier Date: Fri, 20 Jun 2025 12:50:38 +0200 Subject: [PATCH 1/3] async and error handling --- README.md | 64 +++++++++++++++++---- spec/async_spec.cr | 110 ++++++++++++++++++++++++++++++++++++ spec/error_handling_spec.cr | 84 +++++++++++++++++++++++++++ src/pulsar.cr | 1 + src/pulsar/error_handler.cr | 32 +++++++++++ src/pulsar/event.cr | 29 +++++++++- src/pulsar/timed_event.cr | 36 +++++++++++- 7 files changed, 341 insertions(+), 15 deletions(-) create mode 100644 spec/async_spec.cr create mode 100644 spec/error_handling_spec.cr create mode 100644 src/pulsar/error_handler.cr diff --git a/README.md b/README.md index f30b935..6fa31e9 100644 --- a/README.md +++ b/README.md @@ -148,12 +148,57 @@ end ```crystal Database::QueryEvent.subscribe do |event, duration| - puts Pulsar.elaspted_text(duration) # "2.3ms" + puts Pulsar.elapsed_text(duration) # "2.3ms" end ``` This method can be used with any `Time::Span`. +## Asynchronous Subscribers + +Pulsar now supports asynchronous event subscribers that automatically run in a separate fiber: + +```crystal +# Regular synchronous subscriber +MyEvent.subscribe do |event| + # This runs synchronously and can block +end + +# Asynchronous subscriber +MyEvent.subscribe_async do |event| + # This automatically runs in a new fiber + HTTP::Client.post("https://example.com/webhook", body: event.to_json) +end + +# Also works with timed events +Database::QueryEvent.subscribe_async do |event, duration| + # Won't block other subscribers + send_metrics_to_monitoring_service(event.query, duration) +end +``` + +## Error Handling + +Pulsar provides configurable error handling for subscribers: + +```crystal +# Configure the error handling strategy +Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Log # Default + +# Available strategies: +# - Ignore: Silently ignore errors and continue +# - Log: Log errors and continue (default) +# - Raise: Stop processing and raise the error +# - Custom: Use a custom error handler + +# Custom error handling +Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Custom +Pulsar::ErrorHandler.custom_handler = ->(exception, event) { + MyErrorReporter.report(exception, context: {event: event.name}) + nil +} +``` + ## Performance gotchas Subscribers are notified synchronously in the same Fiber as the publisher. @@ -162,7 +207,7 @@ will block anything else from running. If you are doing some logging it is probably fine, but if you are doing something more time-intensive or failure prone like making an HTTP request or -saving to the database you should pay special attention. +saving to the database you should use `subscribe_async` instead: ### Example of a problematic subscriber @@ -176,14 +221,12 @@ MyEvent.publish puts "I just took 5 seconds to print!" ``` -Oops. To get around this you can spawn a new fiber: +### Solution: Use async subscribers ```crystal -MyEvent.subscribe do |event| - # Now the `sleep` will run in a new Fiber and will not block this one - spawn do - sleep(5) - end +MyEvent.subscribe_async do |event| + # This automatically runs in a new Fiber + sleep(5) end MyEvent.publish @@ -191,15 +234,14 @@ MyEvent.publish puts "This will print right away!" ``` -### Potential solutions +### Alternative solutions -As described above you could run long running code in a new Fiber with `spawn`. You could also use a background job library like https://github.com/robacarp/mosquito. Be aware that running things in a Fiber will lose the current Fiber's context. This is important for logging since `Log.context` only works for the current Fiber. So if you plan to log using the built-in Logger, you likely _do not_ want to -spawn a new fiber. It is fast enough to just log like normal. +use async subscribers for logging. It is fast enough to just log synchronously. ## Contributing diff --git a/spec/async_spec.cr b/spec/async_spec.cr new file mode 100644 index 0000000..6832a1c --- /dev/null +++ b/spec/async_spec.cr @@ -0,0 +1,110 @@ +require "./spec_helper" + +class Pulsar::AsyncTestEvent < Pulsar::Event +end + +class Pulsar::AsyncTestTimedEvent < Pulsar::TimedEvent +end + +describe "Pulsar async subscribers" do + after_each do + Pulsar::AsyncTestEvent.clear_subscribers + Pulsar::AsyncTestTimedEvent.clear_subscribers + end + + describe "Event.subscribe_async" do + it "runs subscribers asynchronously" do + channel = Channel(Int32).new + + Pulsar::AsyncTestEvent.subscribe_async do |event| + sleep 10.milliseconds + channel.send(1) + end + + Pulsar::AsyncTestEvent.subscribe_async do |event| + sleep 10.milliseconds + channel.send(2) + end + + start_time = Time.monotonic + Pulsar::AsyncTestEvent.publish + publish_time = Time.monotonic - start_time + + # Publish should return immediately without waiting + publish_time.should be < 0.005.seconds + + # Both async subscribers should complete + results = [channel.receive, channel.receive].sort + results.should eq([1, 2]) + end + + it "doesn't block synchronous subscribers" do + sync_called = false + async_channel = Channel(Nil).new + + Pulsar::AsyncTestEvent.subscribe do |event| + sync_called = true + end + + Pulsar::AsyncTestEvent.subscribe_async do |event| + sleep 10.milliseconds + async_channel.send(nil) + end + + Pulsar::AsyncTestEvent.publish + + # Sync subscriber should have been called immediately + sync_called.should be_true + + # Async subscriber should complete later + async_channel.receive + end + end + + describe "TimedEvent.subscribe_async" do + it "runs timed subscribers asynchronously" do + channel = Channel(Float64).new + + Pulsar::AsyncTestTimedEvent.subscribe_async do |event, duration| + sleep 10.milliseconds + channel.send(duration.total_milliseconds) + end + + start_time = Time.monotonic + Pulsar::AsyncTestTimedEvent.publish do + sleep 2.milliseconds + end + publish_time = Time.monotonic - start_time + + # Should only wait for the block inside publish, not the async subscriber + publish_time.should be < 0.01.seconds + + # Async subscriber should receive the duration + duration_ms = channel.receive + duration_ms.should be > 2.0 + end + end + + describe "error handling with async subscribers" do + it "handles errors in async subscribers based on strategy" do + Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Custom + + errors_channel = Channel(String).new + + Pulsar::ErrorHandler.custom_handler = ->(exception : Exception, event : Pulsar::BaseEvent) { + errors_channel.send("#{event.name}: #{exception.message}") + nil + } + + Pulsar::AsyncTestEvent.subscribe_async do |event| + raise "Async error" + end + + Pulsar::AsyncTestEvent.publish + + # Error should be handled in the spawned fiber + error_msg = errors_channel.receive + error_msg.should eq("Pulsar::AsyncTestEvent: Async error") + end + end +end diff --git a/spec/error_handling_spec.cr b/spec/error_handling_spec.cr new file mode 100644 index 0000000..2b19191 --- /dev/null +++ b/spec/error_handling_spec.cr @@ -0,0 +1,84 @@ +require "./spec_helper" + +class Pulsar::ErrorTestEvent < Pulsar::Event +end + +class Pulsar::ErrorTestTimedEvent < Pulsar::TimedEvent +end + +describe "Pulsar error handling" do + after_each do + Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Log + Pulsar::ErrorTestEvent.clear_subscribers + Pulsar::ErrorTestTimedEvent.clear_subscribers + end + + describe "with ignore strategy" do + it "ignores errors and continues to next subscriber" do + Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Ignore + + calls = [] of Int32 + + Pulsar::ErrorTestEvent.subscribe { calls << 1 } + Pulsar::ErrorTestEvent.subscribe { raise "Test error" } + Pulsar::ErrorTestEvent.subscribe { calls << 3 } + + Pulsar::ErrorTestEvent.publish + + calls.should eq([1, 3]) + end + end + + describe "with raise strategy" do + it "raises the error and stops processing" do + Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Raise + + calls = [] of Int32 + + Pulsar::ErrorTestEvent.subscribe { calls << 1 } + Pulsar::ErrorTestEvent.subscribe { raise "Test error" } + Pulsar::ErrorTestEvent.subscribe { calls << 3 } + + expect_raises(Exception, "Test error") do + Pulsar::ErrorTestEvent.publish + end + + calls.should eq([1]) + end + end + + describe "with custom strategy" do + it "calls the custom handler" do + Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Custom + + handled_errors = [] of String + + Pulsar::ErrorHandler.custom_handler = ->(exception : Exception, event : Pulsar::BaseEvent) { + handled_errors << "#{event.name}: #{exception.message}" + nil + } + + Pulsar::ErrorTestEvent.subscribe { raise "Custom error" } + Pulsar::ErrorTestEvent.publish + + handled_errors.should eq(["Pulsar::ErrorTestEvent: Custom error"]) + end + end + + describe "with timed events" do + it "handles errors in timed event subscribers" do + Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Ignore + + calls = [] of Int32 + + Pulsar::ErrorTestTimedEvent.subscribe { |_, _| calls << 1 } + Pulsar::ErrorTestTimedEvent.subscribe { |_, _| raise "Timed error" } + Pulsar::ErrorTestTimedEvent.subscribe { |_, _| calls << 3 } + + result = Pulsar::ErrorTestTimedEvent.publish { :success } + + calls.should eq([1, 3]) + result.should eq(:success) + end + end +end diff --git a/src/pulsar.cr b/src/pulsar.cr index 46c14eb..e113687 100644 --- a/src/pulsar.cr +++ b/src/pulsar.cr @@ -1,4 +1,5 @@ require "./pulsar/*" +require "log" module Pulsar VERSION = "0.2.3" diff --git a/src/pulsar/error_handler.cr b/src/pulsar/error_handler.cr new file mode 100644 index 0000000..163b159 --- /dev/null +++ b/src/pulsar/error_handler.cr @@ -0,0 +1,32 @@ +module Pulsar + # Configuration for handling errors in event subscribers + class ErrorHandler + enum Strategy + # Ignore the error and continue to next subscriber + Ignore + # Log the error and continue to next subscriber (default) + Log + # Stop processing and raise the error + Raise + # Call a custom handler + Custom + end + + class_property strategy : Strategy = Strategy::Log + class_property custom_handler : (Exception, Pulsar::BaseEvent) -> Nil = ->(exception : Exception, event : Pulsar::BaseEvent) { } + + # Handle an error from a subscriber + def self.handle(exception : Exception, event : Pulsar::BaseEvent) + case strategy + when .ignore? + # Do nothing + when .log? + Log.error(exception: exception) { "Error in subscriber for #{event.name}" } + when .raise? + raise exception + when .custom? + custom_handler.call(exception, event) + end + end + end +end diff --git a/src/pulsar/event.cr b/src/pulsar/event.cr index ad19725..2fcc748 100644 --- a/src/pulsar/event.cr +++ b/src/pulsar/event.cr @@ -20,6 +20,29 @@ abstract class Pulsar::Event < Pulsar::BaseEvent self.subscribers << block end + # Subscribe to events with async execution + # + # The subscriber will be executed in a new fiber, preventing it from + # blocking other subscribers or the main execution flow. + # + # ``` + # MyEvent.subscribe_async do |event| + # # This runs in a separate fiber + # HTTP::Client.post("https://example.com/webhook", body: event.to_json) + # end + # ``` + def self.subscribe_async(&block : self -> Nil) + subscribe do |event| + spawn do + begin + block.call(event) + rescue exception + Pulsar::ErrorHandler.handle(exception, event) + end + end + end + end + # Publishes the event to all subscribers. # # ``` @@ -55,7 +78,11 @@ abstract class Pulsar::Event < Pulsar::BaseEvent Pulsar.maybe_log_event(self) self.class.subscribers.each do |s| - s.call(self) + begin + s.call(self) + rescue exception + Pulsar::ErrorHandler.handle(exception, self) + end end end end diff --git a/src/pulsar/timed_event.cr b/src/pulsar/timed_event.cr index c951c0a..c28bdfb 100644 --- a/src/pulsar/timed_event.cr +++ b/src/pulsar/timed_event.cr @@ -22,6 +22,32 @@ abstract class Pulsar::TimedEvent < Pulsar::BaseEvent self.subscribers << block end + # Subscribe to events with async execution + # + # The subscriber will be executed in a new fiber, preventing it from + # blocking other subscribers or the main execution flow. + # + # ``` + # MyEvent.subscribe_async do |event, duration| + # # This runs in a separate fiber + # HTTP::Client.post("https://example.com/metrics", body: { + # event: event.name, + # duration_ms: duration.total_milliseconds, + # }.to_json) + # end + # ``` + def self.subscribe_async(&block : self, Time::Span -> Nil) + subscribe do |event, duration| + spawn do + begin + block.call(event, duration) + rescue exception + Pulsar::ErrorHandler.handle(exception, event) + end + end + end + end + # Publishes the event when the block finishes running. # # Similar to `Pulsar::Event#publish` but measures and publishes the time @@ -57,21 +83,25 @@ abstract class Pulsar::TimedEvent < Pulsar::BaseEvent # # ...run some code # end # ``` - def self.publish(*args_, **named_args_) + def self.publish(*args_, **named_args_, &) # Name it args_ so if the initializer has an `args` argument `publish` will still work new(*args_, **named_args_).publish do yield end end - protected def publish + protected def publish(&) Pulsar.maybe_log_event(self) start = Time.monotonic result = yield duration = Time.monotonic - start self.class.subscribers.each do |s| - s.call(self, duration) + begin + s.call(self, duration) + rescue exception + Pulsar::ErrorHandler.handle(exception, self) + end end result From 2d6faffd20533f4874c638fcad8f86d42766fd32 Mon Sep 17 00:00:00 2001 From: Remy Marronnier Date: Sun, 22 Jun 2025 23:42:20 +0200 Subject: [PATCH 2/3] tentative fix for ci --- shard.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shard.yml b/shard.yml index 5008033..a6191ff 100644 --- a/shard.yml +++ b/shard.yml @@ -11,4 +11,4 @@ license: MIT development_dependencies: ameba: github: crystal-ameba/ameba - version: ~> 1.0.0 + version: ~> 1.6.0 From 2e7da5531462cdd0ae5157d15270732f86341232 Mon Sep 17 00:00:00 2001 From: Remy Marronnier Date: Mon, 23 Jun 2025 02:17:13 +0200 Subject: [PATCH 3/3] ameba fixes --- spec/async_spec.cr | 12 ++++++------ src/pulsar/event.cr | 4 ++-- src/pulsar/timed_event.cr | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/spec/async_spec.cr b/spec/async_spec.cr index 6832a1c..5e48484 100644 --- a/spec/async_spec.cr +++ b/spec/async_spec.cr @@ -16,12 +16,12 @@ describe "Pulsar async subscribers" do it "runs subscribers asynchronously" do channel = Channel(Int32).new - Pulsar::AsyncTestEvent.subscribe_async do |event| + Pulsar::AsyncTestEvent.subscribe_async do |_event| sleep 10.milliseconds channel.send(1) end - Pulsar::AsyncTestEvent.subscribe_async do |event| + Pulsar::AsyncTestEvent.subscribe_async do |_event| sleep 10.milliseconds channel.send(2) end @@ -42,11 +42,11 @@ describe "Pulsar async subscribers" do sync_called = false async_channel = Channel(Nil).new - Pulsar::AsyncTestEvent.subscribe do |event| + Pulsar::AsyncTestEvent.subscribe do |_event| sync_called = true end - Pulsar::AsyncTestEvent.subscribe_async do |event| + Pulsar::AsyncTestEvent.subscribe_async do |_event| sleep 10.milliseconds async_channel.send(nil) end @@ -65,7 +65,7 @@ describe "Pulsar async subscribers" do it "runs timed subscribers asynchronously" do channel = Channel(Float64).new - Pulsar::AsyncTestTimedEvent.subscribe_async do |event, duration| + Pulsar::AsyncTestTimedEvent.subscribe_async do |_event, duration| sleep 10.milliseconds channel.send(duration.total_milliseconds) end @@ -96,7 +96,7 @@ describe "Pulsar async subscribers" do nil } - Pulsar::AsyncTestEvent.subscribe_async do |event| + Pulsar::AsyncTestEvent.subscribe_async do |_event| raise "Async error" end diff --git a/src/pulsar/event.cr b/src/pulsar/event.cr index 2fcc748..7c4cedf 100644 --- a/src/pulsar/event.cr +++ b/src/pulsar/event.cr @@ -77,9 +77,9 @@ abstract class Pulsar::Event < Pulsar::BaseEvent protected def publish Pulsar.maybe_log_event(self) - self.class.subscribers.each do |s| + self.class.subscribers.each do |subscriber| begin - s.call(self) + subscriber.call(self) rescue exception Pulsar::ErrorHandler.handle(exception, self) end diff --git a/src/pulsar/timed_event.cr b/src/pulsar/timed_event.cr index c28bdfb..f023ce7 100644 --- a/src/pulsar/timed_event.cr +++ b/src/pulsar/timed_event.cr @@ -96,9 +96,9 @@ abstract class Pulsar::TimedEvent < Pulsar::BaseEvent result = yield duration = Time.monotonic - start - self.class.subscribers.each do |s| + self.class.subscribers.each do |subscriber| begin - s.call(self, duration) + subscriber.call(self, duration) rescue exception Pulsar::ErrorHandler.handle(exception, self) end