-
-
Notifications
You must be signed in to change notification settings - Fork 55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow DB::Pool to be used a generic connection pool #131
Changes from 2 commits
2d1e892
259793a
034d818
1576600
d0050eb
a9aac7f
1b4fdc4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
require "http" | ||
require "../src/db/pool" | ||
|
||
pool = DB::Pool.new { HTTP::Client.new(URI.parse("https://google.com")) } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we want Otherwise it will be too fragile when new options are coming. |
||
|
||
pool.checkout do |http| | ||
pp http.get("/") | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
require "../src/db/pool" | ||
require "uuid" | ||
|
||
class DummyIO < IO | ||
def read(slice : Bytes) | ||
sleep rand.seconds | ||
end | ||
|
||
def write(slice : Bytes) : Nil | ||
sleep rand.seconds # simulate I/O yielding the CPU | ||
STDOUT.puts "wrote #{String.new(slice)}" | ||
end | ||
end | ||
|
||
pool = DB::Pool.new { DummyIO.new } | ||
channel = Channel(Nil).new | ||
count = 10 | ||
|
||
count.times do | ||
spawn do | ||
pool.checkout do |io| | ||
io << "hello" | ||
end | ||
channel.send nil | ||
end | ||
end | ||
|
||
count.times { channel.receive } | ||
|
||
pool.close |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,10 +14,10 @@ module DB | |
# Raised when an established connection is lost | ||
# probably due to socket/network issues. | ||
# It is used by the connection pool retry logic. | ||
class ConnectionLost < Error | ||
getter connection : Connection | ||
class ConnectionLost(T) < Error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to future self: check if this is causing any breaking change on db drivers There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've checked this against |
||
getter connection | ||
|
||
def initialize(@connection) | ||
def initialize(@connection : T) | ||
end | ||
end | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
require "weak_ref" | ||
|
||
require "./error" | ||
|
||
module DB | ||
class Pool(T) | ||
# Pool configuration | ||
|
@@ -52,12 +54,19 @@ module DB | |
@idle.clear | ||
end | ||
|
||
record Stats, open_connections : Int32 | ||
record Stats, | ||
jgaskins marked this conversation as resolved.
Show resolved
Hide resolved
|
||
open_connections : Int32, | ||
idle_connections : Int32, | ||
in_flight_connections : Int32, | ||
max_connections : Int32 | ||
|
||
# Returns stats of the pool | ||
def stats | ||
Stats.new( | ||
open_connections: @total.size | ||
open_connections: @total.size, | ||
idle_connections: @idle.size, | ||
in_flight_connections: @inflight, | ||
max_connections: @max_pool_size, | ||
) | ||
end | ||
|
||
|
@@ -91,10 +100,19 @@ module DB | |
resource | ||
end | ||
|
||
res.before_checkout | ||
if res.responds_to?(:before_checkout) | ||
res.before_checkout | ||
end | ||
res | ||
end | ||
|
||
def checkout(&block : T ->) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure. But the new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting. I haven't looked into how that works but it sounds like it might fall under |
||
connection = checkout | ||
yield connection | ||
ensure | ||
release connection if connection | ||
jgaskins marked this conversation as resolved.
Show resolved
Hide resolved
|
||
end | ||
|
||
# ``` | ||
# selected, is_candidate = pool.checkout_some(candidates) | ||
# ``` | ||
|
@@ -122,7 +140,9 @@ module DB | |
sync do | ||
if can_increase_idle_pool | ||
@idle << resource | ||
resource.after_release | ||
if resource.responds_to?(:after_release) | ||
resource.after_release | ||
end | ||
idle_pushed = true | ||
else | ||
resource.close | ||
|
@@ -215,7 +235,7 @@ module DB | |
sync_dec_waiting_resource | ||
when timeout(@checkout_timeout.seconds) | ||
sync_dec_waiting_resource | ||
raise DB::PoolTimeout.new | ||
raise DB::PoolTimeout.new("Could not check out a connection in #{@checkout_timeout} seconds") | ||
end | ||
end | ||
{% else %} | ||
|
@@ -232,7 +252,7 @@ module DB | |
sync_dec_waiting_resource | ||
when 1 | ||
sync_dec_waiting_resource | ||
raise DB::PoolTimeout.new | ||
raise DB::PoolTimeout.new("Could not check out a connection in #{@checkout_timeout} seconds") | ||
else | ||
raise DB::Error.new | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can create specs to ensure the pool is generic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a feeling you might say that 🙂 I didn't know how to do it, though, so I figured I'd at least offer up what I had.
To clarify, I've always struggled with testing I/O. I don't know that we'd have to actually test I/O here, but that hadn't crossed my mind when I was writing this.