Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions lib/resque/plugins/priority_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ module Resque
module Plugins
module PriorityQueue

# the score is stored as the priority * multiplier + time.now.to_i, so that "ties" are handled correctly
PRIORITY_MULTIPLIER = (1e13).to_i
# the score is stored as the priority + the fraction of a century since Jan 1, 2012, so that "ties" are handled correctly
One_century = 100*365*24*60*60
Jan_1_2012 = Time.utc(2012).to_f

MIN_PRIORITY = 0
MAX_PRIORITY = 1000
Expand Down Expand Up @@ -129,7 +130,7 @@ def clean_priority(sym)

# given a job score (from the zset), returns { :priority => cleaned priority, :created_at => unix timestamp }
def job_score_parts(score)
{ :priority => (score.to_i / PRIORITY_MULTIPLIER), :created_at => (score.to_i % PRIORITY_MULTIPLIER) }
{ :priority => score.to_i, :created_at => unpack_timestamp(score.to_f) }
end

protected
Expand Down Expand Up @@ -161,9 +162,17 @@ def peek_priority(queue, start=0, count=1)

end

def pack_timestamp(ts)
(ts-Jan_1_2012)/One_century
end

def unpack_timestamp(f)
Jan_1_2012 + (f-f.floor)*One_century
end

# given a priority, calculate the final score to be used when adding the job to the queue zset
def calculate_job_score(priority)
(clean_priority(priority) * PRIORITY_MULTIPLIER) + Time.now.to_i
clean_priority(priority) + pack_timestamp(Time.now.to_f)
end

end
Expand Down
6 changes: 3 additions & 3 deletions test/job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def test_create_with_priority
Resque::Job.create_with_priority(:priority_jobs, SomePriorityJob, 75)

# we actually store 1000 minus the priority
assert_equal 925, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => 'SomePriorityJob', :args => [])).to_i / Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER
assert_equal 925, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => 'SomePriorityJob', :args => [])).to_i

@worker.work(0)

Expand All @@ -36,11 +36,11 @@ def test_create_or_update_priority
Resque::Job.create_or_update_priority(:priority_jobs, SomePriorityJob, 75)

# we actually store 1000 minus the priority
assert_equal 925, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => 'SomePriorityJob', :args => [])).to_i / Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER
assert_equal 925, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => 'SomePriorityJob', :args => [])).to_i

Resque::Job.create_or_update_priority(:priority_jobs, SomePriorityJob, 975)
# we store 1000 minus the priority
assert_equal 25, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => SomePriorityJob, :args => [])).to_i / Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER
assert_equal 25, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => SomePriorityJob, :args => [])).to_i
end

def test_job_instance_priority
Expand Down
23 changes: 15 additions & 8 deletions test/resque_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def test_push_with_priority
Resque.push_with_priority(:priority_jobs, job, 75)

# we actually store 1000 minus the priority
assert_equal 925, Resque.redis.zscore('queue:priority_jobs', Resque.encode(job)).to_i / Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER
assert_equal 925, Resque.redis.zscore('queue:priority_jobs', Resque.encode(job)).to_i

end

Expand All @@ -37,7 +37,7 @@ def test_push
Resque.push(:priority_jobs, new_job)

# should also add priority to the job
assert_equal 500, Resque.redis.zscore('queue:priority_jobs', Resque.encode(new_job)).to_i / Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER
assert_equal 500, Resque.redis.zscore('queue:priority_jobs', Resque.encode(new_job)).to_i

# a regular push to a queue that hasn't been initialized with priority should be a normal set
non_priority_job = { :class => SomeNonPriorityJob, :args => [] }
Expand Down Expand Up @@ -170,12 +170,19 @@ def test_priority_enabled?
end

def test_calculate_job_score
@fake_now = Time.now
Time.stubs(:now).returns(@fake_now)

assert_equal (Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER * 500 + @fake_now.to_i), Resque.send(:calculate_job_score, :normal)
assert_equal (@fake_now.to_i), Resque.send(:calculate_job_score, :highest)
assert_equal (Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER * 223 + @fake_now.to_i), Resque.send(:calculate_job_score, 777)
# The whole part of the priority should equal the (inverse) specified priority
assert_equal 500, Resque.send(:calculate_job_score, :normal).to_i
assert_equal 0, Resque.send(:calculate_job_score,:highest).to_i
assert_equal 223, Resque.send(:calculate_job_score, 777).to_i

# The fractional part should encode creation order.
# The job_score should distinguish (and properly order) timestamp differences
# down to a millisecond, provided that the system clock can.
a = Resque.send(:calculate_job_score,:normal)
t = Time.now+0.001
sleep 0.0001 until Time.now > t
b = Resque.send(:calculate_job_score,:normal)
assert a < b

end

Expand Down
1 change: 1 addition & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'rubygems'
require 'test/unit'
require 'mocha'
require 'resque'
Expand Down