diff --git a/config/config.exs b/config/config.exs index 917bd38c..df1f8654 100644 --- a/config/config.exs +++ b/config/config.exs @@ -5,10 +5,11 @@ config :elixir, :time_zone_database, Tzdata.TimeZoneDatabase config :logger, level: :warn config :oban, Oban.Test.Repo, + migration_lock: false, name: Oban.Test.Repo, + pool: Ecto.Adapters.SQL.Sandbox, priv: "test/support/", - url: System.get_env("DATABASE_URL") || "postgres://localhost:5432/oban_test", - pool: Ecto.Adapters.SQL.Sandbox + url: System.get_env("DATABASE_URL") || "postgres://localhost:5432/oban_test" config :oban, ecto_repos: [Oban.Test.Repo] diff --git a/test/integration/database_trigger_test.exs b/test/integration/database_trigger_test.exs index a303d8d6..f3cd47c6 100644 --- a/test/integration/database_trigger_test.exs +++ b/test/integration/database_trigger_test.exs @@ -1,11 +1,17 @@ defmodule Oban.Integration.DatabaseTriggerTest do use Oban.Case + alias Ecto.Adapters.SQL.Sandbox + test "dispatching jobs from a queue via database trigger" do - start_supervised_oban!(notifier: Oban.Notifiers.Postgres, queues: [alpha: 5]) + Sandbox.unboxed_run(Repo, fn -> + start_supervised_oban!(notifier: Oban.Notifiers.Postgres, queues: [alpha: 5]) + + insert!(ref: 1, action: "OK") - insert!(ref: 1, action: "OK") + assert_receive {:ok, 1} - assert_receive {:ok, 1} + delete_oban_data!() + end) end end diff --git a/test/integration/dynamic_repo_test.exs b/test/integration/dynamic_repo_test.exs index 5d6c63bd..7797f16d 100644 --- a/test/integration/dynamic_repo_test.exs +++ b/test/integration/dynamic_repo_test.exs @@ -1,58 +1,29 @@ defmodule Oban.Integration.DynamicRepoTest do use Oban.Case - import Ecto.Query - alias Oban.Test.DynamicRepo - setup do + test "executing jobs using a dynamic repo" do {:ok, repo_pid} = start_supervised({DynamicRepo, name: nil}) - DynamicRepo.put_dynamic_repo(repo_pid) DynamicRepo.put_dynamic_repo(nil) - on_exit(fn -> - DynamicRepo.put_dynamic_repo(repo_pid) - end) - - {:ok, repo_pid: repo_pid} - end - - test "job execution", context do - name = start_oban!(context.repo_pid, queues: [alpha: 1]) - job_ref = insert_job!(name).ref - - assert_receive {:ok, ^job_ref} - end + on_exit(fn -> DynamicRepo.put_dynamic_repo(repo_pid) end) - test "rollback job insertion after transaction failure", context do - name = start_oban!(context.repo_pid, queues: [alpha: 1]) + name = make_ref() - {:ok, _app_repo_pid} = - start_supervised(%{DynamicRepo.child_spec(name: :app_repo) | id: :app_repo}) + opts = [ + get_dynamic_repo: fn -> repo_pid end, + name: name, + peer: Oban.Case.Peer, + queues: [alpha: 1], + repo: DynamicRepo, + ] - DynamicRepo.put_dynamic_repo(:app_repo) + start_supervised!({Oban, opts}) - ref = System.unique_integer([:positive, :monotonic]) - - assert {:error, :failure, :wat, %{job: %Oban.Job{args: %{ref: ^ref, action: "OK"}}}} = - name - |> Oban.insert(Ecto.Multi.new(), :job, Worker.new(%{ref: ref, action: "OK"})) - |> Ecto.Multi.run(:failure, fn _repo, _ -> {:error, :wat} end) - |> DynamicRepo.transaction() - - refute_receive {:ok, ^ref} - end - - defp start_oban!(repo_pid, opts) do - opts - |> Keyword.merge(repo: DynamicRepo, get_dynamic_repo: fn -> repo_pid end) - |> start_supervised_oban!() - end + Oban.insert!(name, Worker.new(%{ref: 1, action: "OK"})) - defp insert_job!(oban_name) do - ref = System.unique_integer([:positive, :monotonic]) - {:ok, job} = Oban.insert(oban_name, Worker.new(%{ref: ref, action: "OK"})) - %{ref: ref, job: job} + assert_receive {:ok, 1} end end diff --git a/test/oban/migrations_test.exs b/test/oban/migrations_test.exs index feb78b11..0655964a 100644 --- a/test/oban/migrations_test.exs +++ b/test/oban/migrations_test.exs @@ -3,9 +3,17 @@ defmodule Oban.MigrationsTest do import Oban.Migrations, only: [initial_version: 0, current_version: 0, migrated_version: 2] + alias Oban.Test.MigrationRepo + @arbitrary_checks 20 - @moduletag :integration + @moduletag :migration + + setup do + start_supervised!(MigrationRepo) + + :ok + end defmodule StepMigration do use Ecto.Migration @@ -57,7 +65,7 @@ defmodule Oban.MigrationsTest do for up <- initial_version()..current_version() do Application.put_env(:oban, :up_version, up) - assert :ok = Ecto.Migrator.up(Repo, @base_version + up, StepMigration) + assert :ok = Ecto.Migrator.up(MigrationRepo, @base_version + up, StepMigration) assert migrated_version() == up end @@ -66,13 +74,13 @@ defmodule Oban.MigrationsTest do assert migrated_version() == current_version() Application.put_env(:oban, :down_version, 2) - assert :ok = Ecto.Migrator.down(Repo, @base_version + 2, StepMigration) + assert :ok = Ecto.Migrator.down(MigrationRepo, @base_version + 2, StepMigration) assert table_exists?("oban_jobs") assert migrated_version() == 1 Application.put_env(:oban, :down_version, 1) - assert :ok = Ecto.Migrator.down(Repo, @base_version + 1, StepMigration) + assert :ok = Ecto.Migrator.down(MigrationRepo, @base_version + 1, StepMigration) refute table_exists?("oban_jobs") refute table_exists?("oban_peers") @@ -81,19 +89,19 @@ defmodule Oban.MigrationsTest do end test "migrating up and down between default versions" do - assert :ok = Ecto.Migrator.up(Repo, @base_version, DefaultMigration) + assert :ok = Ecto.Migrator.up(MigrationRepo, @base_version, DefaultMigration) assert table_exists?("oban_jobs") assert migrated_version() == current_version() # Migrating once more to replicate multiple migrations that don't specify a version. - assert :ok = Ecto.Migrator.up(Repo, @base_version + 1, DefaultMigration) - assert :ok = Ecto.Migrator.down(Repo, @base_version + 1, DefaultMigration) + assert :ok = Ecto.Migrator.up(MigrationRepo, @base_version + 1, DefaultMigration) + assert :ok = Ecto.Migrator.down(MigrationRepo, @base_version + 1, DefaultMigration) refute table_exists?("oban_jobs") # Migrating once more to replicate multiple migrations that don't specify a version. - assert :ok = Ecto.Migrator.down(Repo, @base_version, DefaultMigration) + assert :ok = Ecto.Migrator.down(MigrationRepo, @base_version, DefaultMigration) after clear_migrated() end @@ -110,8 +118,8 @@ defmodule Oban.MigrationsTest do Application.put_env(:oban, :up_version, up) Application.put_env(:oban, :down_version, down) - assert :ok = Ecto.Migrator.up(Repo, @base_version, StepMigration) - assert :ok = Ecto.Migrator.down(Repo, @base_version, StepMigration) + assert :ok = Ecto.Migrator.up(MigrationRepo, @base_version, StepMigration) + assert :ok = Ecto.Migrator.down(MigrationRepo, @base_version, StepMigration) clear_migrated() end) @@ -119,7 +127,7 @@ defmodule Oban.MigrationsTest do test "skipping schema creation when schema doesn't exist" do assert_raise Postgrex.Error, fn -> - Ecto.Migrator.up(Repo, @base_version, DefaultMigrationNoSchemaCreation) + Ecto.Migrator.up(MigrationRepo, @base_version, DefaultMigrationNoSchemaCreation) end refute table_exists?("oban_jobs") @@ -129,9 +137,9 @@ defmodule Oban.MigrationsTest do end test "skipping schema creation when schema does exist" do - Repo.query!("CREATE SCHEMA IF NOT EXISTS migrating") + MigrationRepo.query!("CREATE SCHEMA IF NOT EXISTS migrating") - assert :ok = Ecto.Migrator.up(Repo, @base_version, DefaultMigrationNoSchemaCreation) + assert :ok = Ecto.Migrator.up(MigrationRepo, @base_version, DefaultMigrationNoSchemaCreation) assert table_exists?("oban_jobs") assert migrated_version() == current_version() @@ -140,7 +148,7 @@ defmodule Oban.MigrationsTest do end defp migrated_version do - migrated_version(Repo, "migrating") + migrated_version(MigrationRepo, "migrating") end defp table_exists?(table) do @@ -153,13 +161,13 @@ defmodule Oban.MigrationsTest do ) """ - {:ok, %{rows: [[bool]]}} = Repo.query(query) + {:ok, %{rows: [[bool]]}} = MigrationRepo.query(query) bool end defp clear_migrated do - Repo.query("DELETE FROM schema_migrations WHERE version >= #{@base_version}") - Repo.query("DROP SCHEMA IF EXISTS migrating CASCADE") + MigrationRepo.query("DELETE FROM schema_migrations WHERE version >= #{@base_version}") + MigrationRepo.query("DROP SCHEMA IF EXISTS migrating CASCADE") end end diff --git a/test/oban/notifier_test.exs b/test/oban/notifier_test.exs index 37af2d0d..ddc0459c 100644 --- a/test/oban/notifier_test.exs +++ b/test/oban/notifier_test.exs @@ -1,72 +1,75 @@ defmodule Oban.NotifierTest do use Oban.Case + alias Ecto.Adapters.SQL.Sandbox alias Oban.Notifier - @moduletag :integration - for notifier <- [Oban.Notifiers.PG, Oban.Notifiers.Postgres] do @notifier notifier - describe "using #{notifier}" do + describe "with #{inspect(notifier)}" do test "broadcasting notifications to subscribers" do - name = start_supervised_oban!(notifier: @notifier) + Sandbox.unboxed_run(Repo, fn -> + name = start_supervised_oban!(notifier: @notifier) - :ok = Notifier.listen(name, [:signal]) - :ok = Notifier.notify(name, :signal, %{incoming: "message"}) + :ok = Notifier.listen(name, [:signal]) + :ok = Notifier.notify(name, :signal, %{incoming: "message"}) - assert_receive {:notification, :signal, %{"incoming" => "message"}} + assert_receive {:notification, :signal, %{"incoming" => "message"}} + end) end test "notifying with complex types" do - name = start_supervised_oban!(notifier: @notifier) - - Notifier.listen(name, [:insert, :gossip, :signal]) - - Notifier.notify(name, :signal, %{ - date: ~D[2021-08-09], - keyword: [a: 1, b: 1], - map: %{tuple: {1, :second}}, - tuple: {1, :second} - }) - - assert_receive {:notification, :signal, notice} - assert %{"date" => "2021-08-09", "keyword" => [["a", 1], ["b", 1]]} = notice - assert %{"map" => %{"tuple" => [1, "second"]}, "tuple" => [1, "second"]} = notice - - stop_supervised(name) + Sandbox.unboxed_run(Repo, fn -> + name = start_supervised_oban!(notifier: @notifier) + + Notifier.listen(name, [:insert, :gossip, :signal]) + + Notifier.notify(name, :signal, %{ + date: ~D[2021-08-09], + keyword: [a: 1, b: 1], + map: %{tuple: {1, :second}}, + tuple: {1, :second} + }) + + assert_receive {:notification, :signal, notice} + assert %{"date" => "2021-08-09", "keyword" => [["a", 1], ["b", 1]]} = notice + assert %{"map" => %{"tuple" => [1, "second"]}, "tuple" => [1, "second"]} = notice + end) end test "broadcasting on select channels" do - name = start_supervised_oban!(notifier: @notifier) + Sandbox.unboxed_run(Repo, fn -> + name = start_supervised_oban!(notifier: @notifier) - :ok = Notifier.listen(name, [:signal, :gossip]) - :ok = Notifier.unlisten(name, [:gossip]) + :ok = Notifier.listen(name, [:signal, :gossip]) + :ok = Notifier.unlisten(name, [:gossip]) - :ok = Notifier.notify(name, :gossip, %{foo: "bar"}) - :ok = Notifier.notify(name, :signal, %{baz: "bat"}) + :ok = Notifier.notify(name, :gossip, %{foo: "bar"}) + :ok = Notifier.notify(name, :signal, %{baz: "bat"}) - refute_receive {:notification, :gossip, _} - assert_receive {:notification, :signal, _} + assert_receive {:notification, :signal, _} + refute_received {:notification, :gossip, _} + end) end test "ignoring messages scoped to other instances" do - name = start_supervised_oban!(notifier: @notifier) - - :ok = Notifier.listen(name, [:gossip, :signal]) + Sandbox.unboxed_run(Repo, fn -> + name = start_supervised_oban!(notifier: @notifier) - ident = - name - |> Oban.config() - |> Config.to_ident() + :ok = Notifier.listen(name, [:gossip, :signal]) - :ok = Notifier.notify(name, :gossip, %{foo: "bar", ident: ident}) - :ok = Notifier.notify(name, :signal, %{foo: "baz", ident: "bogus.ident"}) + ident = + name + |> Oban.config() + |> Config.to_ident() - assert_receive {:notification, :gossip, _} - refute_receive {:notification, :signal, _} + :ok = Notifier.notify(name, :gossip, %{foo: "bar", ident: ident}) + :ok = Notifier.notify(name, :signal, %{foo: "baz", ident: "bogus.ident"}) - stop_supervised(name) + assert_receive {:notification, :gossip, _} + refute_received {:notification, :signal, _} + end) end end end diff --git a/test/oban/peer_test.exs b/test/oban/peer_test.exs index f7c1f469..8924a614 100644 --- a/test/oban/peer_test.exs +++ b/test/oban/peer_test.exs @@ -24,7 +24,7 @@ defmodule Oban.PeerTest do describe "using #{peer}" do test "a single node acquires leadership" do - name = start_supervised_oban!(peer: @peer) + name = start_supervised_oban!(peer: @peer, poll_interval: 250) assert Peer.leader?(name) end @@ -50,7 +50,7 @@ defmodule Oban.PeerTest do @tag :capture_log test "leadership checks return false after a timeout" do - name = start_supervised_oban!(peer: @peer) + name = start_supervised_oban!(peer: @peer, poll_interval: 250) assert Peer.leader?(name) @@ -58,7 +58,7 @@ defmodule Oban.PeerTest do |> Registry.whereis(Peer) |> :sys.suspend() - refute Peer.leader?(name, 100) + refute Peer.leader?(name, 10) end end end diff --git a/test/oban/peers/postgres_test.exs b/test/oban/peers/postgres_test.exs index d0a5d5ae..06feb893 100644 --- a/test/oban/peers/postgres_test.exs +++ b/test/oban/peers/postgres_test.exs @@ -21,9 +21,12 @@ defmodule Oban.Peers.PostgresTest do logged = capture_log(fn -> - name = start_supervised_oban!(peer: Postgres, plugins: []) + name = start_supervised_oban!(peer: Postgres, plugins: false) + conf = Oban.config(name) - refute Peer.leader?(name) + start_supervised!({Peer, conf: conf, name: Peer}) + + refute Postgres.leader?(Peer) end) assert logged =~ "leadership is disabled" diff --git a/test/support/case.ex b/test/support/case.ex index f513fc4f..e2c265b2 100644 --- a/test/support/case.ex +++ b/test/support/case.ex @@ -37,15 +37,9 @@ defmodule Oban.Case do END$$ """ - setup tags do - # Within Sandbox mode everything happens in a transaction, which prevents the use of - # LISTEN/NOTIFY messages. - if tags[:integration] do - Repo.query!(@delete_query, []) - - on_exit(fn -> Repo.query!(@delete_query, []) end) - else - pid = Sandbox.start_owner!(Repo, shared: not tags[:async]) + setup context do + unless context[:migration] do + pid = Sandbox.start_owner!(Repo, shared: not context[:async]) on_exit(fn -> Sandbox.stop_owner(pid) end) end @@ -53,6 +47,10 @@ defmodule Oban.Case do :ok end + def delete_oban_data! do + Repo.query!(@delete_query, []) + end + def start_supervised_oban!(opts) do opts = opts diff --git a/test/support/repo.ex b/test/support/repo.ex index 54c87dc8..0e0dac6b 100644 --- a/test/support/repo.ex +++ b/test/support/repo.ex @@ -17,3 +17,17 @@ defmodule Oban.Test.DynamicRepo do {:ok, Oban.Test.Repo.config()} end end + +defmodule Oban.Test.MigrationRepo do + @moduledoc false + + use Ecto.Repo, + otp_app: :oban, + adapter: Ecto.Adapters.Postgres + + def init(_, _) do + config = Oban.Test.Repo.config() + + {:ok, Keyword.delete(config, :pool)} + end +end