diff --git a/changelog.d/oban-queues.change b/changelog.d/oban-queues.change new file mode 100644 index 000000000..16df6409a --- /dev/null +++ b/changelog.d/oban-queues.change @@ -0,0 +1 @@ +Oban queues have refactored to simplify the queue design diff --git a/config/config.exs b/config/config.exs index 956d067f3..039d68379 100644 --- a/config/config.exs +++ b/config/config.exs @@ -582,24 +582,14 @@ log: false, queues: [ activity_expiration: 10, - token_expiration: 5, - filter_expiration: 1, - backup: 1, federator_incoming: 5, federator_outgoing: 5, ingestion_queue: 50, web_push: 50, - mailer: 10, transmogrifier: 20, - scheduled_activities: 10, - poll_notifications: 10, background: 5, - remote_fetcher: 2, - attachments_cleanup: 1, - new_users_digest: 1, - mute_expire: 5, search_indexing: [limit: 10, paused: true], - rich_media_expiration: 2 + slow: 1 ], plugins: [Oban.Plugins.Pruner], crontab: [ diff --git a/lib/pleroma/scheduled_activity.ex b/lib/pleroma/scheduled_activity.ex index 63c6cb45b..c361d7d89 100644 --- a/lib/pleroma/scheduled_activity.ex +++ b/lib/pleroma/scheduled_activity.ex @@ -204,7 +204,7 @@ def due_activities(offset \\ 0) do def job_query(scheduled_activity_id) do from(j in Oban.Job, - where: j.queue == "scheduled_activities", + where: j.queue == "federator_outgoing", where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id)) ) end diff --git a/lib/pleroma/web/federator.ex b/lib/pleroma/web/federator.ex index 1f2c3835a..4b30fd21d 100644 --- a/lib/pleroma/web/federator.ex +++ b/lib/pleroma/web/federator.ex @@ -44,7 +44,7 @@ def incoming_ap_doc(%{params: params, req_headers: req_headers}) do end def incoming_ap_doc(%{"type" => "Delete"} = params) do - ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}, priority: 3) + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}, priority: 3, queue: :slow) end def incoming_ap_doc(params) do diff --git a/lib/pleroma/workers/attachments_cleanup_worker.ex b/lib/pleroma/workers/attachments_cleanup_worker.ex index 4c1764053..0b570b70b 100644 --- a/lib/pleroma/workers/attachments_cleanup_worker.ex +++ b/lib/pleroma/workers/attachments_cleanup_worker.ex @@ -8,7 +8,7 @@ defmodule Pleroma.Workers.AttachmentsCleanupWorker do alias Pleroma.Object alias Pleroma.Repo - use Pleroma.Workers.WorkerHelper, queue: "attachments_cleanup" + use Pleroma.Workers.WorkerHelper, queue: "slow" @impl Oban.Worker def perform(%Job{ diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex index a485ddb4b..54ac31a3c 100644 --- a/lib/pleroma/workers/backup_worker.ex +++ b/lib/pleroma/workers/backup_worker.ex @@ -3,7 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.BackupWorker do - use Oban.Worker, queue: :backup, max_attempts: 1 + use Oban.Worker, queue: :slow, max_attempts: 1 alias Oban.Job alias Pleroma.User.Backup diff --git a/lib/pleroma/workers/cron/new_users_digest_worker.ex b/lib/pleroma/workers/cron/new_users_digest_worker.ex index 1c3e445aa..d2abb2d3b 100644 --- a/lib/pleroma/workers/cron/new_users_digest_worker.ex +++ b/lib/pleroma/workers/cron/new_users_digest_worker.ex @@ -9,7 +9,7 @@ defmodule Pleroma.Workers.Cron.NewUsersDigestWorker do import Ecto.Query - use Pleroma.Workers.WorkerHelper, queue: "mailer" + use Pleroma.Workers.WorkerHelper, queue: "background" @impl Oban.Worker def perform(_job) do diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex index 940716558..652bf77e0 100644 --- a/lib/pleroma/workers/mailer_worker.ex +++ b/lib/pleroma/workers/mailer_worker.ex @@ -3,7 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.MailerWorker do - use Pleroma.Workers.WorkerHelper, queue: "mailer" + use Pleroma.Workers.WorkerHelper, queue: "background" @impl Oban.Worker def perform(%Job{args: %{"op" => "email", "encoded_email" => encoded_email, "config" => config}}) do diff --git a/lib/pleroma/workers/mute_expire_worker.ex b/lib/pleroma/workers/mute_expire_worker.ex index 8ce458d48..8ad287a7f 100644 --- a/lib/pleroma/workers/mute_expire_worker.ex +++ b/lib/pleroma/workers/mute_expire_worker.ex @@ -3,7 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.MuteExpireWorker do - use Pleroma.Workers.WorkerHelper, queue: "mute_expire" + use Pleroma.Workers.WorkerHelper, queue: "background" @impl Oban.Worker def perform(%Job{args: %{"op" => "unmute_user", "muter_id" => muter_id, "mutee_id" => mutee_id}}) do diff --git a/lib/pleroma/workers/poll_worker.ex b/lib/pleroma/workers/poll_worker.ex index 022d026f8..70df54193 100644 --- a/lib/pleroma/workers/poll_worker.ex +++ b/lib/pleroma/workers/poll_worker.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Workers.PollWorker do @moduledoc """ Generates notifications when a poll ends. """ - use Pleroma.Workers.WorkerHelper, queue: "poll_notifications" + use Pleroma.Workers.WorkerHelper, queue: "background" alias Pleroma.Activity alias Pleroma.Notification diff --git a/lib/pleroma/workers/purge_expired_activity.ex b/lib/pleroma/workers/purge_expired_activity.ex index e554684fe..a65593b6e 100644 --- a/lib/pleroma/workers/purge_expired_activity.ex +++ b/lib/pleroma/workers/purge_expired_activity.ex @@ -7,7 +7,7 @@ defmodule Pleroma.Workers.PurgeExpiredActivity do Worker which purges expired activity. """ - use Oban.Worker, queue: :activity_expiration, max_attempts: 1, unique: [period: :infinity] + use Oban.Worker, queue: :slow, max_attempts: 1, unique: [period: :infinity] import Ecto.Query @@ -59,7 +59,7 @@ defp find_user(ap_id) do def get_expiration(id) do from(j in Oban.Job, where: j.state == "scheduled", - where: j.queue == "activity_expiration", + where: j.queue == "slow", where: fragment("?->>'activity_id' = ?", j.args, ^id) ) |> Pleroma.Repo.one() diff --git a/lib/pleroma/workers/purge_expired_filter.ex b/lib/pleroma/workers/purge_expired_filter.ex index 9114aeb7f..1f6931e4c 100644 --- a/lib/pleroma/workers/purge_expired_filter.ex +++ b/lib/pleroma/workers/purge_expired_filter.ex @@ -7,7 +7,7 @@ defmodule Pleroma.Workers.PurgeExpiredFilter do Worker which purges expired filters """ - use Oban.Worker, queue: :filter_expiration, max_attempts: 1, unique: [period: :infinity] + use Oban.Worker, queue: :background, max_attempts: 1, unique: [period: :infinity] import Ecto.Query @@ -38,7 +38,7 @@ def timeout(_job), do: :timer.seconds(5) def get_expiration(id) do from(j in Job, where: j.state == "scheduled", - where: j.queue == "filter_expiration", + where: j.queue == "background", where: fragment("?->'filter_id' = ?", j.args, ^id) ) |> Repo.one() diff --git a/lib/pleroma/workers/purge_expired_token.ex b/lib/pleroma/workers/purge_expired_token.ex index 2ccd9e80b..1854bf561 100644 --- a/lib/pleroma/workers/purge_expired_token.ex +++ b/lib/pleroma/workers/purge_expired_token.ex @@ -7,7 +7,7 @@ defmodule Pleroma.Workers.PurgeExpiredToken do Worker which purges expired OAuth tokens """ - use Oban.Worker, queue: :token_expiration, max_attempts: 1 + use Oban.Worker, queue: :background, max_attempts: 1 @spec enqueue(%{token_id: integer(), valid_until: DateTime.t(), mod: module()}) :: {:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t()} diff --git a/lib/pleroma/workers/remote_fetcher_worker.ex b/lib/pleroma/workers/remote_fetcher_worker.ex index c26418483..ed04c54b2 100644 --- a/lib/pleroma/workers/remote_fetcher_worker.ex +++ b/lib/pleroma/workers/remote_fetcher_worker.ex @@ -5,7 +5,7 @@ defmodule Pleroma.Workers.RemoteFetcherWorker do alias Pleroma.Object.Fetcher - use Pleroma.Workers.WorkerHelper, queue: "remote_fetcher" + use Pleroma.Workers.WorkerHelper, queue: "background" @impl Oban.Worker def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do diff --git a/lib/pleroma/workers/rich_media_expiration_worker.ex b/lib/pleroma/workers/rich_media_expiration_worker.ex index d7ae497a7..0b74687cf 100644 --- a/lib/pleroma/workers/rich_media_expiration_worker.ex +++ b/lib/pleroma/workers/rich_media_expiration_worker.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Workers.RichMediaExpirationWorker do alias Pleroma.Web.RichMedia.Card use Oban.Worker, - queue: :rich_media_expiration + queue: :background @impl Oban.Worker def perform(%Job{args: %{"url" => url} = _args}) do diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex index 4df84d00f..ab62686f4 100644 --- a/lib/pleroma/workers/scheduled_activity_worker.ex +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -7,7 +7,7 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do The worker to post scheduled activity. """ - use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities" + use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" alias Pleroma.Repo alias Pleroma.ScheduledActivity diff --git a/priv/repo/migrations/20240527144418_oban_queues_refactor.exs b/priv/repo/migrations/20240527144418_oban_queues_refactor.exs new file mode 100644 index 000000000..64ee28dfd --- /dev/null +++ b/priv/repo/migrations/20240527144418_oban_queues_refactor.exs @@ -0,0 +1,32 @@ +defmodule Pleroma.Repo.Migrations.ObanQueuesRefactor do + use Ecto.Migration + + @changed_queues [ + {"attachments_cleanup", "slow"}, + {"mailer", "background"}, + {"mute_expire", "background"}, + {"poll_notifications", "background"}, + {"activity_expiration", "slow"}, + {"filter_expiration", "background"}, + {"token_expiration", "background"}, + {"remote_fetcher", "background"}, + {"rich_media_expiration", "background"} + ] + + def up do + Enum.each(@changed_queues, fn {old, new} -> + execute("UPDATE oban_jobs SET queue = '#{new}' WHERE queue = '#{old}';") + end) + + # Handled special as reverting this would not be ideal and leaving it is harmless + execute( + "UPDATE oban_jobs SET queue = 'federator_outgoing' WHERE queue = 'scheduled_activities';" + ) + end + + def down do + # Just move all slow queue jobs to background queue if we are reverting + # as the slow queue will not be processing jobs + execute("UPDATE oban_jobs SET queue = 'background' WHERE queue = 'slow';") + end +end diff --git a/test/pleroma/scheduled_activity_test.exs b/test/pleroma/scheduled_activity_test.exs index 4818e8bcf..aaf643cfc 100644 --- a/test/pleroma/scheduled_activity_test.exs +++ b/test/pleroma/scheduled_activity_test.exs @@ -31,8 +31,7 @@ test "scheduled activities with jobs when ScheduledActivity enabled" do {:ok, sa1} = ScheduledActivity.create(user, attrs) {:ok, sa2} = ScheduledActivity.create(user, attrs) - jobs = - Repo.all(from(j in Oban.Job, where: j.queue == "scheduled_activities", select: j.args)) + jobs = Repo.all(from(j in Oban.Job, where: j.queue == "federator_outgoing", select: j.args)) assert jobs == [%{"activity_id" => sa1.id}, %{"activity_id" => sa2.id}] end diff --git a/test/pleroma/web/mastodon_api/controllers/scheduled_activity_controller_test.exs b/test/pleroma/web/mastodon_api/controllers/scheduled_activity_controller_test.exs index 632242221..2d6b2aee2 100644 --- a/test/pleroma/web/mastodon_api/controllers/scheduled_activity_controller_test.exs +++ b/test/pleroma/web/mastodon_api/controllers/scheduled_activity_controller_test.exs @@ -3,6 +3,7 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.MastodonAPI.ScheduledActivityControllerTest do + use Oban.Testing, repo: Pleroma.Repo use Pleroma.Web.ConnCase, async: true alias Pleroma.Repo @@ -78,7 +79,7 @@ test "updates a scheduled activity" do } ) - job = Repo.one(from(j in Oban.Job, where: j.queue == "scheduled_activities")) + job = Repo.one(from(j in Oban.Job, where: j.queue == "federator_outgoing")) assert job.args == %{"activity_id" => scheduled_activity.id} assert DateTime.truncate(job.scheduled_at, :second) == to_datetime(scheduled_at) @@ -124,9 +125,11 @@ test "deletes a scheduled activity" do } ) - job = Repo.one(from(j in Oban.Job, where: j.queue == "scheduled_activities")) - - assert job.args == %{"activity_id" => scheduled_activity.id} + assert_enqueued( + worker: Pleroma.Workers.ScheduledActivityWorker, + args: %{"activity_id" => scheduled_activity.id}, + queue: :federator_outgoing + ) res_conn = conn @@ -135,7 +138,11 @@ test "deletes a scheduled activity" do assert %{} = json_response_and_validate_schema(res_conn, 200) refute Repo.get(ScheduledActivity, scheduled_activity.id) - refute Repo.get(Oban.Job, job.id) + + refute_enqueued( + worker: Pleroma.Workers.ScheduledActivityWorker, + args: %{"activity_id" => scheduled_activity.id} + ) res_conn = conn