Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
release:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
with:
fetch-depth: 0 # Fetch current tag as annotated. See https://github.com/actions/checkout/issues/290
- uses: ruby/setup-ruby@v1
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Check out code
uses: actions/checkout@v1
uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
with:
ruby-version: 3.0.0
Expand All @@ -24,4 +24,4 @@ jobs:
with:
rubocop_version: gemfile
rubocop_extensions: rubocop-rails:gemfile rubocop-rspec:gemfile
reporter: github-pr-review # Default is github-pr-check
reporter: github-pr-review # Default is github-pr-check
8 changes: 4 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ jobs:
strategy:
fail-fast: false
matrix:
rails: ["6.1", "7.0", "7.1", "7.2"]
ruby: ["3.1", "3.2", "3.3"]
rails: ["7.0", "7.1", "7.2", "8.0", "8.1"]
ruby: ["3.3", "3.4"]
container:
image: ruby:${{ matrix.ruby }}
env:
CI: true
BUNDLE_GEMFILE: gemfiles/rails_${{ matrix.rails }}.gemfile
steps:
- uses: actions/checkout@v2
- uses: actions/cache@v2
- uses: actions/checkout@v4
- uses: actions/cache@v4
with:
path: vendor/bundle
key: bundle-${{ matrix.ruby }}-${{ hashFiles('**/*.gemspec') }}-${{ hashFiles(format('**/gemfiles/rails_{0}.gemfile', matrix.rails)) }}
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## Unreleased
- Add support for newer ActiveJob features such as scheduled jobs and bulk enqueued jobs. https://github.com/Fullscript/yabeda-activejob/pull/24 @lewispb

## 0.6.0 - 2024-10-29
- Add support for Rails 7.1.4+
- Fix rubocop rules and configure Github actions to run the specs for supported Rails / Ruby versions.
Expand Down
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@ Sidekiq.configure_server do |_config|
end
```

If using with resque:
If using with solid_queue or resque:
```ruby
# config/initializers/yabeda.rb or elsewhere
Yabeda::ActiveJob.install!
```
If using resque you may need to use [yabeda-prometheus-mmap](https://github.com/yabeda-rb/yabeda-prometheus-mmap) or set your storage type to direct file store so that the metrics are available
to your collector.
If using solid_queue or resque you need to use [yabeda-prometheus-mmap](https://github.com/yabeda-rb/yabeda-prometheus-mmap) or set your storage type to direct file store so that the metrics are available to your collector.

To set your storage type to direct file store you can do the following in your yabeda initializer:

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/rails_6.1.gemfile → gemfiles/rails_8.0.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
source "https://rubygems.org"
git_source(:github) { |repo| "https://github.com/#{repo}.git" }

gem "rails", "~> 6.1.0"
gem "rails", "~> 8.0.0"

eval_gemfile "../Gemfile"
8 changes: 8 additions & 0 deletions gemfiles/rails_8.1.gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

source "https://rubygems.org"
git_source(:github) { |repo| "https://github.com/#{repo}.git" }

gem "rails", "~> 8.1.0"

eval_gemfile "../Gemfile"
151 changes: 78 additions & 73 deletions lib/yabeda/activejob/event_handler.rb
Original file line number Diff line number Diff line change
@@ -1,105 +1,110 @@
# frozen_string_literal: true

class Yabeda::ActiveJob::EventHandler
def initialize(*event_args)
@event = ActiveSupport::Notifications::Event.new(*event_args)
end
module Yabeda
module ActiveJob
class EventHandler
def initialize(*event_args)
@event = ActiveSupport::Notifications::Event.new(*event_args)
end

def handle_perform
labels = {
activejob: event.payload[:job].class.to_s,
queue: event.payload[:job].queue_name.to_s,
executions: event.payload[:job].executions.to_s,
}
if event.payload[:exception].present?
Yabeda.activejob_failed_total.increment(
labels.merge(failure_reason: event.payload[:exception].first.to_s),
)
else
Yabeda.activejob_success_total.increment(labels)
end
def handle_perform
labels = {
activejob: event.payload[:job].class.to_s,
queue: event.payload[:job].queue_name.to_s,
executions: event.payload[:job].executions.to_s,
}
if event.payload[:exception].present?
Yabeda.activejob_failed_total.increment(
labels.merge(failure_reason: event.payload[:exception].first.to_s),
)
else
Yabeda.activejob_success_total.increment(labels)
end

Yabeda.activejob_executed_total.increment(labels)
Yabeda.activejob_runtime.measure(labels, ms2s(event.duration))
call_after_event_block
end
Yabeda.activejob_executed_total.increment(labels)
Yabeda.activejob_runtime.measure(labels, ms2s(event.duration))
call_after_event_block
end

def handle_perform_start
labels = common_labels(event.payload[:job])
def handle_perform_start
labels = common_labels(event.payload[:job])

job_latency_value = job_latency
Yabeda.activejob_latency.measure(labels, job_latency_value) if job_latency_value.present?
call_after_event_block
end
job_latency_value = job_latency
Yabeda.activejob_latency.measure(labels, job_latency_value) if job_latency_value.present?
call_after_event_block
end

def handle_enqueue
labels = common_labels(event.payload[:job])
def handle_enqueue
labels = common_labels(event.payload[:job])

Yabeda.activejob_enqueued_total.increment(labels)
Yabeda.activejob_enqueued_total.increment(labels)

call_after_event_block
end
call_after_event_block
end

def handle_enqueue_at
labels = common_labels(event.payload[:job])
def handle_enqueue_at
labels = common_labels(event.payload[:job])

Yabeda.activejob_scheduled_total.increment(labels)
Yabeda.activejob_scheduled_total.increment(labels)

call_after_event_block
end
call_after_event_block
end

def handle_enqueue_all
event.payload[:jobs].each do |job|
labels = common_labels(job)
def handle_enqueue_all
event.payload[:jobs].each do |job|
labels = common_labels(job)

if job.scheduled_at
Yabeda.activejob_scheduled_total.increment(labels)
else
Yabeda.activejob_enqueued_total.increment(labels)
if job.scheduled_at
Yabeda.activejob_scheduled_total.increment(labels)
else
Yabeda.activejob_enqueued_total.increment(labels)
end
end

call_after_event_block
end
end

call_after_event_block
end
private

private
attr_reader :event
attr_reader :event

def job_latency
return unless (enqueue_time = event.payload[:job].enqueued_at)

def job_latency
if enqueue_time = event.payload[:job].enqueued_at
enqueue_time = parse_event_time(enqueue_time)
perform_at_time = parse_event_time(event.end)

perform_at_time - enqueue_time
end
end

def ms2s(milliseconds)
(milliseconds.to_f / 1000).round(3)
end
def ms2s(milliseconds)
(milliseconds.to_f / 1000).round(3)
end

def parse_event_time(time)
case time
when Time then time
when String then Time.parse(time).utc
else
if time > 1e12
Time.at(ms2s(time)).utc
def parse_event_time(time)
case time
when Time then time
when String then Time.parse(time).utc
else
Time.at(time).utc
if time > 1e12
Time.at(ms2s(time)).utc
else
Time.at(time).utc
end
end
end
end

def common_labels(job)
Yabeda.default_tags.reverse_merge(
activejob: job.class.to_s,
queue: job.queue_name,
executions: job.executions.to_s,
)
end
def common_labels(job)
Yabeda.default_tags.reverse_merge(
activejob: job.class.to_s,
queue: job.queue_name,
executions: job.executions.to_s,
)
end

def call_after_event_block
Yabeda::ActiveJob.after_event_block.call(event) if Yabeda::ActiveJob.after_event_block.respond_to?(:call)
def call_after_event_block
Yabeda::ActiveJob.after_event_block.call(event) if Yabeda::ActiveJob.after_event_block.respond_to?(:call)
end
end
end
end
1 change: 0 additions & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
require_relative "support/rails_app"
require "rspec/rails"
require "simplecov"
require "active_support"
SimpleCov.start

RSpec.configure do |config|
Expand Down
24 changes: 15 additions & 9 deletions spec/yabeda/activejob_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@
describe "job latency calculation" do
# Rails 7.1.4 and above
it "measures correct latency from end_time in seconds", queue_adapter: :test do
start_time = Time.now
base_time = Time.now

# Mock the job's enqueued_at time to be exactly base_time
allow_any_instance_of(HelloJob).to receive(:enqueued_at).and_return(base_time) # rubocop:disable RSpec/AnyInstance

# Mock the event end time to simulate 60 seconds later
allow_any_instance_of(ActiveSupport::Notifications::Event).to receive(:end).and_return(1.minute.from_now(start_time).to_f) # rubocop:disable RSpec/AnyInstance
allow_any_instance_of(ActiveSupport::Notifications::Event).to receive(:end).and_return((base_time + 60.seconds).to_f) # rubocop:disable RSpec/AnyInstance

expect { HelloJob.perform_later }.to have_enqueued_job.on_queue("default")
expect { perform_enqueued_jobs }.to measure_yabeda_histogram(Yabeda.activejob.latency)
Expand All @@ -71,10 +74,13 @@

# Rails 7.1.3 and below
it "measures correct latency from end_time in milliseconds", queue_adapter: :test do
start_time = Time.now
base_time = Time.now

# Mock the job's enqueued_at time to be exactly base_time
allow_any_instance_of(HelloJob).to receive(:enqueued_at).and_return(base_time) # rubocop:disable RSpec/AnyInstance

# Mock the event end time to simulate 60 seconds later (in milliseconds)
allow_any_instance_of(ActiveSupport::Notifications::Event).to receive(:end).and_return(1.minute.from_now(start_time).to_f * 1000) # rubocop:disable RSpec/AnyInstance
allow_any_instance_of(ActiveSupport::Notifications::Event).to receive(:end).and_return((base_time + 60.seconds).to_f * 1000) # rubocop:disable RSpec/AnyInstance

expect { HelloJob.perform_later }.to have_enqueued_job.on_queue("default")
expect { perform_enqueued_jobs }.to measure_yabeda_histogram(Yabeda.activejob.latency)
Expand Down Expand Up @@ -197,7 +203,7 @@
end
end

context "when jobs are bulk enqueued", skip: !ActiveJob.respond_to?(:perform_all_later), queue_adapter: :test do
context "when jobs are bulk enqueued", queue_adapter: :test, skip: !ActiveJob.respond_to?(:perform_all_later) do
it "increments enqueued job counter for all jobs" do
expect do
ActiveJob.perform_all_later([HelloJob.new, HelloJob.new, LongJob.new])
Expand All @@ -213,10 +219,10 @@
it "increments scheduled job counter for scheduled jobs in bulk" do
expect do
ActiveJob.perform_all_later([
HelloJob.new.set(wait: 1.hour),
HelloJob.new,
LongJob.new.set(wait: 2.hours),
])
HelloJob.new.set(wait: 1.hour),
HelloJob.new,
LongJob.new.set(wait: 2.hours),
])
end.to increment_yabeda_counter(Yabeda.activejob.scheduled_total)
.with_tags(queue: "default", activejob: "HelloJob", executions: "0")
.by(1).and(
Expand Down