From 3fbc80eb585b44e2f315b9bf02b655b50ee473d3 Mon Sep 17 00:00:00 2001 From: Lain Soykaf Date: Sat, 16 Dec 2023 20:26:08 +0400 Subject: [PATCH 1/5] B ActivityPub.Publisher: Prioritize direct mentions --- lib/pleroma/web/activity_pub/publisher.ex | 98 +++++++++++-------- lib/pleroma/web/federator/publisher.ex | 7 +- .../web/activity_pub/publisher_test.exs | 67 ++++++++++--- test/support/factory.ex | 2 +- 4 files changed, 114 insertions(+), 60 deletions(-) diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index af6aa0781..373ceef32 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -118,7 +118,7 @@ defp should_federate?(inbox, public) do end end - @spec recipients(User.t(), Activity.t()) :: list(User.t()) | [] + @spec recipients(User.t(), Activity.t()) :: {[User.t()], [User.t()]} defp recipients(actor, activity) do followers = if actor.follower_address in activity.recipients do @@ -138,7 +138,10 @@ defp recipients(actor, activity) do [] end - Pleroma.Web.Federator.Publisher.remote_users(actor, activity) ++ followers ++ fetchers + mentioned = Pleroma.Web.Federator.Publisher.remote_users(actor, activity) + non_mentioned = (followers ++ fetchers) -- mentioned + + {mentioned, non_mentioned} end defp get_cc_ap_ids(ap_id, recipients) do @@ -195,34 +198,39 @@ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) public = is_public?(activity) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) - recipients = recipients(actor, activity) + {priority_recipients, recipients} = recipients(actor, activity) inboxes = - recipients - |> Enum.map(fn actor -> actor.inbox end) - |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) - |> Instances.filter_reachable() + [priority_recipients, recipients] + |> Enum.map(fn recipients -> + recipients + |> Enum.map(fn actor -> actor.inbox end) + |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) + |> Instances.filter_reachable() + end) Repo.checkout(fn -> - Enum.each(inboxes, fn {inbox, unreachable_since} -> - %User{ap_id: ap_id} = Enum.find(recipients, fn actor -> actor.inbox == inbox end) + Enum.each(inboxes, fn inboxes -> + Enum.each(inboxes, fn {inbox, unreachable_since} -> + %User{ap_id: ap_id} = Enum.find(recipients, fn actor -> actor.inbox == inbox end) - # Get all the recipients on the same host and add them to cc. Otherwise, a remote - # instance would only accept a first message for the first recipient and ignore the rest. - cc = get_cc_ap_ids(ap_id, recipients) + # Get all the recipients on the same host and add them to cc. Otherwise, a remote + # instance would only accept a first message for the first recipient and ignore the rest. + cc = get_cc_ap_ids(ap_id, recipients) - json = - data - |> Map.put("cc", cc) - |> Jason.encode!() + json = + data + |> Map.put("cc", cc) + |> Jason.encode!() - Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{ - inbox: inbox, - json: json, - actor_id: actor.id, - id: activity.data["id"], - unreachable_since: unreachable_since - }) + Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{ + inbox: inbox, + json: json, + actor_id: actor.id, + id: activity.data["id"], + unreachable_since: unreachable_since + }) + end) end) end) end @@ -239,25 +247,33 @@ def publish(%User{} = actor, %Activity{} = activity) do {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) json = Jason.encode!(data) - recipients(actor, activity) - |> Enum.map(fn %User{} = user -> - determine_inbox(activity, user) - end) - |> Enum.uniq() - |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) - |> Instances.filter_reachable() - |> Enum.each(fn {inbox, unreachable_since} -> - Pleroma.Web.Federator.Publisher.enqueue_one( - __MODULE__, - %{ - inbox: inbox, - json: json, - actor_id: actor.id, - id: activity.data["id"], - unreachable_since: unreachable_since - } - ) + {priority_recipients, recipients} = recipients(actor, activity) + + [{priority_recipients, 0}, {recipients, 1}] + |> Enum.map(fn {recipients, priority} -> + recipients + |> Enum.map(fn %User{} = user -> + determine_inbox(activity, user) + end) + |> Enum.uniq() + |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) + |> Instances.filter_reachable() + |> Enum.each(fn {inbox, unreachable_since} -> + Pleroma.Web.Federator.Publisher.enqueue_one( + __MODULE__, + %{ + inbox: inbox, + json: json, + actor_id: actor.id, + id: activity.data["id"], + unreachable_since: unreachable_since + }, + priority: priority + ) + end) end) + + :ok end def gather_webfinger_links(%User{} = user) do diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index a45796e9d..8c6547208 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -29,11 +29,12 @@ defmodule Pleroma.Web.Federator.Publisher do @doc """ Enqueue publishing a single activity. """ - @spec enqueue_one(module(), Map.t()) :: :ok - def enqueue_one(module, %{} = params) do + @spec enqueue_one(module(), Map.t(), Keyword.t()) :: {:ok, %Oban.Job{}} + def enqueue_one(module, %{} = params, worker_args \\ []) do PublisherWorker.enqueue( "publish_one", - %{"module" => to_string(module), "params" => params} + %{"module" => to_string(module), "params" => params}, + worker_args ) end diff --git a/test/pleroma/web/activity_pub/publisher_test.exs b/test/pleroma/web/activity_pub/publisher_test.exs index c5137cbb7..9800144b5 100644 --- a/test/pleroma/web/activity_pub/publisher_test.exs +++ b/test/pleroma/web/activity_pub/publisher_test.exs @@ -331,11 +331,40 @@ test "publish to url with with different ports" do assert res == :ok assert called( - Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{ - inbox: "https://domain.com/users/nick1/inbox", - actor_id: actor.id, - id: note_activity.data["id"] - }) + Pleroma.Web.Federator.Publisher.enqueue_one( + Publisher, + %{ + inbox: "https://domain.com/users/nick1/inbox", + actor_id: actor.id, + id: note_activity.data["id"] + }, + priority: 1 + ) + ) + end + + test_with_mock "Publishes to directly addressed actors with higher priority.", + Pleroma.Web.Federator.Publisher, + [:passthrough], + [] do + note_activity = insert(:direct_note_activity) + + actor = Pleroma.User.get_by_ap_id(note_activity.data["actor"]) + + res = Publisher.publish(actor, note_activity) + + assert res == :ok + + assert called( + Pleroma.Web.Federator.Publisher.enqueue_one( + Publisher, + %{ + inbox: :_, + actor_id: actor.id, + id: note_activity.data["id"] + }, + priority: 0 + ) ) end @@ -414,19 +443,27 @@ test "publish to url with with different ports" do assert res == :ok assert called( - Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{ - inbox: "https://domain.com/users/nick1/inbox", - actor_id: actor.id, - id: delete.data["id"] - }) + Pleroma.Web.Federator.Publisher.enqueue_one( + Publisher, + %{ + inbox: "https://domain.com/users/nick1/inbox", + actor_id: actor.id, + id: delete.data["id"] + }, + priority: 1 + ) ) assert called( - Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{ - inbox: "https://domain2.com/users/nick1/inbox", - actor_id: actor.id, - id: delete.data["id"] - }) + Pleroma.Web.Federator.Publisher.enqueue_one( + Publisher, + %{ + inbox: "https://domain2.com/users/nick1/inbox", + actor_id: actor.id, + id: delete.data["id"] + }, + priority: 1 + ) ) end end diff --git a/test/support/factory.ex b/test/support/factory.ex index d94544717..20bc5162e 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -212,7 +212,7 @@ def listen_factory do end def direct_note_factory do - user2 = insert(:user) + user2 = insert(:user, local: false, inbox: "http://example.com/inbox") %Pleroma.Object{data: data} = note_factory() %Pleroma.Object{data: Map.merge(data, %{"to" => [user2.ap_id]})} From c212fc1dcfae578e83a0a8fb036da30ca0b92f56 Mon Sep 17 00:00:00 2001 From: Lain Soykaf Date: Sat, 16 Dec 2023 20:32:13 +0400 Subject: [PATCH 2/5] User: Ignore non-local users when setting 'last active at' --- lib/pleroma/user.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 0706f5607..ddeaf0624 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -2681,6 +2681,8 @@ def update_last_active_at(%__MODULE__{local: true} = user) do |> update_and_set_cache() end + def update_last_active_at(user), do: user + def active_user_count(days \\ 30) do active_after = Timex.shift(NaiveDateTime.utc_now(), days: -days) From a0f70cf7d0d88679a2d64ce3adcf769d320c9f57 Mon Sep 17 00:00:00 2001 From: Lain Soykaf Date: Sat, 16 Dec 2023 20:40:51 +0400 Subject: [PATCH 3/5] Add changelog --- changelog.d/prioritize-direct-recipients.add | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 changelog.d/prioritize-direct-recipients.add diff --git a/changelog.d/prioritize-direct-recipients.add b/changelog.d/prioritize-direct-recipients.add new file mode 100644 index 000000000..e69de29bb From 77bb1bb6c8297c7433238fa204b09aa28715f8c8 Mon Sep 17 00:00:00 2001 From: Lain Soykaf Date: Sat, 16 Dec 2023 21:41:28 +0400 Subject: [PATCH 4/5] Actually write changelog --- changelog.d/prioritize-direct-recipients.add | 1 + 1 file changed, 1 insertion(+) diff --git a/changelog.d/prioritize-direct-recipients.add b/changelog.d/prioritize-direct-recipients.add index e69de29bb..4efc94c68 100644 --- a/changelog.d/prioritize-direct-recipients.add +++ b/changelog.d/prioritize-direct-recipients.add @@ -0,0 +1 @@ +- Prioritize mentioned recipients (i.e., those that are not just followers) when federating. From c1423ddca3c82140e537037fdf133ee057c2dac0 Mon Sep 17 00:00:00 2001 From: Lain Soykaf Date: Sun, 17 Dec 2023 11:15:58 +0400 Subject: [PATCH 5/5] ActivityPub.Publisher: Filter inboxes --- lib/pleroma/web/activity_pub/publisher.ex | 27 +++++++++++++---------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex index 373ceef32..a580994b1 100644 --- a/lib/pleroma/web/activity_pub/publisher.ex +++ b/lib/pleroma/web/activity_pub/publisher.ex @@ -118,7 +118,7 @@ defp should_federate?(inbox, public) do end end - @spec recipients(User.t(), Activity.t()) :: {[User.t()], [User.t()]} + @spec recipients(User.t(), Activity.t()) :: [[User.t()]] defp recipients(actor, activity) do followers = if actor.follower_address in activity.recipients do @@ -141,7 +141,7 @@ defp recipients(actor, activity) do mentioned = Pleroma.Web.Federator.Publisher.remote_users(actor, activity) non_mentioned = (followers ++ fetchers) -- mentioned - {mentioned, non_mentioned} + [mentioned, non_mentioned] end defp get_cc_ap_ids(ap_id, recipients) do @@ -198,7 +198,7 @@ def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity) public = is_public?(activity) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) - {priority_recipients, recipients} = recipients(actor, activity) + [priority_recipients, recipients] = recipients(actor, activity) inboxes = [priority_recipients, recipients] @@ -247,16 +247,19 @@ def publish(%User{} = actor, %Activity{} = activity) do {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) json = Jason.encode!(data) - {priority_recipients, recipients} = recipients(actor, activity) - - [{priority_recipients, 0}, {recipients, 1}] - |> Enum.map(fn {recipients, priority} -> - recipients - |> Enum.map(fn %User{} = user -> - determine_inbox(activity, user) + [priority_inboxes, inboxes] = + recipients(actor, activity) + |> Enum.map(fn recipients -> + recipients + |> Enum.map(fn actor -> actor.inbox end) + |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) end) - |> Enum.uniq() - |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) + + inboxes = inboxes -- priority_inboxes + + [{priority_inboxes, 0}, {inboxes, 1}] + |> Enum.each(fn {inboxes, priority} -> + inboxes |> Instances.filter_reachable() |> Enum.each(fn {inbox, unreachable_since} -> Pleroma.Web.Federator.Publisher.enqueue_one(