diff --git a/config/config.exs b/config/config.exs
index e7e17669e..259529f97 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -496,7 +496,6 @@
crontab: [
{"0 0 * * *", Pleroma.Workers.Cron.ClearOauthTokenWorker},
{"0 * * * *", Pleroma.Workers.Cron.StatsWorker},
- {"* * * * *", Pleroma.Workers.Cron.ScheduledActivityWorker},
{"* * * * *", Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker},
{"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker}
]
diff --git a/lib/pleroma/scheduled_activity.ex b/lib/pleroma/scheduled_activity.ex
index fea2cf3ff..96fa6a987 100644
--- a/lib/pleroma/scheduled_activity.ex
+++ b/lib/pleroma/scheduled_activity.ex
@@ -5,11 +5,13 @@
defmodule Pleroma.ScheduledActivity do
use Ecto.Schema
+ alias Ecto.Multi
alias Pleroma.Config
alias Pleroma.Repo
alias Pleroma.ScheduledActivity
alias Pleroma.User
alias Pleroma.Web.CommonAPI.Utils
+ alias Pleroma.Workers.ScheduledActivityWorker
import Ecto.Query
import Ecto.Changeset
@@ -105,14 +107,29 @@ def far_enough?(scheduled_at) do
end
def new(%User{} = user, attrs) do
- %ScheduledActivity{user_id: user.id}
- |> changeset(attrs)
+ changeset(%ScheduledActivity{user_id: user.id}, attrs)
end
+ @doc """
+ Creates ScheduledActivity and add to queue to perform at scheduled_at date
+ """
+ @spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
def create(%User{} = user, attrs) do
- user
- |> new(attrs)
- |> Repo.insert()
+ Multi.new()
+ |> Multi.insert(:scheduled_activity, new(user, attrs))
+ |> Multi.run(:scheduled_activity_job, fn _repo, %{scheduled_activity: activity} ->
+ %{activity_id: activity.id}
+ |> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at)
+ |> Oban.insert()
+ end)
+ |> Repo.transaction()
+ |> case do
+ {:ok, %{scheduled_activity: scheduled_activity}} ->
+ {:ok, scheduled_activity}
+
+ {:error, _, changeset, _} ->
+ {:error, changeset}
+ end
end
def get(%User{} = user, scheduled_activity_id) do
@@ -122,15 +139,35 @@ def get(%User{} = user, scheduled_activity_id) do
|> Repo.one()
end
- def update(%ScheduledActivity{} = scheduled_activity, attrs) do
- scheduled_activity
- |> update_changeset(attrs)
- |> Repo.update()
+ @spec update(ScheduledActivity.t(), map()) ::
+ {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
+ def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do
+ with {:error, %Ecto.Changeset{valid?: true} = changeset} <-
+ {:error, update_changeset(scheduled_activity, attrs)} do
+ Multi.new()
+ |> Multi.update(:scheduled_activity, changeset)
+ |> Multi.update_all(:scheduled_job, job_query(id),
+ set: [scheduled_at: changeset.changes[:scheduled_at]]
+ )
+ |> Repo.transaction()
+ |> case do
+ {:ok, %{scheduled_activity: scheduled_activity}} ->
+ {:ok, scheduled_activity}
+
+ {:error, _, changeset, _} ->
+ {:error, changeset}
+ end
+ end
+ end
+
+ def delete_job(%ScheduledActivity{id: id} = _scheduled_activity) do
+ id
+ |> job_query
+ |> Repo.delete_all()
end
def delete(%ScheduledActivity{} = scheduled_activity) do
- scheduled_activity
- |> Repo.delete()
+ Repo.delete(scheduled_activity)
end
def delete(id) when is_binary(id) or is_integer(id) do
@@ -158,4 +195,11 @@ def due_activities(offset \\ 0) do
|> where([sa], sa.scheduled_at < ^naive_datetime)
|> Repo.all()
end
+
+ def job_query(scheduled_activity_id) do
+ from(j in Oban.Job,
+ where: j.queue == "scheduled_activities",
+ where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id))
+ )
+ end
end
diff --git a/lib/pleroma/web/mastodon_api/controllers/scheduled_activity_controller.ex b/lib/pleroma/web/mastodon_api/controllers/scheduled_activity_controller.ex
index ff9276541..4f9a8bdbe 100644
--- a/lib/pleroma/web/mastodon_api/controllers/scheduled_activity_controller.ex
+++ b/lib/pleroma/web/mastodon_api/controllers/scheduled_activity_controller.ex
@@ -45,7 +45,8 @@ def update(%{assigns: %{scheduled_activity: scheduled_activity}} = conn, params)
@doc "DELETE /api/v1/scheduled_statuses/:id"
def delete(%{assigns: %{scheduled_activity: scheduled_activity}} = conn, _params) do
- with {:ok, scheduled_activity} <- ScheduledActivity.delete(scheduled_activity) do
+ with {:ok, scheduled_activity} <- ScheduledActivity.delete(scheduled_activity),
+ _ <- ScheduledActivity.delete_job(scheduled_activity) do
render(conn, "show.json", scheduled_activity: scheduled_activity)
end
end
diff --git a/lib/pleroma/web/mastodon_api/controllers/status_controller.ex b/lib/pleroma/web/mastodon_api/controllers/status_controller.ex
index 74b223cf4..d70749dfa 100644
--- a/lib/pleroma/web/mastodon_api/controllers/status_controller.ex
+++ b/lib/pleroma/web/mastodon_api/controllers/status_controller.ex
@@ -124,15 +124,18 @@ def create(
) do
params = Map.put(params, "in_reply_to_status_id", params["in_reply_to_id"])
- if ScheduledActivity.far_enough?(scheduled_at) do
- with {:ok, scheduled_activity} <-
- ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at}) do
- conn
- |> put_view(ScheduledActivityView)
- |> render("show.json", scheduled_activity: scheduled_activity)
- end
+ with {:far_enough, true} <- {:far_enough, ScheduledActivity.far_enough?(scheduled_at)},
+ attrs <- %{"params" => params, "scheduled_at" => scheduled_at},
+ {:ok, scheduled_activity} <- ScheduledActivity.create(user, attrs) do
+ conn
+ |> put_view(ScheduledActivityView)
+ |> render("show.json", scheduled_activity: scheduled_activity)
else
- create(conn, Map.drop(params, ["scheduled_at"]))
+ {:far_enough, _} ->
+ create(conn, Map.drop(params, ["scheduled_at"]))
+
+ error ->
+ error
end
end
diff --git a/lib/pleroma/workers/cron/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
similarity index 59%
rename from lib/pleroma/workers/cron/scheduled_activity_worker.ex
rename to lib/pleroma/workers/scheduled_activity_worker.ex
index 407ab687a..5109d7f75 100644
--- a/lib/pleroma/workers/cron/scheduled_activity_worker.ex
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -2,12 +2,13 @@
# Copyright © 2017-2019 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Workers.Cron.ScheduledActivityWorker do
+defmodule Pleroma.Workers.ScheduledActivityWorker do
@moduledoc """
- The worker to post scheduled actvities.
+ The worker to post scheduled activity.
"""
- use Oban.Worker, queue: "scheduled_activities"
+ use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
+
alias Pleroma.Config
alias Pleroma.ScheduledActivity
alias Pleroma.User
@@ -15,18 +16,20 @@ defmodule Pleroma.Workers.Cron.ScheduledActivityWorker do
require Logger
- @schedule_interval :timer.minutes(1)
-
@impl Oban.Worker
- def perform(_opts, _job) do
+ def perform(%{"activity_id" => activity_id}, _job) do
if Config.get([ScheduledActivity, :enabled]) do
- @schedule_interval
- |> ScheduledActivity.due_activities()
- |> Enum.each(&post_activity/1)
+ case Pleroma.Repo.get(ScheduledActivity, activity_id) do
+ %ScheduledActivity{} = scheduled_activity ->
+ post_activity(scheduled_activity)
+
+ _ ->
+ Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}")
+ end
end
end
- def post_activity(scheduled_activity) do
+ defp post_activity(%ScheduledActivity{} = scheduled_activity) do
try do
{:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity)
%User{} = user = User.get_cached_by_id(scheduled_activity.user_id)
diff --git a/test/scheduled_activity_test.exs b/test/scheduled_activity_test.exs
index d3d05745f..663cfdd34 100644
--- a/test/scheduled_activity_test.exs
+++ b/test/scheduled_activity_test.exs
@@ -26,6 +26,7 @@ test "when daily user limit is exceeded" do
attrs = %{params: %{}, scheduled_at: today}
{:ok, _} = ScheduledActivity.create(user, attrs)
{:ok, _} = ScheduledActivity.create(user, attrs)
+
{:error, changeset} = ScheduledActivity.create(user, attrs)
assert changeset.errors == [scheduled_at: {"daily limit exceeded", []}]
end
@@ -83,7 +84,10 @@ test "creates a status from the scheduled activity" do
params: %{status: "hi"}
)
- Pleroma.Workers.Cron.ScheduledActivityWorker.perform(:opts, :pid)
+ Pleroma.Workers.ScheduledActivityWorker.perform(
+ %{"activity_id" => scheduled_activity.id},
+ :pid
+ )
refute Repo.get(ScheduledActivity, scheduled_activity.id)
activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id))
diff --git a/test/support/helpers.ex b/test/support/helpers.ex
index ce39dd9d8..ec556a916 100644
--- a/test/support/helpers.ex
+++ b/test/support/helpers.ex
@@ -53,6 +53,12 @@ defmacro __using__(_opts) do
clear_config_all: 2
]
+ def to_datetime(naive_datetime) do
+ naive_datetime
+ |> DateTime.from_naive!("Etc/UTC")
+ |> DateTime.truncate(:second)
+ end
+
def collect_ids(collection) do
collection
|> Enum.map(& &1.id)
diff --git a/test/web/mastodon_api/controllers/scheduled_activity_controller_test.exs b/test/web/mastodon_api/controllers/scheduled_activity_controller_test.exs
index ae5fee2bc..5f3a376be 100644
--- a/test/web/mastodon_api/controllers/scheduled_activity_controller_test.exs
+++ b/test/web/mastodon_api/controllers/scheduled_activity_controller_test.exs
@@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.ScheduledActivityControllerTest do
alias Pleroma.ScheduledActivity
import Pleroma.Factory
+ import Ecto.Query
test "shows scheduled activities", %{conn: conn} do
user = insert(:user)
@@ -68,7 +69,30 @@ test "shows a scheduled activity", %{conn: conn} do
test "updates a scheduled activity", %{conn: conn} do
user = insert(:user)
- scheduled_activity = insert(:scheduled_activity, user: user)
+
+ scheduled_at =
+ NaiveDateTime.add(
+ NaiveDateTime.utc_now(),
+ :timer.minutes(60),
+ :millisecond
+ )
+
+ {:ok, scheduled_activity} =
+ ScheduledActivity.create(
+ user,
+ %{
+ scheduled_at: scheduled_at,
+ params: build(:note).data
+ }
+ )
+
+ scheduled_activity_job =
+ Repo.one(from(j in Oban.Job, where: j.queue == "scheduled_activities"))
+
+ assert scheduled_activity_job.args == %{"activity_id" => scheduled_activity.id}
+
+ assert DateTime.truncate(scheduled_activity_job.scheduled_at, :second) ==
+ to_datetime(scheduled_at)
new_scheduled_at =
NaiveDateTime.add(NaiveDateTime.utc_now(), :timer.minutes(120), :millisecond)
@@ -82,6 +106,10 @@ test "updates a scheduled activity", %{conn: conn} do
assert %{"scheduled_at" => expected_scheduled_at} = json_response(res_conn, 200)
assert expected_scheduled_at == Pleroma.Web.CommonAPI.Utils.to_masto_date(new_scheduled_at)
+ scheduled_activity_job = refresh_record(scheduled_activity_job)
+
+ assert DateTime.truncate(scheduled_activity_job.scheduled_at, :second) ==
+ to_datetime(new_scheduled_at)
res_conn =
conn
@@ -93,7 +121,25 @@ test "updates a scheduled activity", %{conn: conn} do
test "deletes a scheduled activity", %{conn: conn} do
user = insert(:user)
- scheduled_activity = insert(:scheduled_activity, user: user)
+
+ {:ok, scheduled_activity} =
+ ScheduledActivity.create(
+ user,
+ %{
+ scheduled_at:
+ NaiveDateTime.add(
+ NaiveDateTime.utc_now(),
+ :timer.minutes(60),
+ :millisecond
+ ),
+ params: build(:note).data
+ }
+ )
+
+ scheduled_activity_job =
+ Repo.one(from(j in Oban.Job, where: j.queue == "scheduled_activities"))
+
+ assert scheduled_activity_job.args == %{"activity_id" => scheduled_activity.id}
res_conn =
conn
@@ -101,7 +147,8 @@ test "deletes a scheduled activity", %{conn: conn} do
|> delete("/api/v1/scheduled_statuses/#{scheduled_activity.id}")
assert %{} = json_response(res_conn, 200)
- assert nil == Repo.get(ScheduledActivity, scheduled_activity.id)
+ refute Repo.get(ScheduledActivity, scheduled_activity.id)
+ refute Repo.get(Oban.Job, scheduled_activity_job.id)
res_conn =
conn
diff --git a/test/workers/cron/scheduled_activity_worker_test.exs b/test/workers/cron/scheduled_activity_worker_test.exs
deleted file mode 100644
index 6f17d6f6c..000000000
--- a/test/workers/cron/scheduled_activity_worker_test.exs
+++ /dev/null
@@ -1,37 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.Cron.ScheduledActivityWorkerTest do
- use Pleroma.DataCase
- alias Pleroma.ScheduledActivity
- import Pleroma.Factory
-
- clear_config([ScheduledActivity, :enabled])
-
- test "creates a status from the scheduled activity" do
- Pleroma.Config.put([ScheduledActivity, :enabled], true)
- user = insert(:user)
-
- naive_datetime =
- NaiveDateTime.add(
- NaiveDateTime.utc_now(),
- -:timer.minutes(2),
- :millisecond
- )
-
- scheduled_activity =
- insert(
- :scheduled_activity,
- scheduled_at: naive_datetime,
- user: user,
- params: %{status: "hi"}
- )
-
- Pleroma.Workers.Cron.ScheduledActivityWorker.perform(:opts, :pid)
-
- refute Repo.get(ScheduledActivity, scheduled_activity.id)
- activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id))
- assert Pleroma.Object.normalize(activity).data["content"] == "hi"
- end
-end