Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieve replica pool without checking out a connection #7

Merged
merged 4 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## [Unreleased]

- Fix replica connection pool getter when database configurations have multiple replicas
- Retrieve replica pool without checking out a connection

## [0.1.2] - 2024-12-16

Expand Down
36 changes: 20 additions & 16 deletions lib/active_record_proxy_adapters/primary_replica_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@ def initialize(primary_connection)

attr_reader :primary_connection, :last_write_at, :active_record_context

delegate :connected_to_stack, to: :connection_class
delegate :connection_handler, :connected_to_stack, to: :connection_class
delegate :reading_role, :writing_role, to: :active_record_context

def replica_pool_unavailable?
!replica_pool
end

def replica_pool
connection_class.connected_to(role: reading_role) { connection_class.connection_pool }
connection_handler.retrieve_connection_pool(connection_class.name, role: reading_role)
end

def connection_class
Expand Down Expand Up @@ -107,21 +111,12 @@ def top_of_connection_stack_role
[reading_role, writing_role].include?(role) ? role : nil
end

def connection_for(role, sql_string) # rubocop:disable Metrics/MethodLength
connection = if role == writing_role
primary_connection
else
begin
replica_pool.checkout(checkout_timeout)
# rescue NoDatabaseError to avoid crashing when running db:create rake task
# rescue ConnectionNotEstablished to handle connectivity issues in the replica
# (for example, replication delay)
rescue ActiveRecord::NoDatabaseError, ActiveRecord::ConnectionNotEstablished
primary_connection
end
end
def connection_for(role, sql_string)
connection = primary_connection if role == writing_role || replica_pool_unavailable?
connection ||= checkout_replica_connection

result = yield(connection)

update_primary_latest_write_timestamp if !replica_connection?(connection) && write_statement?(sql_string)

result
Expand All @@ -130,7 +125,16 @@ def connection_for(role, sql_string) # rubocop:disable Metrics/MethodLength
end

def replica_connection?(connection)
connection != primary_connection
connection && connection != primary_connection
end

def checkout_replica_connection
replica_pool.checkout(checkout_timeout)
# rescue NoDatabaseError to avoid crashing when running db:create rake task
# rescue ConnectionNotEstablished to handle connectivity issues in the replica
# (for example, replication delay)
rescue ActiveRecord::NoDatabaseError, ActiveRecord::ConnectionNotEstablished
primary_connection
end

# @return [TrueClass] if there has been a write within the last {#proxy_delay} seconds
Expand Down
10 changes: 8 additions & 2 deletions spec/active_record/tasks/postgresql_proxy_database_tasks_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ def database_exists?

def schema_loaded?
proc do
any_tables = TestHelper::PostgreSQLDatabaseTaskRecord.connection.tables.any?
TestHelper::PostgreSQLDatabaseTaskRecord.connection_pool.disconnect!
pool = ActiveRecord::Base.connection_handler.retrieve_connection_pool(
TestHelper::PostgreSQLDatabaseTaskRecord.name, role: TestHelper.writing_role
)
any_tables = TestHelper::PostgreSQLDatabaseTaskRecord.connected_to(role: TestHelper.writing_role) do
TestHelper::PostgreSQLDatabaseTaskRecord.connection.tables.any?
end

pool.disconnect!

any_tables
end
Expand Down
9 changes: 6 additions & 3 deletions spec/active_record_proxy_adapters/postgresql_proxy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ def create_dummy_user
SQL
end

# rubocop:disable RSpec/MultipleMemoizedHelpers
shared_examples_for "a_proxied_method" do |method_name|
subject(:run_test) { proxy.public_send(method_name, sql) }

let(:proxy) { described_class.new(primary_adapter) }
let(:read_only_error_class) { ActiveRecord::ReadOnlyError }
let(:model_class) { TestHelper::PostgreSQLRecord }

context "when query is a select statement" do
let(:sql) { "SELECT * from users" }
Expand Down Expand Up @@ -74,15 +76,15 @@ def create_dummy_user
it "reroutes query to the primary" do
allow(primary_adapter).to receive(:"#{method_name}_unproxied").and_call_original

ActiveRecord::Base.connected_to(role: TestHelper.writing_role) { run_test }
model_class.connected_to(role: TestHelper.writing_role) { run_test }

expect(primary_adapter).to have_received(:"#{method_name}_unproxied").with(sql, any_args).once
end

it "does not checkout a connection from the replica pool" do
allow(replica_pool).to receive(:checkout).and_call_original

ActiveRecord::Base.connected_to(role: TestHelper.writing_role) { run_test }
model_class.connected_to(role: TestHelper.writing_role) { run_test }

expect(replica_pool).not_to have_received(:checkout)
end
Expand All @@ -109,7 +111,7 @@ def create_dummy_user
context "when sticking to replica" do
it "raises database error" do
expect do
ActiveRecord::Base.connected_to(role: TestHelper.reading_role) { run_test }
model_class.connected_to(role: TestHelper.reading_role) { run_test }
end.to raise_error(read_only_error_class)
end
end
Expand Down Expand Up @@ -153,6 +155,7 @@ def create_dummy_user
end
end
end
# rubocop:enable RSpec/MultipleMemoizedHelpers

describe "#execute" do
it_behaves_like "a_proxied_method", :execute do
Expand Down
Loading