diff --git a/lib/resque/plugins/priority_queue.rb b/lib/resque/plugins/priority_queue.rb index 204405d..10a6bb0 100644 --- a/lib/resque/plugins/priority_queue.rb +++ b/lib/resque/plugins/priority_queue.rb @@ -2,8 +2,9 @@ module Resque module Plugins module PriorityQueue - # the score is stored as the priority * multiplier + time.now.to_i, so that "ties" are handled correctly - PRIORITY_MULTIPLIER = (1e13).to_i + # the score is stored as the priority + the fraction of a century since Jan 1, 2012, so that "ties" are handled correctly + One_century = 100*365*24*60*60 + Jan_1_2012 = Time.utc(2012).to_f MIN_PRIORITY = 0 MAX_PRIORITY = 1000 @@ -129,7 +130,7 @@ def clean_priority(sym) # given a job score (from the zset), returns { :priority => cleaned priority, :created_at => unix timestamp } def job_score_parts(score) - { :priority => (score.to_i / PRIORITY_MULTIPLIER), :created_at => (score.to_i % PRIORITY_MULTIPLIER) } + { :priority => score.to_i, :created_at => unpack_timestamp(score.to_f) } end protected @@ -161,9 +162,17 @@ def peek_priority(queue, start=0, count=1) end + def pack_timestamp(ts) + (ts-Jan_1_2012)/One_century + end + + def unpack_timestamp(f) + Jan_1_2012 + (f-f.floor)*One_century + end + # given a priority, calculate the final score to be used when adding the job to the queue zset def calculate_job_score(priority) - (clean_priority(priority) * PRIORITY_MULTIPLIER) + Time.now.to_i + clean_priority(priority) + pack_timestamp(Time.now.to_f) end end diff --git a/test/job_test.rb b/test/job_test.rb index 0c24dc2..e7dcfa3 100644 --- a/test/job_test.rb +++ b/test/job_test.rb @@ -25,7 +25,7 @@ def test_create_with_priority Resque::Job.create_with_priority(:priority_jobs, SomePriorityJob, 75) # we actually store 1000 minus the priority - assert_equal 925, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => 'SomePriorityJob', :args => [])).to_i / Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER + assert_equal 925, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => 'SomePriorityJob', :args => [])).to_i @worker.work(0) @@ -36,11 +36,11 @@ def test_create_or_update_priority Resque::Job.create_or_update_priority(:priority_jobs, SomePriorityJob, 75) # we actually store 1000 minus the priority - assert_equal 925, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => 'SomePriorityJob', :args => [])).to_i / Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER + assert_equal 925, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => 'SomePriorityJob', :args => [])).to_i Resque::Job.create_or_update_priority(:priority_jobs, SomePriorityJob, 975) # we store 1000 minus the priority - assert_equal 25, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => SomePriorityJob, :args => [])).to_i / Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER + assert_equal 25, Resque.redis.zscore("queue:priority_jobs", Resque.encode(:class => SomePriorityJob, :args => [])).to_i end def test_job_instance_priority diff --git a/test/resque_test.rb b/test/resque_test.rb index 694351f..ef52457 100644 --- a/test/resque_test.rb +++ b/test/resque_test.rb @@ -23,7 +23,7 @@ def test_push_with_priority Resque.push_with_priority(:priority_jobs, job, 75) # we actually store 1000 minus the priority - assert_equal 925, Resque.redis.zscore('queue:priority_jobs', Resque.encode(job)).to_i / Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER + assert_equal 925, Resque.redis.zscore('queue:priority_jobs', Resque.encode(job)).to_i end @@ -37,7 +37,7 @@ def test_push Resque.push(:priority_jobs, new_job) # should also add priority to the job - assert_equal 500, Resque.redis.zscore('queue:priority_jobs', Resque.encode(new_job)).to_i / Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER + assert_equal 500, Resque.redis.zscore('queue:priority_jobs', Resque.encode(new_job)).to_i # a regular push to a queue that hasn't been initialized with priority should be a normal set non_priority_job = { :class => SomeNonPriorityJob, :args => [] } @@ -170,12 +170,19 @@ def test_priority_enabled? end def test_calculate_job_score - @fake_now = Time.now - Time.stubs(:now).returns(@fake_now) - - assert_equal (Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER * 500 + @fake_now.to_i), Resque.send(:calculate_job_score, :normal) - assert_equal (@fake_now.to_i), Resque.send(:calculate_job_score, :highest) - assert_equal (Resque::Plugins::PriorityQueue::PRIORITY_MULTIPLIER * 223 + @fake_now.to_i), Resque.send(:calculate_job_score, 777) + # The whole part of the priority should equal the (inverse) specified priority + assert_equal 500, Resque.send(:calculate_job_score, :normal).to_i + assert_equal 0, Resque.send(:calculate_job_score,:highest).to_i + assert_equal 223, Resque.send(:calculate_job_score, 777).to_i + + # The fractional part should encode creation order. + # The job_score should distinguish (and properly order) timestamp differences + # down to a millisecond, provided that the system clock can. + a = Resque.send(:calculate_job_score,:normal) + t = Time.now+0.001 + sleep 0.0001 until Time.now > t + b = Resque.send(:calculate_job_score,:normal) + assert a < b end diff --git a/test/test_helper.rb b/test/test_helper.rb index 6bf6b34..0ac3efe 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -1,3 +1,4 @@ +require 'rubygems' require 'test/unit' require 'mocha' require 'resque'