From 6d4e8e820d30ac01fcdba9f96c13e2cdc5ec8c24 Mon Sep 17 00:00:00 2001 From: allockicmoi Date: Thu, 13 Jun 2024 21:20:03 +0100 Subject: [PATCH 1/3] Update gouda.rb --- lib/gouda.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/gouda.rb b/lib/gouda.rb index 21eb5d6..7fd4183 100644 --- a/lib/gouda.rb +++ b/lib/gouda.rb @@ -100,7 +100,7 @@ def self.create_tables(active_record_schema) active_record_schema.add_index :gouda_workloads, [:priority, :id, :scheduled_at], where: "state = 'enqueued'", name: :gouda_checkout_all_index active_record_schema.add_index :gouda_workloads, [:id, :last_execution_heartbeat_at], where: "state = 'executing'", name: :gouda_last_heartbeat_index active_record_schema.add_index :gouda_workloads, [:enqueue_concurrency_key], where: "state = 'enqueued' AND enqueue_concurrency_key IS NOT NULL", unique: true, name: :guard_double_enqueue - active_record_schema.add_index :gouda_workloads, [:scheduler_key], where: "state = 'enqueued' AND scheduler_key IS NOT NULL", unique: true, name: :guard_double_schedule + active_record_schema.add_index :gouda_workloads, [:scheduler_key], where: "state = 'enqueued' AND scheduler_key IS NOT NULL AND (serialized_params->'executions')::int = 0", unique: true, name: :guard_double_schedule active_record_schema.add_index :gouda_workloads, [:execution_concurrency_key], where: "state = 'executing' AND execution_concurrency_key IS NOT NULL", unique: true, name: :guard_double_exec active_record_schema.add_index :gouda_workloads, [:active_job_id], name: :same_job_display_idx active_record_schema.add_index :gouda_workloads, [:priority], order: {priority: "ASC NULLS LAST"}, name: :ordered_priority_idx From 125c11b01db4c762fb097b1af3ec86b4617b329a Mon Sep 17 00:00:00 2001 From: allockicmoi Date: Mon, 17 Jun 2024 11:55:22 +0100 Subject: [PATCH 2/3] only set scheduler key on initial workload for active job --- lib/gouda.rb | 2 +- lib/gouda/adapter.rb | 3 ++- test/gouda/scheduler_test.rb | 24 ++++++++++++++++++++++-- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/lib/gouda.rb b/lib/gouda.rb index 7fd4183..21eb5d6 100644 --- a/lib/gouda.rb +++ b/lib/gouda.rb @@ -100,7 +100,7 @@ def self.create_tables(active_record_schema) active_record_schema.add_index :gouda_workloads, [:priority, :id, :scheduled_at], where: "state = 'enqueued'", name: :gouda_checkout_all_index active_record_schema.add_index :gouda_workloads, [:id, :last_execution_heartbeat_at], where: "state = 'executing'", name: :gouda_last_heartbeat_index active_record_schema.add_index :gouda_workloads, [:enqueue_concurrency_key], where: "state = 'enqueued' AND enqueue_concurrency_key IS NOT NULL", unique: true, name: :guard_double_enqueue - active_record_schema.add_index :gouda_workloads, [:scheduler_key], where: "state = 'enqueued' AND scheduler_key IS NOT NULL AND (serialized_params->'executions')::int = 0", unique: true, name: :guard_double_schedule + active_record_schema.add_index :gouda_workloads, [:scheduler_key], where: "state = 'enqueued' AND scheduler_key IS NOT NULL", unique: true, name: :guard_double_schedule active_record_schema.add_index :gouda_workloads, [:execution_concurrency_key], where: "state = 'executing' AND execution_concurrency_key IS NOT NULL", unique: true, name: :guard_double_exec active_record_schema.add_index :gouda_workloads, [:active_job_id], name: :same_job_display_idx active_record_schema.add_index :gouda_workloads, [:priority], order: {priority: "ASC NULLS LAST"}, name: :ordered_priority_idx diff --git a/lib/gouda/adapter.rb b/lib/gouda/adapter.rb index 1580bef..3cdb7d7 100644 --- a/lib/gouda/adapter.rb +++ b/lib/gouda/adapter.rb @@ -57,10 +57,11 @@ def enqueue_all(active_jobs) # We can't tell Postgres to ignore conflicts on _both_ the scheduler key and the enqueue concurrency key but not on # the ID - it is either "all indexes" or "just one", but never "this index and that index". MERGE https://www.postgresql.org/docs/current/sql-merge.html # is in theory capable of solving this but let's not complicate things all to hastily, the hour is getting late + scheduler_key = active_job.try(:executions) == 0 ? active_job.scheduler_key : nil # only enforce scheduler key on first workload { active_job_id: active_job.job_id, # Multiple jobs can have the same ID due to retries, job-iteration etc. scheduled_at: active_job.scheduled_at || t_now, - scheduler_key: active_job.scheduler_key, # So that the scheduler_key gets retained between retries + scheduler_key: scheduler_key, priority: active_job.priority, execution_concurrency_key: extract_execution_concurrency_key(active_job), enqueue_concurrency_key: extract_enqueue_concurrency_key(active_job), diff --git a/test/gouda/scheduler_test.rb b/test/gouda/scheduler_test.rb index e271ed5..8f2eb5a 100644 --- a/test/gouda/scheduler_test.rb +++ b/test/gouda/scheduler_test.rb @@ -24,8 +24,6 @@ class FailingJob < ActiveJob::Base class MegaError < StandardError end - gouda_control_concurrency_with(enqueue_limit: 1, key: -> { self.class.to_s }) - retry_on StandardError, wait: :polynomially_longer, attempts: 5 retry_on Gouda::InterruptError, wait: 0, attempts: 5 retry_on MegaError, attempts: 3, wait: 0 @@ -55,6 +53,28 @@ def perform assert Gouda::Workload.count > 3 end + test "retries do not have a scheduler_key" do + tab = { + second_minutely: { + cron: "*/1 * * * * *", # every second + class: "GoudaSchedulerTest::FailingJob" + } + } + + assert_nothing_raised do + Gouda::Scheduler.build_scheduler_entries_list!(tab) + Gouda::Scheduler.upsert_workloads_from_entries_list! + end + + assert_equal 1, Gouda::Workload.enqueued.count + assert_equal "second_minutely_*/1 * * * * *_GoudaSchedulerTest::FailingJob", Gouda::Workload.enqueued.first.scheduler_key + sleep(2) + Gouda::Workload.checkout_and_perform_one(executing_on: "Unit test") + + assert Gouda::Workload.retried.reload.count == 1 + assert_nil Gouda::Workload.retried.first.scheduler_key + end + test "re-inserts the next subsequent job after executing the queued one" do tab = { second_minutely: { From fa192c3ba9099ceb1e1577b327d07fa15e2752ec Mon Sep 17 00:00:00 2001 From: allockicmoi Date: Mon, 17 Jun 2024 12:36:54 +0100 Subject: [PATCH 3/3] Update scheduler_test.rb --- test/gouda/scheduler_test.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/gouda/scheduler_test.rb b/test/gouda/scheduler_test.rb index 8f2eb5a..3e87aaf 100644 --- a/test/gouda/scheduler_test.rb +++ b/test/gouda/scheduler_test.rb @@ -71,8 +71,9 @@ def perform sleep(2) Gouda::Workload.checkout_and_perform_one(executing_on: "Unit test") - assert Gouda::Workload.retried.reload.count == 1 + assert_equal 1, Gouda::Workload.retried.reload.count assert_nil Gouda::Workload.retried.first.scheduler_key + assert_equal "enqueued", Gouda::Workload.retried.first.state end test "re-inserts the next subsequent job after executing the queued one" do