Skip to content

Commit

Permalink
Add bulk enqueue support to ActiveJob adapter
Browse files Browse the repository at this point in the history
When using ActiveJob, you can now enqueue jobs in bulk using either
`Que.bulk_enqueue`:
```ruby
Que.bulk_enqueue do
    TestJobClass.perform_later(1, 2)
    OtherTestJobClass.set(wait: 1.minute, queue: 'custom', priority: 200).perform_later(3, 4)
end
```

Or when running Rails >= 7.1 using `ActiveJob.perform_all_later`:
```ruby
ActiveJob.perform_all_later([
    TestJobClass.new(1, 2),
    OtherTestJobClass.new(3, 4).set(wait: 1.minute, queue: 'custom', priority: 200),
])
```
  • Loading branch information
dtcristo committed Mar 6, 2024
1 parent 4835936 commit 691dd68
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 26 deletions.
1 change: 0 additions & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,6 @@ The jobs are only actually enqueued at the end of the block, at which point they

Limitations:

- ActiveJob is not supported
- The `que_attrs` of a job instance returned from `.enqueue` is empty (`{}`)
- The notify trigger is not run by default, so jobs will only be picked up by a worker upon its next poll

Expand Down
25 changes: 19 additions & 6 deletions lib/que/active_job/extensions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,32 @@ module QueueAdapters

class QueAdapter
def enqueue(job)
job_options = { priority: job.priority, queue: job.queue_name }
que_job = JobWrapper.enqueue job.serialize, **job_options
job.provider_job_id = que_job.attrs["job_id"]
que_job
enqueue_at(job, nil)
end

def enqueue_at(job, timestamp)
job_options = { priority: job.priority, queue: job.queue_name, run_at: Time.at(timestamp) }
job_options = {
priority: job.priority,
queue: job.queue_name,
run_at: timestamp && Time.at(timestamp)
}
que_job = JobWrapper.enqueue job.serialize, **job_options
job.provider_job_id = que_job.attrs["job_id"]
job.provider_job_id = que_job.que_attrs[:id]
que_job
end

def enqueue_all(jobs)
que_jobs = Que.bulk_enqueue do
jobs.each do |job|
enqueue_at(job, job.scheduled_at)
end
end
que_jobs.zip(jobs).each do |que_job, job|
job.provider_job_id = que_job.que_attrs[:id]
end
que_jobs
end

private

class JobWrapper < Que::Job
Expand Down
4 changes: 0 additions & 4 deletions lib/que/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ def enqueue(*args)
klass: self,
}

if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
raise Que::Error, "Que.bulk_enqueue does not support ActiveJob."
end

Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs
return new({})
end
Expand Down
101 changes: 86 additions & 15 deletions spec/que/active_job/extensions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@
require 'spec_helper'

if defined?(::ActiveJob)
describe "running jobs via ActiveJob" do
describe "enqueueing/running jobs via ActiveJob" do
before do
class TestJobClass < ActiveJob::Base
def perform(*args, **kwargs)
$args = args
$kwargs = kwargs
end
end
class OtherTestJobClass < ActiveJob::Base; end
end

after do
Object.send :remove_const, :TestJobClass
$args = nil
$kwargs = nil
Object.send :remove_const, :OtherTestJobClass
end

def execute(&perform_later_block)
Expand Down Expand Up @@ -173,27 +175,96 @@ def perform(*args)
end
end

describe 'with bulk_enqueue' do
describe 'ActiveJobClass.perform_later' do
it "is not supported" do
assert_raises_with_message(
Que::Error,
/Que\.bulk_enqueue does not support ActiveJob\./
) do
Que.bulk_enqueue { TestJobClass.perform_later(1, 2) }
def assert_enqueue_active_job(
expected_queues:,
expected_priorities:,
expected_run_ats:,
expected_active_job_classes:,
expected_active_job_arguments:,
expected_count:,
assert_results:,
&enqueue_block
)

assert_equal 0, jobs_dataset.count

results = enqueue_block.call

assert_equal expected_count, jobs_dataset.count

if assert_results
results.each_with_index do |result, i|
assert_kind_of Que::Job, result
assert_instance_of ActiveJob::QueueAdapters::QueAdapter::JobWrapper, result

assert_equal expected_priorities[i], result.que_attrs[:priority]
assert_equal expected_active_job_classes[i], result.que_attrs[:args].first[:job_class]
assert_equal expected_active_job_arguments[i], result.que_attrs[:args].first[:arguments]
assert_equal ({}), result.que_attrs[:kwargs]
assert_equal ({}), result.que_attrs[:data]
end
end

jobs_dataset.order(:id).each_with_index do |job, i|
assert_equal expected_queues[i], job[:queue]
assert_equal expected_priorities[i], job[:priority]
assert_equal expected_active_job_classes[i], job[:args].first[:job_class]
assert_equal expected_active_job_arguments[i], job[:args].first[:arguments]
assert_in_delta job[:run_at], expected_run_ats[i], QueSpec::TIME_SKEW
assert_equal 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper', job[:job_class]
assert_equal ({}), job[:kwargs]
assert_equal ({}), job[:data]
end

jobs_dataset.delete
end

describe "with Que.bulk_enqueue" do
it "should be able to queue multiple jobs with different classes and job options" do
assert_enqueue_active_job(
expected_count: 2,
expected_queues: ['default', 'custom'],
expected_priorities: [100, 200],
expected_run_ats: [Time.now, Time.now + 60],
expected_active_job_classes: ['TestJobClass', 'OtherTestJobClass'],
expected_active_job_arguments: [[1, 2], [3, 4]],
assert_results: true,
) do
Que.bulk_enqueue do
TestJobClass.perform_later(1, 2)
OtherTestJobClass.set(wait: 1.minute, queue: 'custom', priority: 200).perform_later(3, 4)
end
end
end
end

describe 'active_job#enqueue' do
it "is not supported" do
assert_raises_with_message(
Que::Error,
/Que\.bulk_enqueue does not support ActiveJob\./
if ActiveJob.gem_version >= Gem::Version.new('7.1')
describe "with ActiveJob.perform_all_later" do
it "should be able to queue multiple jobs with different classes and job options" do
assert_enqueue_active_job(
expected_count: 2,
expected_queues: ['default', 'custom'],
expected_priorities: [100, 200],
expected_run_ats: [Time.now, Time.now + 60],
expected_active_job_classes: ['TestJobClass', 'OtherTestJobClass'],
expected_active_job_arguments: [[1, 2], [3, 4]],
assert_results: false,
) do
Que.bulk_enqueue { TestJobClass.new.enqueue }
ActiveJob.perform_all_later([
TestJobClass.new(1, 2),
OtherTestJobClass.new(3, 4).set(wait: 1.minute, queue: 'custom', priority: 200),
])
end
end

it "should set provider_job_id on job instances" do
jobs = [TestJobClass.new, TestJobClass.new]
ActiveJob.perform_all_later(jobs)
que_jobs = jobs_dataset.order(:id).to_a

assert_equal jobs[0].provider_job_id, que_jobs[0][:id]
assert_equal jobs[1].provider_job_id, que_jobs[1][:id]
end
end
end
end
Expand Down

0 comments on commit 691dd68

Please sign in to comment.