Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 53 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,57 @@ end

```crystal
Database::QueryEvent.subscribe do |event, duration|
puts Pulsar.elaspted_text(duration) # "2.3ms"
puts Pulsar.elapsed_text(duration) # "2.3ms"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops! 😂

end
```

This method can be used with any `Time::Span`.

## Asynchronous Subscribers

Pulsar now supports asynchronous event subscribers that automatically run in a separate fiber:

```crystal
# Regular synchronous subscriber
MyEvent.subscribe do |event|
# This runs synchronously and can block
end

# Asynchronous subscriber
MyEvent.subscribe_async do |event|
# This automatically runs in a new fiber
HTTP::Client.post("https://example.com/webhook", body: event.to_json)
end

# Also works with timed events
Database::QueryEvent.subscribe_async do |event, duration|
# Won't block other subscribers
send_metrics_to_monitoring_service(event.query, duration)
end
```

## Error Handling

Pulsar provides configurable error handling for subscribers:

```crystal
# Configure the error handling strategy
Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Log # Default

# Available strategies:
# - Ignore: Silently ignore errors and continue
# - Log: Log errors and continue (default)
# - Raise: Stop processing and raise the error
# - Custom: Use a custom error handler

# Custom error handling
Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Custom
Pulsar::ErrorHandler.custom_handler = ->(exception, event) {
MyErrorReporter.report(exception, context: {event: event.name})
nil
}
```

## Performance gotchas

Subscribers are notified synchronously in the same Fiber as the publisher.
Expand All @@ -162,7 +207,7 @@ will block anything else from running.

If you are doing some logging it is probably fine, but if you are doing
something more time-intensive or failure prone like making an HTTP request or
saving to the database you should pay special attention.
saving to the database you should use `subscribe_async` instead:

### Example of a problematic subscriber

Expand All @@ -176,30 +221,27 @@ MyEvent.publish
puts "I just took 5 seconds to print!"
```

Oops. To get around this you can spawn a new fiber:
### Solution: Use async subscribers

```crystal
MyEvent.subscribe do |event|
# Now the `sleep` will run in a new Fiber and will not block this one
spawn do
sleep(5)
end
MyEvent.subscribe_async do |event|
# This automatically runs in a new Fiber
sleep(5)
end

MyEvent.publish

puts "This will print right away!"
```

### Potential solutions
### Alternative solutions

As described above you could run long running code in a new Fiber with `spawn`.
You could also use a background job library like https://github.com/robacarp/mosquito.

Be aware that running things in a Fiber will lose the current Fiber's context. This is
important for logging since `Log.context` only works for the current Fiber.
So if you plan to log using the built-in Logger, you likely _do not_ want to
spawn a new fiber. It is fast enough to just log like normal.
use async subscribers for logging. It is fast enough to just log synchronously.

## Contributing

Expand Down
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ license: MIT
development_dependencies:
ameba:
github: crystal-ameba/ameba
version: ~> 1.0.0
version: ~> 1.6.0
110 changes: 110 additions & 0 deletions spec/async_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
require "./spec_helper"

class Pulsar::AsyncTestEvent < Pulsar::Event
end

class Pulsar::AsyncTestTimedEvent < Pulsar::TimedEvent
end

describe "Pulsar async subscribers" do
after_each do
Pulsar::AsyncTestEvent.clear_subscribers
Pulsar::AsyncTestTimedEvent.clear_subscribers
end

describe "Event.subscribe_async" do
it "runs subscribers asynchronously" do
channel = Channel(Int32).new

Pulsar::AsyncTestEvent.subscribe_async do |_event|
sleep 10.milliseconds
channel.send(1)
end

Pulsar::AsyncTestEvent.subscribe_async do |_event|
sleep 10.milliseconds
channel.send(2)
end

start_time = Time.monotonic
Pulsar::AsyncTestEvent.publish
publish_time = Time.monotonic - start_time

# Publish should return immediately without waiting
publish_time.should be < 0.005.seconds

# Both async subscribers should complete
results = [channel.receive, channel.receive].sort
results.should eq([1, 2])
end

it "doesn't block synchronous subscribers" do
sync_called = false
async_channel = Channel(Nil).new

Pulsar::AsyncTestEvent.subscribe do |_event|
sync_called = true
end

Pulsar::AsyncTestEvent.subscribe_async do |_event|
sleep 10.milliseconds
async_channel.send(nil)
end

Pulsar::AsyncTestEvent.publish

# Sync subscriber should have been called immediately
sync_called.should be_true

# Async subscriber should complete later
async_channel.receive
end
end

describe "TimedEvent.subscribe_async" do
it "runs timed subscribers asynchronously" do
channel = Channel(Float64).new

Pulsar::AsyncTestTimedEvent.subscribe_async do |_event, duration|
sleep 10.milliseconds
channel.send(duration.total_milliseconds)
end

start_time = Time.monotonic
Pulsar::AsyncTestTimedEvent.publish do
sleep 2.milliseconds
end
publish_time = Time.monotonic - start_time

# Should only wait for the block inside publish, not the async subscriber
publish_time.should be < 0.01.seconds

# Async subscriber should receive the duration
duration_ms = channel.receive
duration_ms.should be > 2.0
end
end

describe "error handling with async subscribers" do
it "handles errors in async subscribers based on strategy" do
Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Custom

errors_channel = Channel(String).new

Pulsar::ErrorHandler.custom_handler = ->(exception : Exception, event : Pulsar::BaseEvent) {
errors_channel.send("#{event.name}: #{exception.message}")
nil
}

Pulsar::AsyncTestEvent.subscribe_async do |_event|
raise "Async error"
end

Pulsar::AsyncTestEvent.publish

# Error should be handled in the spawned fiber
error_msg = errors_channel.receive
error_msg.should eq("Pulsar::AsyncTestEvent: Async error")
end
end
end
84 changes: 84 additions & 0 deletions spec/error_handling_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
require "./spec_helper"

class Pulsar::ErrorTestEvent < Pulsar::Event
end

class Pulsar::ErrorTestTimedEvent < Pulsar::TimedEvent
end

describe "Pulsar error handling" do
after_each do
Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Log
Pulsar::ErrorTestEvent.clear_subscribers
Pulsar::ErrorTestTimedEvent.clear_subscribers
end

describe "with ignore strategy" do
it "ignores errors and continues to next subscriber" do
Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Ignore

calls = [] of Int32

Pulsar::ErrorTestEvent.subscribe { calls << 1 }
Pulsar::ErrorTestEvent.subscribe { raise "Test error" }
Pulsar::ErrorTestEvent.subscribe { calls << 3 }

Pulsar::ErrorTestEvent.publish

calls.should eq([1, 3])
end
end

describe "with raise strategy" do
it "raises the error and stops processing" do
Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Raise

calls = [] of Int32

Pulsar::ErrorTestEvent.subscribe { calls << 1 }
Pulsar::ErrorTestEvent.subscribe { raise "Test error" }
Pulsar::ErrorTestEvent.subscribe { calls << 3 }

expect_raises(Exception, "Test error") do
Pulsar::ErrorTestEvent.publish
end

calls.should eq([1])
end
end

describe "with custom strategy" do
it "calls the custom handler" do
Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Custom

handled_errors = [] of String

Pulsar::ErrorHandler.custom_handler = ->(exception : Exception, event : Pulsar::BaseEvent) {
handled_errors << "#{event.name}: #{exception.message}"
nil
}

Pulsar::ErrorTestEvent.subscribe { raise "Custom error" }
Pulsar::ErrorTestEvent.publish

handled_errors.should eq(["Pulsar::ErrorTestEvent: Custom error"])
end
end

describe "with timed events" do
it "handles errors in timed event subscribers" do
Pulsar::ErrorHandler.strategy = Pulsar::ErrorHandler::Strategy::Ignore

calls = [] of Int32

Pulsar::ErrorTestTimedEvent.subscribe { |_, _| calls << 1 }
Pulsar::ErrorTestTimedEvent.subscribe { |_, _| raise "Timed error" }
Pulsar::ErrorTestTimedEvent.subscribe { |_, _| calls << 3 }

result = Pulsar::ErrorTestTimedEvent.publish { :success }

calls.should eq([1, 3])
result.should eq(:success)
end
end
end
1 change: 1 addition & 0 deletions src/pulsar.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "./pulsar/*"
require "log"

module Pulsar
VERSION = "0.2.3"
Expand Down
32 changes: 32 additions & 0 deletions src/pulsar/error_handler.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
module Pulsar
# Configuration for handling errors in event subscribers
class ErrorHandler
enum Strategy
# Ignore the error and continue to next subscriber
Ignore
# Log the error and continue to next subscriber (default)
Log
# Stop processing and raise the error
Raise
# Call a custom handler
Custom
end

class_property strategy : Strategy = Strategy::Log
class_property custom_handler : (Exception, Pulsar::BaseEvent) -> Nil = ->(exception : Exception, event : Pulsar::BaseEvent) { }

# Handle an error from a subscriber
def self.handle(exception : Exception, event : Pulsar::BaseEvent)
case strategy
when .ignore?
# Do nothing
when .log?
Log.error(exception: exception) { "Error in subscriber for #{event.name}" }
when .raise?
raise exception
when .custom?
custom_handler.call(exception, event)
end
end
end
end
31 changes: 29 additions & 2 deletions src/pulsar/event.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,29 @@ abstract class Pulsar::Event < Pulsar::BaseEvent
self.subscribers << block
end

# Subscribe to events with async execution
#
# The subscriber will be executed in a new fiber, preventing it from
# blocking other subscribers or the main execution flow.
#
# ```
# MyEvent.subscribe_async do |event|
# # This runs in a separate fiber
# HTTP::Client.post("https://example.com/webhook", body: event.to_json)
# end
# ```
def self.subscribe_async(&block : self -> Nil)
subscribe do |event|
spawn do
begin
block.call(event)
rescue exception
Pulsar::ErrorHandler.handle(exception, event)
end
end
end
end

# Publishes the event to all subscribers.
#
# ```
Expand Down Expand Up @@ -54,8 +77,12 @@ abstract class Pulsar::Event < Pulsar::BaseEvent
protected def publish
Pulsar.maybe_log_event(self)

self.class.subscribers.each do |s|
s.call(self)
self.class.subscribers.each do |subscriber|
begin
subscriber.call(self)
rescue exception
Pulsar::ErrorHandler.handle(exception, self)
end
end
end
end
Loading