Skip to content

Commit

Permalink
Do not execute removed cron workloads (#17)
Browse files Browse the repository at this point in the history
We need to check whether a workload was enqueued with a scheduler key,
but no longer is in the cron table. If that is the case (we are trying
to execute a workload which has a scheduler key, but the scheduler does
not know about that key) it means that the workload has been removed
from the cron table and must not run. Moreover: running it can be
dangerous because it was likely removed from the table for a reason.
Should that be the case, mark the job "finished" and return `nil` to get
to the next poll. If the deployed worker still has the workload in its
scheduler table, but a new deploy removed it - this is a race condition,
but we are willing to accept it.

Note that we are already "just not enqueueing" that job when the cron
table gets loaded - but it is not enough.
If our system can be in a state of partial deployment:

```
  [  release 1 does have some_job_hourly crontab entry ]
                 [  release 2 no longer does                           ]
                 ^ --- race conditions possible here --^
```

...and we remove the crontabled workloads during app boot, it does not
give us a guarantee that release 1 won't reenqueue them. For example,
via the "reinsert next scheduled" feature when the job is executing.
This is why this safeguard is needed.

This protects us from a very dangerous failure mode where we would
remove an entry from the cron table, deploy the change, and then still
have the workload run a day later.

Also, mandate Ruby 3.1 as the minimum version and pin the Standardrb syntax to that.
  • Loading branch information
julik authored Sep 4, 2024
1 parent 7dec20d commit 06e829e
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 64 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ jobs:
strategy:
matrix:
ruby:
- '2.7'
- '3.3'
- '3.1'
services:
postgres:
image: postgres
Expand Down
1 change: 1 addition & 0 deletions .standard.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ruby_version: 3.1
62 changes: 34 additions & 28 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,58 +1,64 @@
## [Unreleased]

## [0.1.0] - 2024-06-10

- Initial release
## [0.1.13] - 2024-09-03

## [0.1.1] - 2024-06-10
- Ensure we won't execute workloads which were scheduled but are no longer present in the cron table entries.

- Fix support for older ruby versions until 2.7
## [0.1.12] - 2024-07-03

## [0.1.2] - 2024-06-11
- When doing polling, suppress DEBUG-level messages. This will stop Gouda spamming the logs with SQL in dev/test environments.

- Updated readme and method renaming in Scheduler
## [0.1.11] - 2024-07-03

## [0.1.3] - 2024-06-11
- Fix: make sure the Gouda logger config does not get used during Rails initialization

- Allow the Rails app to boot even if there is no database yet
## [0.1.10] - 2024-07-03

## [0.1.4] - 2024-06-14
- Fix: remove logger overrides that Gouda should install, as this causes problems for Rails apps hosting Gouda

- Rescue NoDatabaseError at scheduler update.
- Include tests in gem, for sake of easier debugging.
- Reduce logging in local test runs.
- Bump local ruby version to 3.3.3
## [0.1.9] - 2024-06-26

## [0.1.5] - 2024-06-18
- Fix: cleanup_preserved_jobs_before in Gouda::Workload.prune now points to Gouda.config

- Update documentation
- Don't pass on scheduler keys to retries
## [0.1.8] - 2024-06-21

- Move some missed instrumentations to Gouda.instrument

## [0.1.7] - 2024-06-21

- Separate all instrumentation to use ActiveSupport::Notification

## [0.1.6] - 2024-06-18

- Fix: don't upsert workloads twice when starting Gouda.
- Add back in Appsignal calls

## [0.1.7] - 2024-06-21
## [0.1.5] - 2024-06-18

- Separate all instrumentation to use ActiveSupport::Notification
- Update documentation
- Don't pass on scheduler keys to retries

## [0.1.8] - 2024-06-21
## [0.1.4] - 2024-06-14

- Move some missed instrumentations to Gouda.instrument
- Rescue NoDatabaseError at scheduler update.
- Include tests in gem, for sake of easier debugging.
- Reduce logging in local test runs.
- Bump local ruby version to 3.3.3

## [0.1.9] - 2024-06-26
## [0.1.3] - 2024-06-11

- Fix: cleanup_preserved_jobs_before in Gouda::Workload.prune now points to Gouda.config
- Allow the Rails app to boot even if there is no database yet

## [0.1.10] - 2024-07-03
## [0.1.2] - 2024-06-11

- Fix: remove logger overrides that Gouda should install, as this causes problems for Rails apps hosting Gouda
- Updated readme and method renaming in Scheduler

## [0.1.11] - 2024-07-03
## [0.1.1] - 2024-06-10

- Fix: make sure the Gouda logger config does not get used during Rails initialization
- Fix support for older ruby versions until 2.7

## [0.1.12] - 2024-07-03
## [0.1.0] - 2024-06-10

- Initial release

- When doing polling, suppress DEBUG-level messages. This will stop Gouda spamming the logs with SQL in dev/test environments.
4 changes: 2 additions & 2 deletions gouda.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ Gem::Specification.new do |spec|
spec.email = ["sebastian@cheddar.me", "me@julik.nl"]
spec.homepage = "https://github.com/cheddar-me/gouda"
spec.license = "MIT"
spec.required_ruby_version = Gem::Requirement.new(">= 2.7.0")
spec.required_ruby_version = Gem::Requirement.new(">= 3.1.0")
spec.require_paths = ["lib"]

spec.metadata["homepage_uri"] =
spec.metadata["homepage_uri"] = spec.homepage
spec.metadata["source_code_uri"] = spec.homepage
spec.metadata["changelog_uri"] = "https://github.com/cheddar-me/gouda/CHANGELOG.md"

Expand Down
14 changes: 6 additions & 8 deletions lib/gouda.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@ def self.configure
def self.logger
# By default, return a logger that sends data nowhere. The `Rails.logger` method
# only becomes available later in the Rails lifecycle.
@fallback_gouda_logger ||= begin
ActiveSupport::Logger.new($stdout).tap do |logger|
logger.level = Logger::WARN
end
@fallback_gouda_logger ||= ActiveSupport::Logger.new($stdout).tap do |logger|
logger.level = Logger::WARN
end

# We want the Rails-configured loggers to take precedence over ours, since Gouda
Expand All @@ -81,22 +79,22 @@ def self.logger
Rails.try(:logger) || ActiveJob::Base.try(:logger) || @fallback_gouda_logger
end

def self.suppressing_sql_logs(&blk)
def self.suppressing_sql_logs(&)
# This is used for frequently-called methods that poll the DB. If logging is done at a low level (DEBUG)
# those methods print a lot of SQL into the logs, on every poll. While that is useful if
# you collect SQL queries from the logs, in most cases - especially if this is used
# in a side-thread inside Puma - the output might be quite annoying. So silence the
# logger when we poll, but just to INFO. Omitting DEBUG-level messages gets rid of the SQL.
if Gouda::Workload.logger
Gouda::Workload.logger.silence(Logger::INFO, &blk)
Gouda::Workload.logger.silence(Logger::INFO, &)
else
# In tests (and at earlier stages of the Rails boot cycle) the global ActiveRecord logger may be nil
yield
end
end

def self.instrument(channel, options, &block)
ActiveSupport::Notifications.instrument("#{channel}.gouda", options, &block)
def self.instrument(channel, options, &)
ActiveSupport::Notifications.instrument("#{channel}.gouda", options, &)
end

def self.create_tables(active_record_schema)
Expand Down
2 changes: 1 addition & 1 deletion lib/gouda/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def enqueue_all(active_jobs)
# We can't tell Postgres to ignore conflicts on _both_ the scheduler key and the enqueue concurrency key but not on
# the ID - it is either "all indexes" or "just one", but never "this index and that index". MERGE https://www.postgresql.org/docs/current/sql-merge.html
# is in theory capable of solving this but let's not complicate things all to hastily, the hour is getting late
scheduler_key = active_job.try(:executions) == 0 ? active_job.scheduler_key : nil # only enforce scheduler key on first workload
scheduler_key = (active_job.try(:executions) == 0) ? active_job.scheduler_key : nil # only enforce scheduler key on first workload
{
active_job_id: active_job.job_id, # Multiple jobs can have the same ID due to retries, job-iteration etc.
scheduled_at: active_job.scheduled_at || t_now,
Expand Down
17 changes: 15 additions & 2 deletions lib/gouda/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def kwargs_value
# @return Array[Entry]
def self.build_scheduler_entries_list!(cron_table_hash = nil)
Gouda.logger.info "Updating scheduled workload entries..."
if cron_table_hash.blank?
if cron_table_hash.nil? # An empty hash indicates that an empty crontab will be loaded
config_from_rails = Rails.application.config.try(:gouda)

cron_table_hash = if config_from_rails.present?
Expand All @@ -106,6 +106,9 @@ def self.build_scheduler_entries_list!(cron_table_hash = nil)
params_with_defaults = defaults.merge(cron_entry_params)
Entry.new(name: name, **params_with_defaults)
end
@known_scheduler_keys = Set.new(@cron_table.map(&:scheduler_key))

@cron_table
end

# Once a workload has finished (doesn't matter whether it raised an exception
Expand All @@ -132,6 +135,14 @@ def self.entries
@cron_table || []
end

# Returns the set of known scheduler keys that may be present in the workloads table and are defined
# by the current entries.
#
# @return Set[String]
def self.known_scheduler_keys
@known_scheduler_keys || Set.new
end

# Will upsert (`INSERT ... ON CONFLICT UPDATE`) workloads for all entries which are in the scheduler entries
# table (the table needs to be read or hydrated first using `build_scheduler_entries_list!`). This is done
# in a transaction. Any workloads which have been previously inserted from the scheduled entries, but no
Expand All @@ -143,9 +154,11 @@ def self.entries
def self.upsert_workloads_from_entries_list!
table_entries = @cron_table || []

# Remove any cron keyed workloads which no longer match config-wise
# Remove any cron keyed workloads which no longer match config-wise.
# We do this to keep things clean (but it is not enough, an extra guard is needed in Workload checkout)
known_keys = table_entries.map(&:scheduler_key).uniq
Gouda::Workload.transaction do
# We do this to keep things a bit clean
Gouda::Workload.where.not(scheduler_key: known_keys).delete_all

# Insert the next iteration for every "next" entry in the crontab.
Expand Down
2 changes: 1 addition & 1 deletion lib/gouda/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Gouda
VERSION = "0.1.12"
VERSION = "0.1.13"
end
40 changes: 34 additions & 6 deletions lib/gouda/workload.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ def self.checkout_and_lock_one(executing_on:, queue_constraint: Gouda::AnyQueue)
AND NOT EXISTS (
SELECT NULL
FROM #{quoted_table_name} AS concurrent
WHERE concurrent.state = 'executing'
WHERE concurrent.state = 'executing'
AND concurrent.execution_concurrency_key = workloads.execution_concurrency_key
)
AND workloads.scheduled_at <= clock_timestamp()
SQL
# Enter a txn just to mark this job as being executed "by us". This allows us to avoid any
# locks during execution itself, including advisory locks
jobs = Gouda::Workload
workloads = Gouda::Workload
.select("workloads.*")
.from("#{quoted_table_name} AS workloads")
.where(where_query)
Expand All @@ -111,13 +111,41 @@ def self.checkout_and_lock_one(executing_on:, queue_constraint: Gouda::AnyQueue)
.limit(1)

_first_available_workload = ActiveSupport::Notifications.instrument(:checkout_and_lock_one, {queue_constraint: queue_constraint.to_sql}) do |payload|
payload[:condition_sql] = jobs.to_sql
payload[:condition_sql] = workloads.to_sql
payload[:retried_checkouts_due_to_concurrent_exec] = 0
uncached do # Necessary because we SELECT with a clock_timestamp() which otherwise gets cached by ActiveRecord query cache
transaction do
job = Gouda.suppressing_sql_logs { jobs.first } # Silence SQL output as this gets called very frequently
job&.update!(state: "executing", executing_on: executing_on, last_execution_heartbeat_at: Time.now.utc, execution_started_at: Time.now.utc)
job
workload = Gouda.suppressing_sql_logs { workloads.first } # Silence SQL output as this gets called very frequently
return nil unless workload

if workload.scheduler_key && !Gouda::Scheduler.known_scheduler_keys.include?(workload.scheduler_key)
# Check whether this workload was enqueued with a scheduler key, but no longer is in the cron table.
# If that is the case (we are trying to execute a workload which has a scheduler key, but the scheduler
# does not know about that key) it means that the workload has been removed from the cron table and must not run.
# Moreover: running it can be dangerous because it was likely removed from the table for a reason.
# Should that be the case, mark the job "finished" and return `nil` to get to the next poll. If the deployed worker still has
# the workload in its scheduler table, but a new deploy removed it - this is a race condition, but we are willing to accept it.
# Note that we are already "just not enqueueing" that job when the cron table gets loaded - this already happens.
#
# Removing jobs from the queue forcibly when we load the cron table is nice, but not enough, because our system can be in a state
# of partial deployment:
#
# [ release 1 does have some_job_hourly crontab entry ]
# [ release 2 no longer does ]
# ^ --- race conditions possible here --^
#
# So even if we remove the crontabled workloads during app boot, it does not give us a guarantee that release 1 won't reinsert them.
# This is why this safeguard is needed.
error = {class_name: "WorkloadSkippedError", message: "Skipped as scheduler_key was no longer in the cron table"}
workload.update!(state: "finished", error:)
# And return nil. This will cause a brief "sleep" in the polling routine since the caller may think there are no more workloads
# in the queue, but only for a brief moment.
nil
else
# Once we have verified this job is OK to execute
workload.update!(state: "executing", executing_on: executing_on, last_execution_heartbeat_at: Time.now.utc, execution_started_at: Time.now.utc)
workload
end
rescue ActiveRecord::RecordNotUnique
# It can happen that due to a race the `execution_concurrency_key NOT IN` does not capture
# a job which _just_ entered the "executing" state, apparently after we do our SELECT. This will happen regardless
Expand Down
39 changes: 39 additions & 0 deletions test/gouda/scheduler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,45 @@ def perform
assert_equal [nil, nil], Gouda::Workload.first.serialized_params["arguments"]
end

test "ensures a job that was scheduled but no longer present in the cron table gets force-finished without executing" do
tab = {
first_hourly: {
cron: "@hourly",
class: "GoudaSchedulerTest::TestJob",
args: [nil, nil]
}
}

assert_nothing_raised do
Gouda::Scheduler.build_scheduler_entries_list!(tab)
end

Gouda::Workload.delete_all
assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
Gouda::Scheduler.upsert_workloads_from_entries_list!
end

# Update all workloads so that it is already time for it to be executed (as we use clock_timestamp()
# time travel is not possible in those tests)
Gouda::Workload.update_all(scheduled_at: Time.now - 2.minutes)

workload = Gouda::Workload.checkout_and_lock_one(executing_on: "test")
assert workload # Now this workload does get selected for execution
workload.update(state: "enqueued") # Return it to the queue

# Erase the crontab.
# No need to enqueue next jobs in this test as there would not be jobs enqueued anyway
assert_nothing_raised do
Gouda::Scheduler.build_scheduler_entries_list!({})
end

assert_nil Gouda::Workload.checkout_and_lock_one(executing_on: "test"), "The workload should not be picked for execution now"
just_finished_workload = Gouda::Workload.where(state: "finished").first!
assert_equal "finished", just_finished_workload.state
assert just_finished_workload.error
assert_match(/scheduler/, just_finished_workload.error.fetch("message"))
end

test "is able to accept a crontab" do
tab = {
first_hourly: {
Expand Down
17 changes: 3 additions & 14 deletions test/gouda/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,16 @@ def truncate_test_tables
ActiveRecord::Base.connection.execute("TRUNCATE TABLE gouda_job_fuses")
end

def test_create_tables
ActiveRecord::Base.transaction do
ActiveRecord::Base.connection.execute("DROP TABLE gouda_workloads")
ActiveRecord::Base.connection.execute("DROP TABLE gouda_job_fuses")
# The adapter has to be in a variable as the schema definition is scoped to the migrator, not self
ActiveRecord::Schema.define(version: 1) do |via_definer|
Gouda.create_tables(via_definer)
end
end
end

def subscribed_notification_for(notification)
payload = nil
subscription = ActiveSupport::Notifications.subscribe notification do |name, start, finish, id, _payload|
payload = _payload
subscription = ActiveSupport::Notifications.subscribe notification do |name, start, finish, id, local_payload|
payload = local_payload
end

yield

ActiveSupport::Notifications.unsubscribe(subscription)

return payload
payload
end
end

0 comments on commit 06e829e

Please sign in to comment.