Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow DB::Pool to be used a generic connection pool #131

Merged
merged 7 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions spec/http_client_pool_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
require "./spec_helper"
require "./support/http"

describe DB::Pool do
it "distributes evenly the requests" do
mutex = Mutex.new
requests_per_connection = Hash(Socket::Address, Int32).new

server = HTTP::Server.new do |context|
remote_address = context.request.remote_address.not_nil!
mutex.synchronize do
requests_per_connection[remote_address] ||= 0
requests_per_connection[remote_address] += 1
end
sleep context.request.query_params["delay"].to_f
context.response.print "ok"
end
address = server.bind_unused_port "127.0.0.1"

run_server(server) do
fixed_pool_size = 5
expected_per_connection = 5
requests = fixed_pool_size * expected_per_connection

pool = DB::Pool.new(
initial_pool_size: fixed_pool_size,
max_pool_size: fixed_pool_size,
max_idle_pool_size: fixed_pool_size) {
HTTP::Client.new(URI.parse("http://127.0.0.1:#{address.port}/"))
}

done = Channel(Nil).new

requests.times do
spawn do
pool.checkout do |http|
http.get("/?delay=0.1")
end
done.send(nil)
end
end

spawn do
requests.times { done.receive }
done.close
end
wait_for { done.closed? }

requests_per_connection.values.should eq([expected_per_connection] * fixed_pool_size)
end
end
end
16 changes: 16 additions & 0 deletions spec/support/fibers.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
def wait_until_blocked(f : Fiber, timeout = 5.seconds)
now = Time.monotonic

until f.resumable?
Fiber.yield
raise "fiber failed to block within #{timeout}" if (Time.monotonic - now) > timeout
end
end

def wait_until_finished(f : Fiber, timeout = 5.seconds)
now = Time.monotonic
until f.dead?
Fiber.yield
raise "fiber failed to finish within #{timeout}" if (Time.monotonic - now) > timeout
end
end
48 changes: 48 additions & 0 deletions spec/support/http.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
require "http"
require "./fibers"

def wait_for(timeout = 5.seconds)
now = Time.monotonic

until yield
Fiber.yield

if (Time.monotonic - now) > timeout
raise "block failed to evaluate to true within #{timeout}"
end
end
end

# Helper method which runs *server*
# 1. Spawns `server.listen` in a new fiber.
# 2. Waits until `server.listening?`.
# 3. Yields to the given block.
# 4. Ensures the server is closed.
# 5. After returning from the block, it waits for the server to gracefully
# shut down before continuing execution in the current fiber.
# 6. If the listening fiber raises an exception, it is rescued and re-raised
# in the current fiber.
def run_server(server)
server_done = Channel(Exception?).new

f = spawn do
server.listen
rescue exc
server_done.send exc
else
server_done.send nil
end

begin
wait_for { server.listening? }
wait_until_blocked f

yield server_done
ensure
server.close unless server.closed?

if exc = server_done.receive
raise exc
end
end
end
22 changes: 17 additions & 5 deletions src/db/error.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
module DB
abstract class Connection
end

class Error < Exception
end

Expand All @@ -11,20 +14,29 @@ module DB
class PoolRetryAttemptsExceeded < Error
end

class PoolResourceLost(T) < Error
getter resource : T

def initialize(@resource : T)
end
end

class PoolResourceRefused < Error
end

# Raised when an established connection is lost
# probably due to socket/network issues.
# It is used by the connection pool retry logic.
class ConnectionLost < Error
getter connection : Connection

def initialize(@connection)
class ConnectionLost < PoolResourceLost(Connection)
def connection
resource
end
end

# Raised when a connection is unable to be established
# probably due to socket/network or configuration issues.
# It is used by the connection pool retry logic.
class ConnectionRefused < Error
class ConnectionRefused < PoolResourceRefused
end

class Rollback < Error
Expand Down
43 changes: 33 additions & 10 deletions src/db/pool.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
require "weak_ref"

require "./error"

module DB
class Pool(T)
# Pool configuration
Expand Down Expand Up @@ -52,12 +54,19 @@ module DB
@idle.clear
end

record Stats, open_connections : Int32
record Stats,
jgaskins marked this conversation as resolved.
Show resolved Hide resolved
open_connections : Int32,
idle_connections : Int32,
in_flight_connections : Int32,
max_connections : Int32

# Returns stats of the pool
def stats
Stats.new(
open_connections: @total.size
open_connections: @total.size,
idle_connections: @idle.size,
in_flight_connections: @inflight,
max_connections: @max_pool_size,
)
end

Expand Down Expand Up @@ -91,10 +100,22 @@ module DB
resource
end

res.before_checkout
if res.responds_to?(:before_checkout)
res.before_checkout
end
res
end

def checkout(&block : T ->)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. But the new Pool#checkout(&) might not play nice with DB connections. It's not documented, but the auto_release property handles the logic of how the resource is released after the operation is completed. This requires some knowledge of the resource though. Maybe we need to formalise that a bit better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I haven't looked into how that works but it sounds like it might fall under DB-specific extensions that could be made if this class were extracted to its own shard, which DB::Pool could then inherit from.

connection = checkout

begin
yield connection
ensure
release connection
end
end

# ```
# selected, is_candidate = pool.checkout_some(candidates)
# ```
Expand Down Expand Up @@ -122,7 +143,9 @@ module DB
sync do
if can_increase_idle_pool
@idle << resource
resource.after_release
if resource.responds_to?(:after_release)
resource.after_release
end
idle_pushed = true
else
resource.close
Expand Down Expand Up @@ -153,12 +176,12 @@ module DB
begin
sleep @retry_delay if i >= current_available
return yield
rescue e : ConnectionLost
rescue e : PoolResourceLost(T)
# if the connection is lost close it to release resources
# and remove it from the known pool.
sync { delete(e.connection) }
e.connection.close
rescue e : ConnectionRefused
sync { delete(e.resource) }
e.resource.close
rescue e : PoolResourceRefused
# a ConnectionRefused means a new connection
# was intended to be created
# nothing to due but to retry soon
Expand Down Expand Up @@ -215,7 +238,7 @@ module DB
sync_dec_waiting_resource
when timeout(@checkout_timeout.seconds)
sync_dec_waiting_resource
raise DB::PoolTimeout.new
raise DB::PoolTimeout.new("Could not check out a connection in #{@checkout_timeout} seconds")
end
end
{% else %}
Expand All @@ -232,7 +255,7 @@ module DB
sync_dec_waiting_resource
when 1
sync_dec_waiting_resource
raise DB::PoolTimeout.new
raise DB::PoolTimeout.new("Could not check out a connection in #{@checkout_timeout} seconds")
else
raise DB::Error.new
end
Expand Down