Skip to content

Commit

Permalink
Introduce Plugin behaviour and expand plugin option validation (oban-…
Browse files Browse the repository at this point in the history
…bg#659)

A new `Oban.Plugin` behavior simplifies and unifies the process of testing plugins.

Co-authored-by: Milton Mazzarri <me@milmazz.uno>
  • Loading branch information
sorentwo and milmazz committed Mar 7, 2022
1 parent c0b5e9f commit e337bde
Show file tree
Hide file tree
Showing 18 changed files with 567 additions and 214 deletions.
17 changes: 17 additions & 0 deletions guides/writing_plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,33 @@ useful plugin, but it demonstrates how to handle options, work with the

```elixir
defmodule MyApp.Plugins.Breakdown do
@behaviour Oban.Plugin

use GenServer

import Ecto.Query, only: [group_by: 3, select: 3]

@impl Oban.Plugin
def start_link(opts) do
name = Keyword.get(opts, :name, __MODULE__)

GenServer.start_link(__MODULE__, opts, name: name)
end

@impl Oban.Plugin
def validate(opts) do
Oban.Plugin.valdate(opts,
{:conf, _} -> :ok
{:name, _} -> :ok
{:interval, _} ->
if is_integer(interval) do
:ok
else
{:error, "expected interval to be an integer"}
end
end)
end

@impl GenServer
def init(opts) do
state = Map.new(opts)
Expand Down
7 changes: 6 additions & 1 deletion lib/oban/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,14 @@ defmodule Oban.Config do
raise ArgumentError, "plugin #{plugin} could not be found"
end

unless function_exported?(plugin, :validate, 1) do
raise ArgumentError,
"plugin #{plugin} is invalid because it's missing a `validate/1` function"
end

unless function_exported?(plugin, :init, 1) do
raise ArgumentError,
"plugin #{plugin} is not a valid plugin because it does not provide an `init/1` function"
"plugin #{plugin} is invalid because it's missing an `init/1` function"
end

unless Keyword.keyword?(opts) do
Expand Down
87 changes: 87 additions & 0 deletions lib/oban/plugin.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
defmodule Oban.Plugin do
@moduledoc """
Defines a shared behaviour for Oban plugins.
In addition to implementing the Plugin behaviour, all plugins **must** be a `GenServer`, `Agent`, or
another OTP compliant module.
## Example
Defining a basic plugin that satisfies the minimum:
defmodule MyPlugin do
@behaviour Oban.Plugin
use GenServer
@impl Oban.Plugin
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: opts[:name])
end
@impl Oban.Plugin
def validate(opts) do
if is_atom(opts[:mode])
:ok
else
{:error, "expected opts to have a :mode key"}
end
end
@impl GenServer
def init(opts) do
case validate(opts) do
:ok -> {:ok, opts}
{:error, reason} -> {:stop, reason}
end
end
end
"""

alias Oban.Config

@type option :: {:conf, Config.t()} | {:name, GenServer.name()} | {atom(), term()}
@type validator :: (option() -> :ok | {:error, term()})

@doc """
Starts a Plugin process linked to the current process.
Plugins are typically started as part of an Oban supervision tree and will receive the current
configuration as `:conf`, along with a `:name` and any other provided options.
"""
@callback start_link([option()]) :: GenServer.on_start()

@doc """
Validate the structure, presence, or values of keyword options.
"""
@callback validate([option()]) :: :ok | {:error, term()}

@doc """
A utility to help validate options without resorting to `throw` or `raise` for control flow.
## Example
Ensure all keys are known and the correct type:
validate(opts, fn
{:conf, conf} when is_struct(conf) -> :ok
{:name, name} when is_atom(name) -> :ok
opt -> {:error, "unknown option: " <> inspect(opt)}
end)
"""
@spec validate([option()], validator()) :: :ok | {:error, term()}
def validate(opts, validator) do
Enum.reduce_while(opts, :ok, fn opt, acc ->
case validator.(opt) do
:ok -> {:cont, acc}
{:error, _reason} = error -> {:halt, error}
end
end)
end

@doc false
@spec validate!([option()], ([option()] -> :ok | {:error, term()})) :: :ok
def validate!(opts, validate) do
with {:error, reason} <- validate.(opts), do: raise(ArgumentError, reason)
end
end
79 changes: 41 additions & 38 deletions lib/oban/plugins/cron.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,19 @@ defmodule Oban.Plugins.Cron do
* :jobs - a list of jobs that were inserted into the database
"""

@behaviour Oban.Plugin

use GenServer

alias Oban.Cron.Expression
alias Oban.{Config, Job, Peer, Repo, Worker}
alias Oban.{Job, Peer, Plugin, Repo, Worker}

@opaque expression :: Expression.t()

@type cron_input :: {binary(), module()} | {binary(), module(), [Job.option()]}

@type option ::
{:conf, Config.t()}
| {:name, GenServer.name()}
Plugin.option()
| {:crontab, [cron_input()]}
| {:timezone, Calendar.time_zone()}

Expand All @@ -73,18 +74,15 @@ defmodule Oban.Plugins.Cron do
]
end

@doc false
@impl Plugin
@spec start_link([option()]) :: GenServer.on_start()
def start_link(opts) do
validate!(opts)

GenServer.start_link(__MODULE__, opts, name: opts[:name])
end

@doc false
@spec validate!(Keyword.t()) :: :ok
def validate!(opts) when is_list(opts) do
Enum.each(opts, &validate_opt!/1)
@impl Plugin
def validate(opts) when is_list(opts) do
Plugin.validate(opts, &validate_opt/1)
end

@doc """
Expand Down Expand Up @@ -145,6 +143,8 @@ defmodule Oban.Plugins.Cron do

@impl GenServer
def init(opts) do
Plugin.validate!(opts, &validate/1)

Process.flag(:trap_exit, true)

state =
Expand Down Expand Up @@ -211,50 +211,53 @@ defmodule Oban.Plugins.Cron do
%{state | crontab: parsed}
end

defp validate_opt!({:crontab, crontab}) do
unless is_list(crontab) do
raise ArgumentError, "expected :crontab to be a list, got: #{inspect(crontab)}"
end
defp validate_opt({:crontab, crontab}) when is_list(crontab) do
Plugin.validate(crontab, &validate_crontab/1)
end

Enum.each(crontab, &validate_crontab!/1)
defp validate_opt({:crontab, crontab}) do
{:error, "expected :crontab to be a list, got: #{inspect(crontab)}"}
end

defp validate_opt!({:timezone, timezone}) do
unless is_binary(timezone) and match?({:ok, _}, DateTime.now(timezone)) do
raise ArgumentError, "expected :timezone to be a known timezone"
defp validate_opt({:timezone, timezone}) do
if is_binary(timezone) and match?({:ok, _}, DateTime.now(timezone)) do
:ok
else
{:error, "expected :timezone to be a known timezone, got: #{inspect(timezone)}"}
end
end

defp validate_opt!(_opt), do: :ok
defp validate_opt(_opt), do: :ok

defp validate_crontab!({expression, worker, opts}) do
%Expression{} = Expression.parse!(expression)
defp validate_crontab({expression, worker, opts}) do
with {:ok, _} <- parse(expression) do
cond do
not Code.ensure_loaded?(worker) ->
{:error, "#{inspect(worker)} not found or can't be loaded"}

unless Code.ensure_loaded?(worker) do
raise ArgumentError, "#{inspect(worker)} not found or can't be loaded"
end
not function_exported?(worker, :perform, 1) ->
{:error, "#{inspect(worker)} does not implement `perform/1` callback"}

unless function_exported?(worker, :perform, 1) do
raise ArgumentError, "#{inspect(worker)} does not implement `perform/1` callback"
end
not Keyword.keyword?(opts) ->
{:error, "options must be a keyword list, got: #{inspect(opts)}"}

unless Keyword.keyword?(opts) do
raise ArgumentError, "options must be a keyword list, got: #{inspect(opts)}"
end
not build_changeset(worker, opts).valid? ->
{:error, "expected valid job options, got: #{inspect(opts)}"}

unless build_changeset(worker, opts).valid? do
raise ArgumentError, "expected valid job options, got: #{inspect(opts)}"
true ->
:ok
end
end
end

defp validate_crontab!({expression, worker}) do
validate_crontab!({expression, worker, []})
defp validate_crontab({expression, worker}) do
validate_crontab({expression, worker, []})
end

defp validate_crontab!(invalid) do
raise ArgumentError,
"expected crontab entry to be an {expression, worker} or " <>
"{expression, worker, options} tuple, got: #{inspect(invalid)}"
defp validate_crontab(invalid) do
{:error,
"expected crontab entry to be an {expression, worker} or " <>
"{expression, worker, options} tuple, got: #{inspect(invalid)}"}
end

# Inserting Helpers
Expand Down
34 changes: 31 additions & 3 deletions lib/oban/plugins/gossip.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,40 @@ defmodule Oban.Plugins.Gossip do
* `:gossip_count` - the number of queues that had activity broadcasted
"""

@behaviour Oban.Plugin

use GenServer

alias Oban.{Config, Notifier}
alias Oban.{Notifier, Plugin}

@type option :: {:conf, Config.t()} | {:name, GenServer.name()} | {:interval, pos_integer()}
@type option :: Plugin.option() | {:interval, pos_integer()}

defmodule State do
@moduledoc false

defstruct [:conf, :name, :timer, interval: :timer.seconds(1)]
end

@doc false
@impl Plugin
@spec start_link([option()]) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: opts[:name])
end

@impl Plugin
def validate(opts) do
Plugin.validate(opts, fn
{:conf, _} -> :ok
{:name, _} -> :ok
{:interval, interval} -> validate_integer(:interval, interval)
option -> {:error, "unknown option provided: #{inspect(option)}"}
end)
end

@impl GenServer
def init(opts) do
Plugin.validate!(opts, &validate/1)

Process.flag(:trap_exit, true)

state =
Expand Down Expand Up @@ -98,10 +112,24 @@ defmodule Oban.Plugins.Gossip do
{:noreply, state}
end

# Validation

defp validate_integer(key, value) do
if is_integer(value) and value > 0 do
:ok
else
{:error, "expected #{inspect(key)} to be a positive integer, got: #{inspect(value)}"}
end
end

# Scheduling

defp schedule_gossip(state) do
%{state | timer: Process.send_after(self(), :gossip, state.interval)}
end

# Checking

defp safe_check(pid, state) do
if Process.alive?(pid), do: GenServer.call(pid, :check, state.interval)
catch
Expand Down
Loading

0 comments on commit e337bde

Please sign in to comment.