From 722bb8e6aa2f32e7fe4573e189ec02a022a79392 Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Tue, 18 Jun 2024 17:44:41 +0200 Subject: [PATCH] Improve documentation a bit (#10) --- .../queue_adapters/gouda_adapter.rb | 12 +++-- lib/gouda/bulk.rb | 16 +++++++ lib/gouda/queue_constraints.rb | 46 ++++++++++--------- lib/gouda/scheduler.rb | 6 +++ 4 files changed, 54 insertions(+), 26 deletions(-) diff --git a/lib/active_job/queue_adapters/gouda_adapter.rb b/lib/active_job/queue_adapters/gouda_adapter.rb index 08ee163..adc6f28 100644 --- a/lib/active_job/queue_adapters/gouda_adapter.rb +++ b/lib/active_job/queue_adapters/gouda_adapter.rb @@ -1,8 +1,10 @@ # frozen_string_literal: true -module ActiveJob # :nodoc: - module QueueAdapters # :nodoc: - class GoudaAdapter < Gouda::Adapter - end - end +# The sole purpose of this class is so that you can do +# `config.active_job.queue_adapter = :gouda` in your Rails +# config, as Rails insists on resolving the adapter module +# name from the symbol automatically. If Rails ever allows +# us to "register" an adapter to a symbol this module can +# be removed later. +class ActiveJob::QueueAdapters::GoudaAdapter < Gouda::Adapter end diff --git a/lib/gouda/bulk.rb b/lib/gouda/bulk.rb index de44280..e5973d2 100644 --- a/lib/gouda/bulk.rb +++ b/lib/gouda/bulk.rb @@ -1,6 +1,22 @@ # frozen_string_literal: true module Gouda + # Inside this call, all `perform_later` calls on ActiveJob subclasses + # (including mailers) will be buffered. The call is reentrant, so you + # can have multiple `in_bulk` calls with arbitrary nesting. At the end + # of the block, the buffered jobs will be enqueued using their respective + # adapters. If an adapter supports `enqueue_all` (Sidekiq does in recent + # releases of Rails, for example), this functionality will be used. This + # method is especially useful when doing things such as mass-emails, or + # maintenance tasks where a large swath of jobs gets enqueued at once. + # + # @example + # Gouda.in_bulk do + # User.recently_joined.find_each do |recently_joined_user| + # GreetingJob.perform_later(recently_joined_user) + # end + # end + # @return [Object] the return value of the block def self.in_bulk(&blk) if Thread.current[:gouda_bulk_buffer].nil? Thread.current[:gouda_bulk_buffer] = [] diff --git a/lib/gouda/queue_constraints.rb b/lib/gouda/queue_constraints.rb index f6ec78c..2debb10 100644 --- a/lib/gouda/queue_constraints.rb +++ b/lib/gouda/queue_constraints.rb @@ -1,44 +1,48 @@ # frozen_string_literal: true module Gouda + # A queue constraint supports just one method, `to_sql`, which returns + # a condition based on the `queue_name` value of the `gouda_workloads` + # table. The minimal constraint is just a no-op - it allows execution + # of jobs from all queues in the system. module AnyQueue def self.to_sql "1=1" end end + # Allows execution of jobs only from specified queues + # For example, if you have a queue named "gpu", and you run + # jobs requiring a GPU on this queue, on your worker script + # running on GPU-equipped machines you could use + # `OnlyQueuesConstraint.new([:gpu])` class OnlyQueuesConstraint < Struct.new(:queue_names) def to_sql placeholders = (["?"] * queue_names.length).join(",") - ActiveRecord::Base.sanitize_sql_array([<<~SQL, *queue_names]) + Gouda::Workload.sanitize_sql_array([<<~SQL, *queue_names]) queue_name IN (#{placeholders}) SQL end end + # Allows execution of jobs from queues except the given ones + # For example, if you have a queue named "emails" which is time-critical, + # on all other machines your worker script can specify + # `ExceptQueueConstraint.new([:emails])` class ExceptQueueConstraint < Struct.new(:queue_names) def to_sql placeholders = (["?"] * queue_names.length).join(",") - ActiveRecord::Base.sanitize_sql_array([<<~SQL, *queue_names]) + Gouda::Workload.sanitize_sql_array([<<~SQL, *queue_names]) queue_name NOT IN (#{placeholders}) SQL end end - def self.parse_queue_constraint(constraint_str_from_envvar) - parsed = queue_parser(constraint_str_from_envvar) - if parsed[:include] - OnlyQueuesConstraint.new(parsed[:include]) - elsif parsed[:exclude] - ExceptQueueConstraint.new(parsed[:exclude]) - else - AnyQueue - end - end - - # Parse a string representing a group of queues into a more readable data - # structure. - # @param string [String] Queue string + # Parse a string representing a group of queues into a queue constraint + # Note that this works similar to good_job. For example, the + # constraints do not necessarily compose all that well. + # + # @param queue_constraint_str[String] Queue string # @return [Hash] # How to match a given queue. It can have the following keys and values: # - +{ all: true }+ indicates that all queues match. @@ -49,8 +53,8 @@ def self.parse_queue_constraint(constraint_str_from_envvar) # @example # Gouda::QueueConstraints.queue_parser('-queue1,queue2') # => { exclude: [ 'queue1', 'queue2' ] } - def self.queue_parser(string) - string = string.presence || "*" + def self.parse_queue_constraint(queue_constraint_str) + string = queue_constraint_str.presence || "*" case string.first when "-" @@ -63,11 +67,11 @@ def self.queue_parser(string) queues = string.split(",").map(&:strip) if queues.include?("*") - {all: true} + AnyQueue elsif exclude_queues - {exclude: queues} + ExceptQueueConstraint.new([queues]) else - {include: queues} + OnlyQueuesConstraint.new([queues]) end end end diff --git a/lib/gouda/scheduler.rb b/lib/gouda/scheduler.rb index 8da23c6..e56b96f 100644 --- a/lib/gouda/scheduler.rb +++ b/lib/gouda/scheduler.rb @@ -11,6 +11,9 @@ def scheduler_key [name, interval_seconds, cron, job_class].compact.join("_") end + # Tells when this particular task should next run + # + # @return [Time] def next_at if interval_seconds first_existing = Gouda::Workload.where(scheduler_key: scheduler_key).where("scheduled_at > NOW()").order("scheduled_at DESC").pluck(:scheduled_at).first @@ -22,6 +25,9 @@ def next_at end end + # Builds the ActiveJob which can be enqueued for this entry + # + # @return [ActiveJob::Base] def build_active_job next_at = self.next_at return unless next_at