Skip to content

Commit

Permalink
Transient subscription event mapping function
Browse files Browse the repository at this point in the history
  • Loading branch information
slashdotdash committed Feb 23, 2018
1 parent 5a73f7c commit 5de99ce
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 116 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- Subscription notification message once successfully subscribed ([#104](https://github.com/commanded/eventstore/pull/104)).
- Transient subscriptions ([#105](https://github.com/commanded/eventstore/pull/105)).
- Use a single PostgreSQL connection for all subscriptions ([#106](https://github.com/commanded/eventstore/pull/106)).
- Transient subscription event mapping function ([#108](https://github.com/commanded/eventstore/pull/108)).

## v0.13.2

Expand Down
2 changes: 1 addition & 1 deletion guides/Cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ EventStore supports running on multiple nodes as either a [distributed Erlang](h

PostgreSQL's `LISTEN` / `NOTIFY` is used to pub/sub event notifications.

A single listener process will connect to the database to listen for events when using a distributed cluster. Events will be broadcast to all connected nodes using Erlang's [pg2](http://erlang.org/doc/man/pg2.html) process groups. This limits the number of database connections to at most the number of running clusters.
A single listener process will connect to the database to listen for events when using a distributed cluster. Events will be broadcast from the single listener process to a `GenServer` process running on each connected node that forwards events to its local subscribers. This limits the number of database connections to at most the number of running clusters.

Running EventStore on multiple nodes that are not connected together to form a cluster will result in one listener process and database connection per node.

Expand Down
16 changes: 16 additions & 0 deletions guides/Subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ receive do
end
```

#### Mapping events

You can provide an event mapping function that maps each `RecordedEvent` before sending it to the subscriber:

```elixir
EventStore.subscribe(stream_uuid, mapper: fn
%EventStore.RecordedEvent{data: data} -> data
end)

# receive first batch of mapped event data
receive do
{:events, event_data} ->
IO.puts "Received event data: " <> inspect(event_data)
end
```

## Persistent subscriptions

Persistent subscriptions to a stream will guarantee *at least once* delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped. The last received and acknowledged event is stored by the EventStore to support resuming at a later time later or whenever the subscriber process restarts.
Expand Down
11 changes: 9 additions & 2 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ defmodule EventStore do
- `stream_uuid` is the stream to subscribe to.
Use the `$all` identifier to subscribe to events from all streams.
- `opts` is an optional map providing additional subscription configuration:
- `mapper` to define a function to map each recorded event before sending
to the subscriber.
The calling process will be notified whenever new events are appended to
the given `stream_uuid`.
Expand All @@ -315,8 +319,11 @@ defmodule EventStore do
end
"""
@spec subscribe(String.t()) :: :ok | {:error, term}
def subscribe(stream_uuid), do: Registration.subscribe(stream_uuid)
@spec subscribe(String.t(), mapper: (RecordedEvent.t() -> any())) :: :ok | {:error, term}

def subscribe(stream_uuid, opts \\ [])

def subscribe(stream_uuid, opts), do: Registration.subscribe(stream_uuid, opts)

@doc """
Create a persistent subscription to a single stream.
Expand Down
30 changes: 30 additions & 0 deletions lib/event_store/registration/distributed_forwarder.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule EventStore.Registration.DistributedForwarder do
use GenServer

alias EventStore.Registration.LocalRegistry

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end

@doc """
Broadcast the message on the topic to all connected nodes.
"""
def broadcast(topic, message) do
for node <- Node.list() do
send({__MODULE__, node}, {topic, message})
end

:ok
end

def init(_args) do
{:ok, []}
end

def handle_info({topic, message}, state) do
LocalRegistry.broadcast(topic, message)

{:noreply, state}
end
end
43 changes: 43 additions & 0 deletions lib/event_store/registration/distributed_registry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule EventStore.Registration.DistributedRegistry do
@moduledoc """
Pub/sub using a local registry and broadcasting messages to all connected
nodes.
"""

@behaviour EventStore.Registration

require Logger

alias EventStore.Registration.{DistributedForwarder, LocalRegistry}

@doc """
Return an optional supervisor spec for the registry.
"""
@spec child_spec() :: [:supervisor.child_spec()]
@impl EventStore.Registration
def child_spec do
LocalRegistry.child_spec() ++
[
DistributedForwarder.child_spec([])
]
end

@doc """
Subscribes the caller to the given topic.
"""
@spec subscribe(binary, mapper: (RecordedEvent.t() -> any())) :: :ok | {:error, term}
@impl EventStore.Registration
def subscribe(topic, opts) do
LocalRegistry.subscribe(topic, opts)
end

@doc """
Broadcasts message on given topic.
"""
@spec broadcast(binary, term) :: :ok | {:error, term}
@impl EventStore.Registration
def broadcast(topic, message) do
:ok = LocalRegistry.broadcast(topic, message)
:ok = DistributedForwarder.broadcast(topic, message)
end
end
33 changes: 14 additions & 19 deletions lib/event_store/registration/local_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,37 +26,32 @@ defmodule EventStore.Registration.LocalRegistry do
@doc """
Subscribes the caller to the given topic.
"""
@spec subscribe(binary) :: :ok | {:error, term}
@spec subscribe(binary, mapper: (RecordedEvent.t() -> any())) :: :ok | {:error, term}
@impl EventStore.Registration
def subscribe(topic) do
with {:ok, _} <- Registry.register(EventStore.PubSub, topic, []) do
def subscribe(topic, opts) do
with {:ok, _} <- Registry.register(EventStore.PubSub, topic, opts) do
:ok
end
end

@doc """
Is the caller subscribed to the given topic?
"""
@spec subscribed?(binary) :: true | false
@impl EventStore.Registration
def subscribed?(topic) do
subscriptions = Registry.lookup(EventStore.PubSub, topic)
caller = self()

Enum.any?(subscriptions, fn
{^caller, _} -> true
_ -> false
end)
end

@doc """
Broadcasts message on given topic.
"""
@spec broadcast(binary, term) :: :ok | {:error, term}
@impl EventStore.Registration
def broadcast(topic, message) do
Registry.dispatch(EventStore.PubSub, topic, fn entries ->
for {pid, _} <- entries, do: send(pid, message)
for {pid, opts} <- entries do
notify_subscriber(pid, message, opts)
end
end)
end

defp notify_subscriber(pid, {:events, events}, mapper: mapper) when is_function(mapper, 1) do
send(pid, {:events, Enum.map(events, mapper)})
end

defp notify_subscriber(pid, message, _opts) do
send(pid, message)
end
end
78 changes: 0 additions & 78 deletions lib/event_store/registration/pg2_registry.ex

This file was deleted.

23 changes: 7 additions & 16 deletions lib/event_store/registration/registration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule EventStore.Registration do
Registration specification for EventStore pub/sub.
"""

alias EventStore.Registration.{DistributedRegistry, LocalRegistry}

@doc """
Return an optional supervisor spec for the registry.
"""
Expand All @@ -11,12 +13,7 @@ defmodule EventStore.Registration do
@doc """
Subscribes the caller to the given topic.
"""
@callback subscribe(binary) :: :ok | {:error, term}

@doc """
Is the caller subscribed to the given topic?
"""
@callback subscribed?(binary) :: true | false
@callback subscribe(binary, mapper: (RecordedEvent.t() -> any())) :: :ok | {:error, term}

@doc """
Broadcasts message on given topic.
Expand All @@ -32,14 +29,8 @@ defmodule EventStore.Registration do
@doc """
Subscribes the caller to the given topic.
"""
@spec subscribe(binary) :: :ok | {:error, term}
def subscribe(topic), do: registry_provider().subscribe(topic)

@doc """
Is the caller subscribed to the given topic?
"""
@spec subscribed?(binary) :: true | false
def subscribed?(topic), do: registry_provider().subscribed?(topic)
@spec subscribe(binary, mapper: (RecordedEvent.t() -> any())) :: :ok | {:error, term}
def subscribe(topic, opts \\ []), do: registry_provider().subscribe(topic, opts)

@doc """
Broadcasts message on given topic.
Expand All @@ -53,10 +44,10 @@ defmodule EventStore.Registration do
def registry_provider do
case Application.get_env(:eventstore, :registry, :local) do
:local ->
EventStore.Registration.LocalRegistry
LocalRegistry

:distributed ->
EventStore.Registration.PG2Registry
DistributedRegistry

unknown ->
raise ArgumentError, message: "Unknown `:registry` setting in config: #{inspect(unknown)}"
Expand Down
13 changes: 13 additions & 0 deletions test/event_store_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,19 @@ defmodule EventStoreTest do
assert_receive {:events, received_events}
assert length(received_events) == 2
end

test "should map events using optional `mapper` function" do
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(1)

assert :ok = EventStore.subscribe(stream_uuid, mapper: fn
%RecordedEvent{event_number: event_number} -> event_number
end)

:ok = EventStore.append_to_stream(stream_uuid, 0, events)

assert_receive {:events, [1]}
end
end

describe "persistent subscription" do
Expand Down

0 comments on commit 5de99ce

Please sign in to comment.