Skip to content

Commit

Permalink
Improve documentation a bit (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
julik authored and svanhesteren committed Jun 18, 2024
1 parent b0af119 commit 722bb8e
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 26 deletions.
12 changes: 7 additions & 5 deletions lib/active_job/queue_adapters/gouda_adapter.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# frozen_string_literal: true

module ActiveJob # :nodoc:
module QueueAdapters # :nodoc:
class GoudaAdapter < Gouda::Adapter
end
end
# The sole purpose of this class is so that you can do
# `config.active_job.queue_adapter = :gouda` in your Rails
# config, as Rails insists on resolving the adapter module
# name from the symbol automatically. If Rails ever allows
# us to "register" an adapter to a symbol this module can
# be removed later.
class ActiveJob::QueueAdapters::GoudaAdapter < Gouda::Adapter
end
16 changes: 16 additions & 0 deletions lib/gouda/bulk.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
# frozen_string_literal: true

module Gouda
# Inside this call, all `perform_later` calls on ActiveJob subclasses
# (including mailers) will be buffered. The call is reentrant, so you
# can have multiple `in_bulk` calls with arbitrary nesting. At the end
# of the block, the buffered jobs will be enqueued using their respective
# adapters. If an adapter supports `enqueue_all` (Sidekiq does in recent
# releases of Rails, for example), this functionality will be used. This
# method is especially useful when doing things such as mass-emails, or
# maintenance tasks where a large swath of jobs gets enqueued at once.
#
# @example
# Gouda.in_bulk do
# User.recently_joined.find_each do |recently_joined_user|
# GreetingJob.perform_later(recently_joined_user)
# end
# end
# @return [Object] the return value of the block
def self.in_bulk(&blk)
if Thread.current[:gouda_bulk_buffer].nil?
Thread.current[:gouda_bulk_buffer] = []
Expand Down
46 changes: 25 additions & 21 deletions lib/gouda/queue_constraints.rb
Original file line number Diff line number Diff line change
@@ -1,44 +1,48 @@
# frozen_string_literal: true

module Gouda
# A queue constraint supports just one method, `to_sql`, which returns
# a condition based on the `queue_name` value of the `gouda_workloads`
# table. The minimal constraint is just a no-op - it allows execution
# of jobs from all queues in the system.
module AnyQueue
def self.to_sql
"1=1"
end
end

# Allows execution of jobs only from specified queues
# For example, if you have a queue named "gpu", and you run
# jobs requiring a GPU on this queue, on your worker script
# running on GPU-equipped machines you could use
# `OnlyQueuesConstraint.new([:gpu])`
class OnlyQueuesConstraint < Struct.new(:queue_names)
def to_sql
placeholders = (["?"] * queue_names.length).join(",")
ActiveRecord::Base.sanitize_sql_array([<<~SQL, *queue_names])
Gouda::Workload.sanitize_sql_array([<<~SQL, *queue_names])
queue_name IN (#{placeholders})
SQL
end
end

# Allows execution of jobs from queues except the given ones
# For example, if you have a queue named "emails" which is time-critical,
# on all other machines your worker script can specify
# `ExceptQueueConstraint.new([:emails])`
class ExceptQueueConstraint < Struct.new(:queue_names)
def to_sql
placeholders = (["?"] * queue_names.length).join(",")
ActiveRecord::Base.sanitize_sql_array([<<~SQL, *queue_names])
Gouda::Workload.sanitize_sql_array([<<~SQL, *queue_names])
queue_name NOT IN (#{placeholders})
SQL
end
end

def self.parse_queue_constraint(constraint_str_from_envvar)
parsed = queue_parser(constraint_str_from_envvar)
if parsed[:include]
OnlyQueuesConstraint.new(parsed[:include])
elsif parsed[:exclude]
ExceptQueueConstraint.new(parsed[:exclude])
else
AnyQueue
end
end

# Parse a string representing a group of queues into a more readable data
# structure.
# @param string [String] Queue string
# Parse a string representing a group of queues into a queue constraint
# Note that this works similar to good_job. For example, the
# constraints do not necessarily compose all that well.
#
# @param queue_constraint_str[String] Queue string
# @return [Hash]
# How to match a given queue. It can have the following keys and values:
# - +{ all: true }+ indicates that all queues match.
Expand All @@ -49,8 +53,8 @@ def self.parse_queue_constraint(constraint_str_from_envvar)
# @example
# Gouda::QueueConstraints.queue_parser('-queue1,queue2')
# => { exclude: [ 'queue1', 'queue2' ] }
def self.queue_parser(string)
string = string.presence || "*"
def self.parse_queue_constraint(queue_constraint_str)
string = queue_constraint_str.presence || "*"

case string.first
when "-"
Expand All @@ -63,11 +67,11 @@ def self.queue_parser(string)
queues = string.split(",").map(&:strip)

if queues.include?("*")
{all: true}
AnyQueue
elsif exclude_queues
{exclude: queues}
ExceptQueueConstraint.new([queues])
else
{include: queues}
OnlyQueuesConstraint.new([queues])
end
end
end
6 changes: 6 additions & 0 deletions lib/gouda/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ def scheduler_key
[name, interval_seconds, cron, job_class].compact.join("_")
end

# Tells when this particular task should next run
#
# @return [Time]
def next_at
if interval_seconds
first_existing = Gouda::Workload.where(scheduler_key: scheduler_key).where("scheduled_at > NOW()").order("scheduled_at DESC").pluck(:scheduled_at).first
Expand All @@ -22,6 +25,9 @@ def next_at
end
end

# Builds the ActiveJob which can be enqueued for this entry
#
# @return [ActiveJob::Base]
def build_active_job
next_at = self.next_at
return unless next_at
Expand Down

0 comments on commit 722bb8e

Please sign in to comment.