Skip to content

Commit

Permalink
Retrieve replica pool without checking out a connection (#7)
Browse files Browse the repository at this point in the history
* Retrieve replica pool without checking out a connection

* Fix tests

* Fix Rubocop offenses

* Update changelog
  • Loading branch information
mateuscruz authored Dec 24, 2024
1 parent f4d8188 commit 6470ef5
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## [Unreleased]

- Fix replica connection pool getter when database configurations have multiple replicas
- Retrieve replica pool without checking out a connection

## [0.1.2] - 2024-12-16

Expand Down
36 changes: 20 additions & 16 deletions lib/active_record_proxy_adapters/primary_replica_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@ def initialize(primary_connection)

attr_reader :primary_connection, :last_write_at, :active_record_context

delegate :connected_to_stack, to: :connection_class
delegate :connection_handler, :connected_to_stack, to: :connection_class
delegate :reading_role, :writing_role, to: :active_record_context

def replica_pool_unavailable?
!replica_pool
end

def replica_pool
connection_class.connected_to(role: reading_role) { connection_class.connection_pool }
connection_handler.retrieve_connection_pool(connection_class.name, role: reading_role)
end

def connection_class
Expand Down Expand Up @@ -107,21 +111,12 @@ def top_of_connection_stack_role
[reading_role, writing_role].include?(role) ? role : nil
end

def connection_for(role, sql_string) # rubocop:disable Metrics/MethodLength
connection = if role == writing_role
primary_connection
else
begin
replica_pool.checkout(checkout_timeout)
# rescue NoDatabaseError to avoid crashing when running db:create rake task
# rescue ConnectionNotEstablished to handle connectivity issues in the replica
# (for example, replication delay)
rescue ActiveRecord::NoDatabaseError, ActiveRecord::ConnectionNotEstablished
primary_connection
end
end
def connection_for(role, sql_string)
connection = primary_connection if role == writing_role || replica_pool_unavailable?
connection ||= checkout_replica_connection

result = yield(connection)

update_primary_latest_write_timestamp if !replica_connection?(connection) && write_statement?(sql_string)

result
Expand All @@ -130,7 +125,16 @@ def connection_for(role, sql_string) # rubocop:disable Metrics/MethodLength
end

def replica_connection?(connection)
connection != primary_connection
connection && connection != primary_connection
end

def checkout_replica_connection
replica_pool.checkout(checkout_timeout)
# rescue NoDatabaseError to avoid crashing when running db:create rake task
# rescue ConnectionNotEstablished to handle connectivity issues in the replica
# (for example, replication delay)
rescue ActiveRecord::NoDatabaseError, ActiveRecord::ConnectionNotEstablished
primary_connection
end

# @return [TrueClass] if there has been a write within the last {#proxy_delay} seconds
Expand Down
10 changes: 8 additions & 2 deletions spec/active_record/tasks/postgresql_proxy_database_tasks_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ def database_exists?

def schema_loaded?
proc do
any_tables = TestHelper::PostgreSQLDatabaseTaskRecord.connection.tables.any?
TestHelper::PostgreSQLDatabaseTaskRecord.connection_pool.disconnect!
pool = ActiveRecord::Base.connection_handler.retrieve_connection_pool(
TestHelper::PostgreSQLDatabaseTaskRecord.name, role: TestHelper.writing_role
)
any_tables = TestHelper::PostgreSQLDatabaseTaskRecord.connected_to(role: TestHelper.writing_role) do
TestHelper::PostgreSQLDatabaseTaskRecord.connection.tables.any?
end

pool.disconnect!

any_tables
end
Expand Down
9 changes: 6 additions & 3 deletions spec/active_record_proxy_adapters/postgresql_proxy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ def create_dummy_user
SQL
end

# rubocop:disable RSpec/MultipleMemoizedHelpers
shared_examples_for "a_proxied_method" do |method_name|
subject(:run_test) { proxy.public_send(method_name, sql) }

let(:proxy) { described_class.new(primary_adapter) }
let(:read_only_error_class) { ActiveRecord::ReadOnlyError }
let(:model_class) { TestHelper::PostgreSQLRecord }

context "when query is a select statement" do
let(:sql) { "SELECT * from users" }
Expand Down Expand Up @@ -74,15 +76,15 @@ def create_dummy_user
it "reroutes query to the primary" do
allow(primary_adapter).to receive(:"#{method_name}_unproxied").and_call_original

ActiveRecord::Base.connected_to(role: TestHelper.writing_role) { run_test }
model_class.connected_to(role: TestHelper.writing_role) { run_test }

expect(primary_adapter).to have_received(:"#{method_name}_unproxied").with(sql, any_args).once
end

it "does not checkout a connection from the replica pool" do
allow(replica_pool).to receive(:checkout).and_call_original

ActiveRecord::Base.connected_to(role: TestHelper.writing_role) { run_test }
model_class.connected_to(role: TestHelper.writing_role) { run_test }

expect(replica_pool).not_to have_received(:checkout)
end
Expand All @@ -109,7 +111,7 @@ def create_dummy_user
context "when sticking to replica" do
it "raises database error" do
expect do
ActiveRecord::Base.connected_to(role: TestHelper.reading_role) { run_test }
model_class.connected_to(role: TestHelper.reading_role) { run_test }
end.to raise_error(read_only_error_class)
end
end
Expand Down Expand Up @@ -153,6 +155,7 @@ def create_dummy_user
end
end
end
# rubocop:enable RSpec/MultipleMemoizedHelpers

describe "#execute" do
it_behaves_like "a_proxied_method", :execute do
Expand Down

0 comments on commit 6470ef5

Please sign in to comment.