Skip to content

Commit

Permalink
Stash a timed job's pid to enable inner messaging
Browse files Browse the repository at this point in the history
Without explicitly stashing a timed task's pid there isn't any way to
find or reference it from outside the task, i.e. from a telemetry event
handler. This stashes the pid in the process dictionary using a key
format similar to the :"$callers" or :"$ancestors" used by Task itself.
  • Loading branch information
sorentwo committed Feb 26, 2021
1 parent c7ce8cb commit 5df12c8
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
3 changes: 3 additions & 0 deletions lib/oban/queue/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ defmodule Oban.Queue.Executor do
defp perform_timed(exec, timeout) do
task = Task.async(fn -> perform_inline(exec) end)

# Stash the timed task's pid to facilitate sending it messages via telemetry events.
Process.put(:"$nested", [task.pid])

case Task.yield(task, timeout) || Task.shutdown(task) do
{:ok, reply} ->
reply
Expand Down
10 changes: 5 additions & 5 deletions lib/oban/queue/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ defmodule Oban.Queue.Producer do
end

@impl GenServer
def handle_info({ref, _val}, %State{running: running} = state) do
def handle_info({ref, _val}, %State{running: running} = state) when is_reference(ref) do
Process.demonitor(ref, [:flush])

schedule_dispatch(%{state | running: Map.delete(running, ref)})
Expand Down Expand Up @@ -102,10 +102,6 @@ defmodule Oban.Queue.Producer do
schedule_dispatch(%{state | running: running})
end

def handle_info(:dispatch, %State{} = state) do
{:noreply, dispatch(%{state | timer: nil})}
end

def handle_info({:notification, :insert, %{"queue" => queue}}, %State{queue: queue} = state) do
schedule_dispatch(state)
end
Expand Down Expand Up @@ -136,6 +132,10 @@ defmodule Oban.Queue.Producer do
schedule_dispatch(state)
end

def handle_info(:dispatch, %State{} = state) do
{:noreply, dispatch(%{state | timer: nil})}
end

def handle_info(:reset_circuit, state) do
{:noreply, open_circuit(state)}
end
Expand Down
2 changes: 1 addition & 1 deletion test/integration/timeouts_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Oban.Integration.TimeoutsTest do

@moduletag :integration

test "jobs that exceed the worker's timeout are failed" do
test "jobs that exceed the worker's timeout fail" do
start_supervised_oban!(queues: [alpha: 5])

job = insert!(ref: 1, sleep: 100, timeout: 20)
Expand Down
14 changes: 14 additions & 0 deletions test/oban/queue/executor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ defmodule Oban.Queue.ExecutorTest do
def perform(%{args: %{"mode" => "catch"}}), do: throw(:no_reason)
def perform(%{args: %{"mode" => "error"}}), do: {:error, "no reason"}
def perform(%{args: %{"mode" => "sleep"}}), do: Process.sleep(10)
def perform(%{args: %{"mode" => "timed"}}), do: Process.sleep(10)

@impl Worker
def timeout(%{args: %{"mode" => "timed"}}), do: 20
def timeout(_), do: :infinity
end

@conf Config.new(repo: Repo)
Expand Down Expand Up @@ -72,6 +77,15 @@ defmodule Oban.Queue.ExecutorTest do
assert_in_delta duration_ms, 10, 20
assert_in_delta queue_time_ms, 30, 20
end

test "tracking the pid of nested timed tasks" do
assert %{state: :success, result: :ok} = call_with_mode("timed")

assert [task_pid] = Process.get(:"$nested")

assert is_pid(task_pid)
assert task_pid != self()
end
end

defp call_with_mode(mode) do
Expand Down

0 comments on commit 5df12c8

Please sign in to comment.