-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add temporarily_release_connection Database extension for multithread…
…ed transactional testing This allows one thread to start a transaction, and then release the connection back for usage by the connection pool, so that other threads can operate on the connection object safely inside the transaction. This requires the connection pool be limited to a single connection, to ensure that the released connection can be reacquired. It's not perfect, because if the connection is disconnected and removed from the pool while temporarily released, there is no way to handle that situation correctly.
- Loading branch information
1 parent
1549236
commit f372eeb
Showing
5 changed files
with
322 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
178 changes: 178 additions & 0 deletions
178
lib/sequel/extensions/temporarily_release_connection.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
# frozen-string-literal: true | ||
# | ||
# The temporarily_release_connection extension adds support for temporarily | ||
# releasing a checked out connection back to the connection pool. It is | ||
# designed for use in multithreaded transactional integration tests, allowing | ||
# a connection to start a transaction in one thread, but be temporarily | ||
# released back to the connection pool, so it can be operated on safely | ||
# by multiple threads inside a block. For example, the main thread could be | ||
# running tests that send web requests, and a separate thread running a web | ||
# server that is responding to those requests, and the same connection and | ||
# transaction would be used for both. | ||
# | ||
# To load the extension into the database: | ||
# | ||
# DB.extension :temporarily_release_connection | ||
# | ||
# After the extension is loaded, call the +temporarily_release_connection+ | ||
# method with the connection object to temporarily release the connection | ||
# back to the pool. Example: | ||
# | ||
# DB.transaction(rollback: :always, auto_savepoint: true) do |conn| | ||
# DB.temporarily_release_connection(conn) do | ||
# # Other threads can operate on connection safely inside the transaction | ||
# yield | ||
# end | ||
# end | ||
# | ||
# For sharded connection pools, the second argument to +temporarily_release_connection+ | ||
# is respected, and specifies the server on which to temporarily release the connection. | ||
# | ||
# The temporarily_release_connection extension is only supported with the | ||
# threaded and timed_queue connection pools that ship with Sequel (and the sharded | ||
# versions of each). To make sure that same connection object can be reacquired, it | ||
# is only supported if the maximum connection pool size is 1, so set the Database | ||
# :max_connections option to 1 if you plan to use this extension. | ||
# | ||
# If the +temporarily_release_connection+ method cannot reacquire the same connection | ||
# it released to the pool, it will raise a Sequel::UnableToReacquireConnectionError | ||
# exception. This should only happen if the connection has been disconnected | ||
# while it was temporarily released. If this error is raised, Database#transaction | ||
# will not rollback the transaction, since the connection object is likely no longer | ||
# valid, and on poorly written database drivers, that could cause the process to crash. | ||
# | ||
# Related modules: Sequel::TemporarilyReleaseConnection, | ||
# Sequel::UnableToReacquireConnectionError | ||
|
||
# | ||
module Sequel | ||
# Error class raised if the connection pool does not provide the same connection | ||
# object when checking a temporarily released connection out. | ||
class UnableToReacquireConnectionError < Error | ||
end | ||
|
||
module TemporarilyReleaseConnection | ||
module DatabaseMethods | ||
# Temporarily release the connection back to the connection pool for the | ||
# duration of the block. | ||
def temporarily_release_connection(conn, server=:default, &block) | ||
pool.temporarily_release_connection(conn, server, &block) | ||
end | ||
|
||
private | ||
|
||
# Do nothing if UnableToReacquireConnectionError is raised, as it is | ||
# likely the connection is not in a usable state. | ||
def rollback_transaction(conn, opts) | ||
return if UnableToReacquireConnectionError === $! | ||
super | ||
end | ||
end | ||
|
||
module PoolMethods | ||
# Temporarily release a currently checked out connection, then yield to the block. Reacquire the same | ||
# connection upon the exit of the block. | ||
def temporarily_release_connection(conn, server) | ||
t = Sequel.current | ||
raise Error, "connection not currently checked out" unless conn.equal?(trc_owned_connection(t, server)) | ||
|
||
begin | ||
trc_release(t, conn, server) | ||
yield | ||
ensure | ||
c = trc_acquire(t, server) | ||
unless conn.equal?(c) | ||
raise UnableToReacquireConnectionError, "reacquired connection not the same as initial connection" | ||
end | ||
end | ||
end | ||
end | ||
|
||
module TimedQueue | ||
private | ||
|
||
def trc_owned_connection(t, server) | ||
owned_connection(t) | ||
end | ||
|
||
def trc_release(t, conn, server) | ||
release(t) | ||
end | ||
|
||
def trc_acquire(t, server) | ||
acquire(t) | ||
end | ||
end | ||
|
||
module ShardedTimedQueue | ||
# Normalize the server name for sharded connection pools | ||
def temporarily_release_connection(conn, server) | ||
server = pick_server(server) | ||
super | ||
end | ||
|
||
private | ||
|
||
def trc_owned_connection(t, server) | ||
owned_connection(t, server) | ||
end | ||
|
||
def trc_release(t, conn, server) | ||
release(t, conn, server) | ||
end | ||
|
||
def trc_acquire(t, server) | ||
acquire(t, server) | ||
end | ||
end | ||
|
||
module ThreadedBase | ||
private | ||
|
||
def trc_release(t, conn, server) | ||
sync{super} | ||
end | ||
end | ||
|
||
module Threaded | ||
include TimedQueue | ||
include ThreadedBase | ||
end | ||
|
||
module ShardedThreaded | ||
include ShardedTimedQueue | ||
include ThreadedBase | ||
end | ||
end | ||
|
||
trc = TemporarilyReleaseConnection | ||
trc_map = { | ||
:threaded => trc::Threaded, | ||
:sharded_threaded => trc::ShardedThreaded, | ||
:timed_queue => trc::TimedQueue, | ||
:sharded_timed_queue => trc::ShardedTimedQueue, | ||
}.freeze | ||
|
||
Database.register_extension(:temporarily_release_connection) do |db| | ||
unless pool_mod = trc_map[db.pool.pool_type] | ||
raise(Error, "temporarily_release_connection extension not supported for connection pool type #{db.pool.pool_type}") | ||
end | ||
|
||
case db.pool.pool_type | ||
when :threaded, :sharded_threaded | ||
if db.opts[:connection_handling] == :disconnect | ||
raise Error, "temporarily_release_connection extension not supported with connection_handling: :disconnect option" | ||
end | ||
end | ||
|
||
unless db.pool.max_size == 1 | ||
raise Error, "temporarily_release_connection extension not supported unless :max_connections option is 1" | ||
end | ||
|
||
db.extend(trc::DatabaseMethods) | ||
db.pool.extend(trc::PoolMethods) | ||
db.pool.extend(pool_mod) | ||
end | ||
|
||
private_constant :TemporarilyReleaseConnection | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
require_relative "spec_helper" | ||
|
||
pool_types = [ :threaded, :sharded_threaded] | ||
pool_types += [ :timed_queue, :sharded_timed_queue] if RUBY_VERSION >= '3.2' | ||
|
||
pool_types.each do |pool_type| | ||
describe "temporarily_release_connection extension with pool class #{pool_type}" do | ||
before do | ||
opts = {:max_connections=>1, :pool_class=>pool_type} | ||
if pool_type.to_s.start_with?('sharded') | ||
opts[:servers] = {:foo=>{}, :bar=>{}} | ||
end | ||
@db = Sequel.mock(opts).extension(:temporarily_release_connection) | ||
end | ||
|
||
it "should temporarily release connection during block so it can be acquired by other threads" do | ||
conns = [] | ||
@db.transaction(:rollback=>:always) do |c| | ||
@db.temporarily_release_connection(c) do | ||
4.times.map do |i| | ||
Thread.new do | ||
@db.synchronize do |conn| | ||
conns << conn | ||
end | ||
end | ||
end.map(&:join) | ||
end | ||
end | ||
|
||
c = @db.synchronize{|conn| conn} | ||
conns.size.must_equal 4 | ||
conns.each do |conn| | ||
conn.must_be_same_as c | ||
end | ||
|
||
@db.sqls.must_equal ['BEGIN', 'ROLLBACK'] | ||
end | ||
|
||
it "should temporarily release connection for specific shard during block so it can be acquired by other threads" do | ||
conns = [] | ||
@db.transaction(:rollback=>:always, :server=>:foo) do |c| | ||
@db.temporarily_release_connection(c, :foo) do | ||
@db.transaction(:rollback=>:always, :server=>:bar) do |c2| | ||
@db.temporarily_release_connection(c2, :bar) do | ||
4.times.map do |i| | ||
Thread.new do | ||
@db.synchronize(:foo) do |conn| | ||
@db.synchronize(:bar) do |conn2| | ||
conns << [conn, conn2] | ||
end | ||
end | ||
end | ||
end.map(&:join) | ||
end | ||
end | ||
end | ||
end | ||
|
||
c = @db.synchronize(:foo){|conn| conn} | ||
c2 = @db.synchronize(:bar){|conn| conn} | ||
conns.size.must_equal 4 | ||
conns.each do |conn, conn2| | ||
conn.must_be_same_as c | ||
conn2.must_be_same_as c2 | ||
end | ||
|
||
@db.sqls.must_equal ["BEGIN -- foo", "BEGIN -- bar", "ROLLBACK -- bar", "ROLLBACK -- foo"] | ||
end if pool_type.to_s.start_with?('sharded') | ||
|
||
it "should raise UnableToReacquireConnectionError if unable to reacquire the same connection it released" do | ||
proc do | ||
@db.transaction(rollback: :always) do |conn| | ||
@db.temporarily_release_connection(conn) do | ||
@db.disconnect | ||
end | ||
end | ||
end.must_raise Sequel::UnableToReacquireConnectionError | ||
@db.sqls.must_equal ['BEGIN'] | ||
end | ||
|
||
it "should raise if provided a connection that is not checked out" do | ||
proc do | ||
@db.temporarily_release_connection(@db.synchronize{|conn| conn}) | ||
end.must_raise Sequel::Error | ||
end | ||
|
||
it "should raise if pool max_size is not 1" do | ||
db = Sequel.mock(:pool_type=>pool_type) | ||
proc do | ||
db.extension(:temporarily_release_connection) | ||
end.must_raise Sequel::Error | ||
end | ||
end | ||
end | ||
|
||
describe "temporarily_release_connection extension" do | ||
it "should raise if pool uses connection_handling: :disconnect option" do | ||
db = Sequel.mock(:connection_handling=>:disconnect) | ||
proc do | ||
db.extension(:temporarily_release_connection) | ||
end.must_raise Sequel::Error | ||
end | ||
|
||
it "should raise if pool uses unsupported pool type" do | ||
db = Sequel.mock(:pool_class=>:single) | ||
proc do | ||
db.extension(:temporarily_release_connection) | ||
end.must_raise Sequel::Error | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters