diff --git a/lib/multiple_man/outbox/message/rails.rb b/lib/multiple_man/outbox/message/rails.rb index e89a0f4..c6a1775 100644 --- a/lib/multiple_man/outbox/message/rails.rb +++ b/lib/multiple_man/outbox/message/rails.rb @@ -14,6 +14,11 @@ def self.push_record(record, operation, options) set_name: MultipleMan::RoutingKey.model_name(routing_key) ).save! end + + after_save do + # Notify the producer that there is a new message + self.class.connection.execute("NOTIFY outbox_channel") + end end end end diff --git a/lib/multiple_man/outbox/message/sequel.rb b/lib/multiple_man/outbox/message/sequel.rb index 80b1a7f..ca3c089 100644 --- a/lib/multiple_man/outbox/message/sequel.rb +++ b/lib/multiple_man/outbox/message/sequel.rb @@ -14,6 +14,14 @@ def self.in_groups_and_delete(size = 100, &block) end end + def self.run_listener(producer_thread) + # This will run infinitely in the main thread + Outbox::DB.connection.listen('outbox_channel', { loop: true }) do |_channel, _notifier_pid, _payload| + # Wake the producer thread up if it's sleeping and a new message comes in + producer_thread.run if producer_thread.status == 'sleep' + end + end + private def self.fetch_messages_from_database(size) diff --git a/lib/multiple_man/producers/general.rb b/lib/multiple_man/producers/general.rb index a883853..9d6c1ee 100644 --- a/lib/multiple_man/producers/general.rb +++ b/lib/multiple_man/producers/general.rb @@ -11,9 +11,15 @@ def run_producer require_relative '../outbox/db' require_relative '../outbox/message/sequel' + @producer_thread = Thread.new { producer_loop } + + Outbox::Message::Sequel.run_listener(@producer_thread) + end + + def producer_loop last_run = Time.now loop do - timeout(last_run) + timeout(last_run) unless @did_work Connection.connect { |connection| produce_all(connection) } reset! if should_reset? last_run = Time.now @@ -40,12 +46,14 @@ def run_producer # requires 1 confirm per message. def produce_all(connection) ActiveSupport::Notifications.instrument('multiple_man.producer.produce_all') do + @did_work = false Outbox::Message::Sequel.in_groups_and_delete(batch_size) do |messages| break if should_reset? grouped_messages = group_by_set(messages) while grouped_messages.any? + @did_work = true sent_messages = send_messages!(grouped_messages, connection) confirm_published!(sent_messages, connection) if sent_messages remove_empty_lists!(grouped_messages)