Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbox latency improvements #60

Open
wants to merge 2 commits into
base: outbox
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/multiple_man/outbox/message/rails.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ def self.push_record(record, operation, options)
set_name: MultipleMan::RoutingKey.model_name(routing_key)
).save!
end

after_save do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case anyone else has the same thought, I just looked up this:

Firstly, if a NOTIFY is executed inside a transaction, the notify events are not delivered until and unless the transaction is committed. This is appropriate, since if the transaction is aborted, all the commands within it have had no effect, including NOTIFY.

# Notify the producer that there is a new message
self.class.connection.execute("NOTIFY outbox_channel")
end
end
end
end
Expand Down
8 changes: 8 additions & 0 deletions lib/multiple_man/outbox/message/sequel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ def self.in_groups_and_delete(size = 100, &block)
end
end

def self.run_listener(producer_thread)
# This will run infinitely in the main thread
Outbox::DB.connection.listen('outbox_channel', { loop: true }) do |_channel, _notifier_pid, _payload|
# Wake the producer thread up if it's sleeping and a new message comes in
producer_thread.run if producer_thread.status == 'sleep'
end
end

private

def self.fetch_messages_from_database(size)
Expand Down
10 changes: 9 additions & 1 deletion lib/multiple_man/producers/general.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ def run_producer
require_relative '../outbox/db'
require_relative '../outbox/message/sequel'

@producer_thread = Thread.new { producer_loop }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a threading expert, but what happens if this producer_loop throws an exception? I'm just thinking about db connections, notifications etc


Outbox::Message::Sequel.run_listener(@producer_thread)
end

def producer_loop
last_run = Time.now
loop do
timeout(last_run)
timeout(last_run) unless @did_work
Connection.connect { |connection| produce_all(connection) }
reset! if should_reset?
last_run = Time.now
Expand All @@ -40,12 +46,14 @@ def run_producer
# requires 1 confirm per message.
def produce_all(connection)
ActiveSupport::Notifications.instrument('multiple_man.producer.produce_all') do
@did_work = false
Outbox::Message::Sequel.in_groups_and_delete(batch_size) do |messages|
break if should_reset?

grouped_messages = group_by_set(messages)

while grouped_messages.any?
@did_work = true
sent_messages = send_messages!(grouped_messages, connection)
confirm_published!(sent_messages, connection) if sent_messages
remove_empty_lists!(grouped_messages)
Expand Down