From 2af29eb6aed82082a5cf231ff251b0e45043d936 Mon Sep 17 00:00:00 2001 From: Bruno Da Silva Date: Wed, 2 May 2018 15:40:50 -0700 Subject: [PATCH 1/2] mm producer: only sleep if there were 0 records in the database last time --- lib/multiple_man/producers/general.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/multiple_man/producers/general.rb b/lib/multiple_man/producers/general.rb index a883853..2b48241 100644 --- a/lib/multiple_man/producers/general.rb +++ b/lib/multiple_man/producers/general.rb @@ -13,7 +13,7 @@ def run_producer last_run = Time.now loop do - timeout(last_run) + timeout(last_run) unless @did_work Connection.connect { |connection| produce_all(connection) } reset! if should_reset? last_run = Time.now @@ -40,12 +40,14 @@ def run_producer # requires 1 confirm per message. def produce_all(connection) ActiveSupport::Notifications.instrument('multiple_man.producer.produce_all') do + @did_work = false Outbox::Message::Sequel.in_groups_and_delete(batch_size) do |messages| break if should_reset? grouped_messages = group_by_set(messages) while grouped_messages.any? + @did_work = true sent_messages = send_messages!(grouped_messages, connection) confirm_published!(sent_messages, connection) if sent_messages remove_empty_lists!(grouped_messages) From 3f455e5a0735975ef4cc957145c30b4a44071308 Mon Sep 17 00:00:00 2001 From: Bruno Da Silva Date: Wed, 2 May 2018 15:44:06 -0700 Subject: [PATCH 2/2] Add pub/sub notifications to the outbox implementation so, when a new record is added to the messages table, the producer is notified --- lib/multiple_man/outbox/message/rails.rb | 5 +++++ lib/multiple_man/outbox/message/sequel.rb | 8 ++++++++ lib/multiple_man/producers/general.rb | 6 ++++++ 3 files changed, 19 insertions(+) diff --git a/lib/multiple_man/outbox/message/rails.rb b/lib/multiple_man/outbox/message/rails.rb index e89a0f4..c6a1775 100644 --- a/lib/multiple_man/outbox/message/rails.rb +++ b/lib/multiple_man/outbox/message/rails.rb @@ -14,6 +14,11 @@ def self.push_record(record, operation, options) set_name: MultipleMan::RoutingKey.model_name(routing_key) ).save! end + + after_save do + # Notify the producer that there is a new message + self.class.connection.execute("NOTIFY outbox_channel") + end end end end diff --git a/lib/multiple_man/outbox/message/sequel.rb b/lib/multiple_man/outbox/message/sequel.rb index 80b1a7f..ca3c089 100644 --- a/lib/multiple_man/outbox/message/sequel.rb +++ b/lib/multiple_man/outbox/message/sequel.rb @@ -14,6 +14,14 @@ def self.in_groups_and_delete(size = 100, &block) end end + def self.run_listener(producer_thread) + # This will run infinitely in the main thread + Outbox::DB.connection.listen('outbox_channel', { loop: true }) do |_channel, _notifier_pid, _payload| + # Wake the producer thread up if it's sleeping and a new message comes in + producer_thread.run if producer_thread.status == 'sleep' + end + end + private def self.fetch_messages_from_database(size) diff --git a/lib/multiple_man/producers/general.rb b/lib/multiple_man/producers/general.rb index 2b48241..9d6c1ee 100644 --- a/lib/multiple_man/producers/general.rb +++ b/lib/multiple_man/producers/general.rb @@ -11,6 +11,12 @@ def run_producer require_relative '../outbox/db' require_relative '../outbox/message/sequel' + @producer_thread = Thread.new { producer_loop } + + Outbox::Message::Sequel.run_listener(@producer_thread) + end + + def producer_loop last_run = Time.now loop do timeout(last_run) unless @did_work