Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription concurrency #134

Merged
merged 20 commits into from
Sep 18, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
11473c6
Initial concurrent subscription test
slashdotdash Jun 26, 2018
0f99966
Send events to multiple subscribers connected to a single subscription
slashdotdash Jun 26, 2018
a6069af
WIP In-flight subscriber message acknowledgement
slashdotdash Jun 26, 2018
2bf3c6a
Subscription monitors subscribers, terminates when all subscribers down
slashdotdash Jun 27, 2018
dd9c57f
Exclude events filtered by selector function
slashdotdash Jun 27, 2018
3109505
Prevent too many subscribers connecting to a subscription
slashdotdash Jun 27, 2018
0c88470
Subscriber buffer size
slashdotdash Jun 28, 2018
2e5cdd6
Ensure subscription last sent is initialised from last seen
slashdotdash Jun 28, 2018
3b93f2e
Subscription event partitioning
slashdotdash Jun 29, 2018
f087937
Attempt to notify partitioned events ordered by lowest event number
slashdotdash Jun 29, 2018
0cd0ffa
Subscription records `last_sent` for filtered events
slashdotdash Sep 1, 2018
e0e80a7
Request next batch of events when catching up and queue is empty
slashdotdash Sep 2, 2018
a6db11d
Extract subscription tests to test case
slashdotdash Sep 3, 2018
a06bc3e
Delete subscription
slashdotdash Sep 17, 2018
ace4451
Unsubscribe from stream and delete subscription tests
slashdotdash Sep 17, 2018
d971424
Document concurrent subscriptions
slashdotdash Sep 17, 2018
a019183
Acknowledgement of redistributed in-flight events after subscription …
slashdotdash Sep 17, 2018
1225cbf
Include Elixir v1.7.2 in Travis CI configuration
slashdotdash Sep 18, 2018
672c126
Disable logger backends for test environments
slashdotdash Sep 18, 2018
47accab
Fix failing tests on Travis CI
slashdotdash Sep 18, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Initial concurrent subscription test
  • Loading branch information
slashdotdash committed Sep 17, 2018
commit 11473c693d118865fefd32e02da05ceaeeb20580
11 changes: 9 additions & 2 deletions lib/event_store/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule EventStore.Subscriptions do
@moduledoc false

alias EventStore.Subscriptions
alias EventStore.Subscriptions.Subscription

@all_stream "$all"

Expand Down Expand Up @@ -35,8 +36,14 @@ defmodule EventStore.Subscriptions do
{:ok, subscription} ->
{:ok, subscription}

{:error, {:already_started, _subscription}} ->
{:error, :subscription_already_exists}
{:error, {:already_started, subscription}} ->
case Keyword.get(opts, :concurrency, 1) do
1 ->
{:error, :subscription_already_exists}

concurrency when is_number(concurrency) ->
Subscription.subscribe(subscription, subscriber, opts)
end
end
end

Expand Down
4 changes: 4 additions & 0 deletions lib/event_store/subscriptions/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ defmodule EventStore.Subscriptions.Subscription do
}, opts)
end

def subscribe(subscription, subscriber, subscription_opts) do
{:ok, subscription}
end

@doc """
Confirm receipt of an event by its event number.
"""
Expand Down
90 changes: 90 additions & 0 deletions test/subscriptions/concurrent_subscription_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
defmodule EventStore.Subscriptions.ConcurrentSubscriptionTest do
use EventStore.StorageCase

alias EventStore.EventFactory
alias EventStore.Subscriptions.Subscription

setup do
subscription_name = UUID.uuid4()

{:ok, %{subscription_name: subscription_name}}
end

describe "concurrent subscription" do
test "should allow multiple subscribers", %{subscription_name: subscription_name} do
subscriber = self()

{:ok, subscription} =
EventStore.subscribe_to_all_streams(subscription_name, subscriber, concurrency: 2)

{:ok, ^subscription} =
EventStore.subscribe_to_all_streams(subscription_name, subscriber, concurrency: 2)

assert_receive {:subscribed, ^subscription}
refute_receive {:subscribed, ^subscription}
end

@tag :wip
test "should send events to all subscribers", %{subscription_name: subscription_name} do
stream_uuid = UUID.uuid4()

subscriber1 = start_subscriber(:subscriber1)
subscriber2 = start_subscriber(:subscriber2)
subscriber3 = start_subscriber(:subscriber3)

{:ok, subscription} =
EventStore.subscribe_to_all_streams(subscription_name, subscriber1, concurrency: 3)

{:ok, ^subscription} =
EventStore.subscribe_to_all_streams(subscription_name, subscriber2, concurrency: 3)

{:ok, ^subscription} =
EventStore.subscribe_to_all_streams(subscription_name, subscriber3, concurrency: 3)

append_to_stream(stream_uuid, 3)

assert_receive {:events, received_events, :subscriber1}
assert_receive {:events, received_events, :subscriber2}
assert_receive {:events, received_events, :subscriber3}
end
end

defp start_subscriber(name) do
reply_to = self()

spawn_link(fn ->
receive_events = fn loop ->
receive do
{:events, events} ->
send(reply_to, {:events, events, name})

loop.(loop)
end
end

receive_events.(receive_events)
end)
end

def receive_and_ack(subscription, expected_stream_uuid) do
assert_receive {:events, received_events}
assert Enum.all?(received_events, fn event -> event.stream_uuid == expected_stream_uuid end)

Subscription.ack(subscription, received_events)
end

defp append_to_stream(stream_uuid, event_count) do
events = EventFactory.create_events(event_count)

:ok = EventStore.append_to_stream(stream_uuid, 0, events)
end

# Subscribe to all streams and wait for the subscription to be subscribed.
defp subscribe_to_all_streams(subscription_name, subscriber, opts) do
{:ok, subscription} = EventStore.subscribe_to_all_streams(subscription_name, subscriber, opts)

assert_receive {:subscribed, ^subscription}

{:ok, subscription}
end
end