-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Migrate to Solid Queue 0.8.x and extract tables
With Solid Queue 0.8.0, support for dedicated databases were introduced. In fact, Rails recommends moving the database, which we perform with the migration. Unfortunately, it is currently unclear how exactly schema migrations will be provided. Therefore, we cannot use the same GitHub actions workflow for now and will re-add it once a new migration has been published.
- Loading branch information
Showing
14 changed files
with
382 additions
and
158 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
93 changes: 93 additions & 0 deletions
93
db/migrate/20240907100647_move_solid_queue_to_dedicated_database.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
# frozen_string_literal: true | ||
|
||
require_relative '../scripts/copy_data' | ||
|
||
class MoveSolidQueueToDedicatedDatabase < ActiveRecord::Migration[7.1] | ||
include CopyData | ||
include ActiveRecord::Tasks | ||
|
||
class Queue < ApplicationRecord | ||
self.abstract_class = true | ||
connects_to database: {writing: :queue} | ||
end | ||
|
||
def up | ||
create_database(:queue) | ||
load_queue_schema(database: :queue) | ||
copy_data(source_connection: connection, target_connection: Queue.connection, operation: :matches, condition: 'solid_queue_%') | ||
drop_queue_tables(connection:) | ||
ensure | ||
Queue.connection&.disconnect! | ||
end | ||
|
||
def down | ||
load_queue_schema(database: :primary) | ||
copy_data(source_connection: Queue.connection, target_connection: connection, operation: :matches, condition: 'solid_queue_%') | ||
drop_queue_tables(connection: Queue.connection) | ||
rescue StandardError | ||
Queue.connection&.disconnect! | ||
else | ||
Queue.connection&.disconnect! | ||
drop_database(:queue) | ||
end | ||
|
||
def foreign_key_targets | ||
%w[ | ||
solid_queue_jobs | ||
] | ||
end | ||
|
||
private | ||
|
||
def load_queue_schema(database:) | ||
with_connection(database:) do | ||
DatabaseTasks.load_schema(configs_for(:queue), ActiveRecord.schema_format, 'db/queue_schema.rb') | ||
end | ||
end | ||
|
||
def drop_queue_tables(connection:) | ||
connection.drop_table :solid_queue_semaphores | ||
connection.drop_table :solid_queue_scheduled_executions | ||
connection.drop_table :solid_queue_recurring_tasks | ||
connection.drop_table :solid_queue_recurring_executions | ||
connection.drop_table :solid_queue_ready_executions | ||
connection.drop_table :solid_queue_processes | ||
connection.drop_table :solid_queue_pauses | ||
connection.drop_table :solid_queue_failed_executions | ||
connection.drop_table :solid_queue_claimed_executions | ||
connection.drop_table :solid_queue_blocked_executions | ||
connection.drop_table :solid_queue_jobs | ||
end | ||
|
||
def configs_for(name) | ||
ActiveRecord::Base.configurations.configs_for(name: name.to_s, env_name: Rails.env) | ||
end | ||
|
||
def create_database(name) | ||
database_name = configs_for(name).database | ||
new_primary_connection.create_database(database_name) | ||
rescue ActiveRecord::DatabaseAlreadyExists | ||
# Database already exists, do nothing | ||
end | ||
|
||
def drop_database(name) | ||
database_name = configs_for(name).database | ||
new_primary_connection.drop_database(database_name) | ||
end | ||
|
||
def new_primary_connection | ||
ActiveRecord::ConnectionAdapters::PostgreSQLAdapter.new(configs_for(:primary).configuration_hash) | ||
end | ||
|
||
def with_connection(database:) | ||
# We must not overwrite the `connection`, which is automatically overwritten by establishing a new connection. | ||
# However, we need to specify another connection, i.e. for loading the schema to the desired database. | ||
# Hence, we use this monkey patching workaround to change the connection temporary and then revert back. | ||
klass = database == :queue ? Queue : ApplicationRecord | ||
DatabaseTasks.alias_method :previous_migration_class, :migration_class | ||
DatabaseTasks.define_method(:migration_class) { klass } | ||
yield | ||
ensure | ||
DatabaseTasks.alias_method :migration_class, :previous_migration_class | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
# This file is auto-generated from the current state of the database. Instead | ||
# of editing this file, please use the migrations feature of Active Record to | ||
# incrementally modify your database, and then regenerate this schema definition. | ||
# | ||
# This file is the source Rails uses to define your schema when running `bin/rails | ||
# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to | ||
# be faster and is potentially less error prone than running all of your | ||
# migrations from scratch. Old migrations may fail to apply correctly if those | ||
# migrations use external dependencies or application code. | ||
# | ||
# It's strongly recommended that you check this file into your version control system. | ||
|
||
ActiveRecord::Schema[7.1].define(version: 2024_09_04_193154) do | ||
# These are extensions that must be enabled in order to support this database | ||
enable_extension "plpgsql" | ||
|
||
create_table "solid_queue_blocked_executions", force: :cascade do |t| | ||
t.bigint "job_id", null: false | ||
t.string "queue_name", null: false | ||
t.integer "priority", default: 0, null: false | ||
t.string "concurrency_key", null: false | ||
t.datetime "expires_at", null: false | ||
t.datetime "created_at", null: false | ||
t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release" | ||
t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance" | ||
t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true | ||
end | ||
|
||
create_table "solid_queue_claimed_executions", force: :cascade do |t| | ||
t.bigint "job_id", null: false | ||
t.bigint "process_id" | ||
t.datetime "created_at", null: false | ||
t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true | ||
t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" | ||
end | ||
|
||
create_table "solid_queue_failed_executions", force: :cascade do |t| | ||
t.bigint "job_id", null: false | ||
t.text "error" | ||
t.datetime "created_at", null: false | ||
t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true | ||
end | ||
|
||
create_table "solid_queue_jobs", force: :cascade do |t| | ||
t.string "queue_name", null: false | ||
t.string "class_name", null: false | ||
t.text "arguments" | ||
t.integer "priority", default: 0, null: false | ||
t.string "active_job_id" | ||
t.datetime "scheduled_at" | ||
t.datetime "finished_at" | ||
t.string "concurrency_key" | ||
t.datetime "created_at", null: false | ||
t.datetime "updated_at", null: false | ||
t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" | ||
t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" | ||
t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at" | ||
t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering" | ||
t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting" | ||
end | ||
|
||
create_table "solid_queue_pauses", force: :cascade do |t| | ||
t.string "queue_name", null: false | ||
t.datetime "created_at", null: false | ||
t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true | ||
end | ||
|
||
create_table "solid_queue_processes", force: :cascade do |t| | ||
t.string "kind", null: false | ||
t.datetime "last_heartbeat_at", null: false | ||
t.bigint "supervisor_id" | ||
t.integer "pid", null: false | ||
t.string "hostname" | ||
t.text "metadata" | ||
t.datetime "created_at", null: false | ||
t.string "name", null: false | ||
t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" | ||
t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true | ||
t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" | ||
end | ||
|
||
create_table "solid_queue_ready_executions", force: :cascade do |t| | ||
t.bigint "job_id", null: false | ||
t.string "queue_name", null: false | ||
t.integer "priority", default: 0, null: false | ||
t.datetime "created_at", null: false | ||
t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true | ||
t.index ["priority", "job_id"], name: "index_solid_queue_poll_all" | ||
t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue" | ||
end | ||
|
||
create_table "solid_queue_recurring_executions", force: :cascade do |t| | ||
t.bigint "job_id", null: false | ||
t.string "task_key", null: false | ||
t.datetime "run_at", null: false | ||
t.datetime "created_at", null: false | ||
t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true | ||
t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true | ||
end | ||
|
||
create_table "solid_queue_recurring_tasks", force: :cascade do |t| | ||
t.string "key", null: false | ||
t.string "schedule", null: false | ||
t.string "command", limit: 2048 | ||
t.string "class_name" | ||
t.text "arguments" | ||
t.string "queue_name" | ||
t.integer "priority", default: 0 | ||
t.boolean "static", default: true, null: false | ||
t.text "description" | ||
t.datetime "created_at", null: false | ||
t.datetime "updated_at", null: false | ||
t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true | ||
t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static" | ||
end | ||
|
||
create_table "solid_queue_scheduled_executions", force: :cascade do |t| | ||
t.bigint "job_id", null: false | ||
t.string "queue_name", null: false | ||
t.integer "priority", default: 0, null: false | ||
t.datetime "scheduled_at", null: false | ||
t.datetime "created_at", null: false | ||
t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true | ||
t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all" | ||
end | ||
|
||
create_table "solid_queue_semaphores", force: :cascade do |t| | ||
t.string "key", null: false | ||
t.integer "value", default: 1, null: false | ||
t.datetime "expires_at", null: false | ||
t.datetime "created_at", null: false | ||
t.datetime "updated_at", null: false | ||
t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" | ||
t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value" | ||
t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true | ||
end | ||
|
||
add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade | ||
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade | ||
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade | ||
add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade | ||
add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade | ||
add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade | ||
end |
Oops, something went wrong.