Skip to content

Commit

Permalink
Allow DB::Pool to be used a generic connection pool (#131)
Browse files Browse the repository at this point in the history
* Allow DB::Pool to be a generic connection pool

* Use fully qualified class name for consistency

Co-authored-by: Brian J. Cardiff <bcardiff@gmail.com>

* Wrap only the necessary code in an `ensure`

* Add spec for http client pool

* Fix ICE in crystal-sqlite3

Co-authored-by: Brian J. Cardiff <bcardiff@gmail.com>
  • Loading branch information
jgaskins and bcardiff authored Sep 14, 2020
1 parent ed686ad commit 291b65b
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 15 deletions.
52 changes: 52 additions & 0 deletions spec/http_client_pool_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
require "./spec_helper"
require "./support/http"

describe DB::Pool do
it "distributes evenly the requests" do
mutex = Mutex.new
requests_per_connection = Hash(Socket::Address, Int32).new

server = HTTP::Server.new do |context|
remote_address = context.request.remote_address.not_nil!
mutex.synchronize do
requests_per_connection[remote_address] ||= 0
requests_per_connection[remote_address] += 1
end
sleep context.request.query_params["delay"].to_f
context.response.print "ok"
end
address = server.bind_unused_port "127.0.0.1"

run_server(server) do
fixed_pool_size = 5
expected_per_connection = 5
requests = fixed_pool_size * expected_per_connection

pool = DB::Pool.new(
initial_pool_size: fixed_pool_size,
max_pool_size: fixed_pool_size,
max_idle_pool_size: fixed_pool_size) {
HTTP::Client.new(URI.parse("http://127.0.0.1:#{address.port}/"))
}

done = Channel(Nil).new

requests.times do
spawn do
pool.checkout do |http|
http.get("/?delay=0.1")
end
done.send(nil)
end
end

spawn do
requests.times { done.receive }
done.close
end
wait_for { done.closed? }

requests_per_connection.values.should eq([expected_per_connection] * fixed_pool_size)
end
end
end
16 changes: 16 additions & 0 deletions spec/support/fibers.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
def wait_until_blocked(f : Fiber, timeout = 5.seconds)
now = Time.monotonic

until f.resumable?
Fiber.yield
raise "fiber failed to block within #{timeout}" if (Time.monotonic - now) > timeout
end
end

def wait_until_finished(f : Fiber, timeout = 5.seconds)
now = Time.monotonic
until f.dead?
Fiber.yield
raise "fiber failed to finish within #{timeout}" if (Time.monotonic - now) > timeout
end
end
48 changes: 48 additions & 0 deletions spec/support/http.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
require "http"
require "./fibers"

def wait_for(timeout = 5.seconds)
now = Time.monotonic

until yield
Fiber.yield

if (Time.monotonic - now) > timeout
raise "block failed to evaluate to true within #{timeout}"
end
end
end

# Helper method which runs *server*
# 1. Spawns `server.listen` in a new fiber.
# 2. Waits until `server.listening?`.
# 3. Yields to the given block.
# 4. Ensures the server is closed.
# 5. After returning from the block, it waits for the server to gracefully
# shut down before continuing execution in the current fiber.
# 6. If the listening fiber raises an exception, it is rescued and re-raised
# in the current fiber.
def run_server(server)
server_done = Channel(Exception?).new

f = spawn do
server.listen
rescue exc
server_done.send exc
else
server_done.send nil
end

begin
wait_for { server.listening? }
wait_until_blocked f

yield server_done
ensure
server.close unless server.closed?

if exc = server_done.receive
raise exc
end
end
end
22 changes: 17 additions & 5 deletions src/db/error.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
module DB
abstract class Connection
end

class Error < Exception
end

Expand All @@ -11,20 +14,29 @@ module DB
class PoolRetryAttemptsExceeded < Error
end

class PoolResourceLost(T) < Error
getter resource : T

def initialize(@resource : T)
end
end

class PoolResourceRefused < Error
end

# Raised when an established connection is lost
# probably due to socket/network issues.
# It is used by the connection pool retry logic.
class ConnectionLost < Error
getter connection : Connection

def initialize(@connection)
class ConnectionLost < PoolResourceLost(Connection)
def connection
resource
end
end

# Raised when a connection is unable to be established
# probably due to socket/network or configuration issues.
# It is used by the connection pool retry logic.
class ConnectionRefused < Error
class ConnectionRefused < PoolResourceRefused
end

class Rollback < Error
Expand Down
43 changes: 33 additions & 10 deletions src/db/pool.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
require "weak_ref"

require "./error"

module DB
class Pool(T)
# Pool configuration
Expand Down Expand Up @@ -52,12 +54,19 @@ module DB
@idle.clear
end

record Stats, open_connections : Int32
record Stats,
open_connections : Int32,
idle_connections : Int32,
in_flight_connections : Int32,
max_connections : Int32

# Returns stats of the pool
def stats
Stats.new(
open_connections: @total.size
open_connections: @total.size,
idle_connections: @idle.size,
in_flight_connections: @inflight,
max_connections: @max_pool_size,
)
end

Expand Down Expand Up @@ -91,10 +100,22 @@ module DB
resource
end

res.before_checkout
if res.responds_to?(:before_checkout)
res.before_checkout
end
res
end

def checkout(&block : T ->)
connection = checkout

begin
yield connection
ensure
release connection
end
end

# ```
# selected, is_candidate = pool.checkout_some(candidates)
# ```
Expand Down Expand Up @@ -122,7 +143,9 @@ module DB
sync do
if can_increase_idle_pool
@idle << resource
resource.after_release
if resource.responds_to?(:after_release)
resource.after_release
end
idle_pushed = true
else
resource.close
Expand Down Expand Up @@ -153,12 +176,12 @@ module DB
begin
sleep @retry_delay if i >= current_available
return yield
rescue e : ConnectionLost
rescue e : PoolResourceLost(T)
# if the connection is lost close it to release resources
# and remove it from the known pool.
sync { delete(e.connection) }
e.connection.close
rescue e : ConnectionRefused
sync { delete(e.resource) }
e.resource.close
rescue e : PoolResourceRefused
# a ConnectionRefused means a new connection
# was intended to be created
# nothing to due but to retry soon
Expand Down Expand Up @@ -215,7 +238,7 @@ module DB
sync_dec_waiting_resource
when timeout(@checkout_timeout.seconds)
sync_dec_waiting_resource
raise DB::PoolTimeout.new
raise DB::PoolTimeout.new("Could not check out a connection in #{@checkout_timeout} seconds")
end
end
{% else %}
Expand All @@ -232,7 +255,7 @@ module DB
sync_dec_waiting_resource
when 1
sync_dec_waiting_resource
raise DB::PoolTimeout.new
raise DB::PoolTimeout.new("Could not check out a connection in #{@checkout_timeout} seconds")
else
raise DB::Error.new
end
Expand Down

0 comments on commit 291b65b

Please sign in to comment.