Skip to content

Commit

Permalink
Re-raise exceptions in parallel macro (crystal-lang#5726)
Browse files Browse the repository at this point in the history
  • Loading branch information
lipanski authored and RX14 committed Apr 6, 2018
1 parent d536c9c commit 7d64756
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 26 deletions.
73 changes: 50 additions & 23 deletions spec/std/concurrent_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,65 @@ private def method_named(expected_named)
Fiber.current.name.should eq(expected_named)
end

class SomeParallelJobException < Exception
end

private def raising_job : String
raise SomeParallelJobException.new("boom")
"result"
end

describe "concurrent" do
it "does four things concurrently" do
a, b, c, d = parallel(1 + 2, "hello".size, [1, 2, 3, 4].size, nil)
a.should eq(3)
b.should eq(5)
c.should eq(4)
d.should be_nil
end
describe "parallel" do
it "does four things concurrently" do
a, b, c, d = parallel(1 + 2, "hello".size, [1, 2, 3, 4].size, nil)
a.should eq(3)
b.should eq(5)
c.should eq(4)
d.should be_nil
end

it "uses spawn macro" do
chan = Channel(Int32).new
it "re-raises errors from Fibers as ConcurrentExecutionException" do
exception = expect_raises(ConcurrentExecutionException) do
a, b = parallel(raising_job, raising_job)
end

spawn method_with_named_args(chan)
chan.receive.should eq(3)
exception.cause.should be_a(SomeParallelJobException)
end

spawn method_with_named_args(chan, y: 20)
chan.receive.should eq(21)
it "is strict about the return value type" do
a, b = parallel(1 + 2, "hello")

spawn method_with_named_args(chan, x: 10, y: 20)
chan.receive.should eq(30)
typeof(a).should eq(Int32)
typeof(b).should eq(String)
end
end

it "spawns named" do
spawn(name: "sub") do
Fiber.current.name.should eq("sub")
describe "spawn" do
it "uses spawn macro" do
chan = Channel(Int32).new

spawn method_with_named_args(chan)
chan.receive.should eq(3)

spawn method_with_named_args(chan, y: 20)
chan.receive.should eq(21)

spawn method_with_named_args(chan, x: 10, y: 20)
chan.receive.should eq(30)
end
Fiber.yield
end

it "spawns named with macro" do
spawn method_named("foo"), name: "foo"
Fiber.yield
it "spawns named" do
spawn(name: "sub") do
Fiber.current.name.should eq("sub")
end
Fiber.yield
end

it "spawns named with macro" do
spawn method_named("foo"), name: "foo"
Fiber.yield
end
end

it "accepts method call with receiver" do
Expand Down
62 changes: 59 additions & 3 deletions src/concurrent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,77 @@ macro spawn(call, *, name = nil)
{% end %}
end

# Wraps around exceptions re-raised from concurrent calls.
# The original exception can be accessed via `#cause`.
class ConcurrentExecutionException < Exception
end

# Runs the commands passed as arguments concurrently (in Fibers) and waits
# for them to finish.
#
# ```
# def say(word)
# puts word
# end
#
# # Will print out the three words concurrently
# parallel(
# say("concurrency"),
# say("is"),
# say("easy")
# )
# ```
#
# Can also be used to conveniently collect the return values of the
# concurrent operations.
#
# ```
# def concurrent_job(word)
# word
# end
#
# a, b, c =
# parallel(
# concurrent_job("concurrency"),
# concurrent_job("is"),
# concurrent_job("easy")
# )
#
# puts a # => "concurrency"
# puts b # => "is"
# puts c # => "easy"
# ```
#
# Due to the concurrent nature of this macro, it is highly recommended
# to handle any exceptions within the concurrent calls. Unhandled
# exceptions raised within the concurrent operations will be re-raised
# inside the parent fiber as `ConcurrentExecutionException`, with the
# `cause` attribute set to the original exception.
macro parallel(*jobs)
%channel = Channel(Nil).new
%channel = Channel(Exception | Nil).new

{% for job, i in jobs %}
%ret{i} = uninitialized typeof({{job}})
spawn do
begin
%ret{i} = {{job}}
ensure
rescue e : Exception
%channel.send e
else
%channel.send nil
end
end
{% end %}

{{ jobs.size }}.times { %channel.receive }
{{ jobs.size }}.times do
%value = %channel.receive
if %value.is_a?(Exception)
raise ConcurrentExecutionException.new(
"An unhandled error occured inside a `parallel` call",
cause: %value
)
end
end

{
{% for job, i in jobs %}
Expand Down

0 comments on commit 7d64756

Please sign in to comment.