From 074b31d9ab30bbecba3fff4335dfec39af5cf9b8 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Thu, 7 Dec 2023 22:27:19 -0500 Subject: [PATCH 01/13] Optimistic Inbox Rework inbound federation to accept requests optimistically. The HTTP Signatures Plug will not attempt to fetch the actor or key and will fail early. If the signature cannot be validated we pass the required data into the Oban job with a reduced priority and increase the timeout to 20 seconds. The Oban job will handle the actor and key fetching before attempting to validate the activity again. This job will be retried 5 times by default. Another welcome side effect is that actors who change their keys can federate to Pleroma instances immediately instead of needing to wait the default value of 86400s / 24 hours before the key will be fetched again. --- lib/pleroma/signature.ex | 2 +- lib/pleroma/user.ex | 11 ++++++- .../activity_pub/activity_pub_controller.ex | 7 ++-- lib/pleroma/web/federator.ex | 7 ++++ lib/pleroma/workers/receiver_worker.ex | 33 +++++++++++++++++-- test/pleroma/user_test.exs | 9 +++-- 6 files changed, 58 insertions(+), 11 deletions(-) diff --git a/lib/pleroma/signature.ex b/lib/pleroma/signature.ex index 5cfdae051..42cceba28 100644 --- a/lib/pleroma/signature.ex +++ b/lib/pleroma/signature.ex @@ -58,7 +58,7 @@ def refetch_public_key(conn) do with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn), {:ok, actor_id} <- key_id_to_actor_id(kid), {:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id), - {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do + {:ok, public_key} <- User.get_or_fetch_public_key_for_ap_id(actor_id) do {:ok, public_key} else e -> diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index ce125d608..4c9aeffcb 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -2135,7 +2135,7 @@ def public_key(%{public_key: public_key_pem}) when is_binary(public_key_pem) do def public_key(_), do: {:error, "key not found"} - def get_public_key_for_ap_id(ap_id) do + def get_or_fetch_public_key_for_ap_id(ap_id) do with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id), {:ok, public_key} <- public_key(user) do {:ok, public_key} @@ -2144,6 +2144,15 @@ def get_public_key_for_ap_id(ap_id) do end end + def get_public_key_for_ap_id(ap_id) do + with {:ok, %User{} = user} <- get_cached_by_ap_id(ap_id), + {:ok, public_key} <- public_key(user) do + {:ok, public_key} + else + _ -> :error + end + end + @doc "Gets or fetch a user by uri or nickname." @spec get_or_fetch(String.t()) :: {:ok, User.t()} | {:error, String.t()} def get_or_fetch("http://" <> _host = uri), do: get_or_fetch_by_ap_id(uri) diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex index 1357c379c..3b2193ca3 100644 --- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex +++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@ -287,10 +287,9 @@ def inbox(%{assigns: %{valid_signature: true}} = conn, params) do json(conn, "ok") end - def inbox(%{assigns: %{valid_signature: false}} = conn, _params) do - conn - |> put_status(:bad_request) - |> json("Invalid HTTP Signature") + def inbox(%{assigns: %{valid_signature: false}, req_headers: req_headers} = conn, params) do + Federator.incoming_ap_doc(%{req_headers: req_headers, params: params}) + json(conn, "ok") end # POST /relay/inbox -or- POST /internal/fetch/inbox diff --git a/lib/pleroma/web/federator.ex b/lib/pleroma/web/federator.ex index 84b77cda1..1b8b22b7e 100644 --- a/lib/pleroma/web/federator.ex +++ b/lib/pleroma/web/federator.ex @@ -35,6 +35,13 @@ def allowed_thread_distance?(distance) do end # Client API + def incoming_ap_doc(%{params: params, req_headers: req_headers}) do + ReceiverWorker.enqueue( + "incoming_ap_doc", + %{"req_headers" => req_headers, "params" => params, "timeout" => :timer.seconds(20)}, + priority: 5 + ) + end def incoming_ap_doc(params) do ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index cf1bb62b4..b04e2974a 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -3,24 +3,51 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.ReceiverWorker do + alias Pleroma.Signature + alias Pleroma.User alias Pleroma.Web.Federator use Pleroma.Workers.WorkerHelper, queue: "federator_incoming" @impl Oban.Worker + + def perform(%Job{ + args: %{"op" => "incoming_ap_doc", "req_headers" => req_headers, "params" => params} + }) do + conn_data = %{params: params, req_headers: req_headers} + + with {:ok, %User{} = _actor} <- User.get_or_fetch_by_ap_id(conn_data.params["actor"]), + {:ok, _public_key} <- Signature.refetch_public_key(conn_data), + {:signature, true} <- {:signature, HTTPSignatures.validate_conn(conn_data)}, + {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do + {:ok, res} + else + e -> process_errors(e) + end + end + def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do {:ok, res} else + e -> process_errors(e) + end + end + + @impl Oban.Worker + def timeout(%_{args: %{"timeout" => timeout}}), do: timeout + + def timeout(_job), do: :timer.seconds(5) + + defp process_errors(errors) do + case errors do {:error, :origin_containment_failed} -> {:cancel, :origin_containment_failed} {:error, :already_present} -> {:cancel, :already_present} {:error, {:validate_object, reason}} -> {:cancel, reason} {:error, {:error, {:validate, reason}}} -> {:cancel, reason} {:error, {:reject, reason}} -> {:cancel, reason} + {:signature, false} -> {:error, :invalid_signature} e -> e end end - - @impl Oban.Worker - def timeout(_job), do: :timer.seconds(5) end diff --git a/test/pleroma/user_test.exs b/test/pleroma/user_test.exs index b9df527a0..c0b576c3c 100644 --- a/test/pleroma/user_test.exs +++ b/test/pleroma/user_test.exs @@ -1951,8 +1951,13 @@ test "unsuggests a user" do end end - test "get_public_key_for_ap_id fetches a user that's not in the db" do - assert {:ok, _key} = User.get_public_key_for_ap_id("http://mastodon.example.org/users/admin") + test "get_or_fetch_public_key_for_ap_id fetches a user that's not in the db" do + assert {:ok, _key} = + User.get_or_fetch_public_key_for_ap_id("http://mastodon.example.org/users/admin") + end + + test "get_public_key_for_ap_id returns correctly for user that's not in the db" do + assert :error = User.get_public_key_for_ap_id("http://mastodon.example.org/users/admin") end describe "per-user rich-text filtering" do From 0d3f1be2307c778c3f1e59139a45341a57433de2 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Fri, 8 Dec 2023 17:46:10 -0500 Subject: [PATCH 02/13] Fix test; log message no longer emitted here --- test/pleroma/signature_test.exs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/pleroma/signature_test.exs b/test/pleroma/signature_test.exs index b849cbee7..f5a915fa8 100644 --- a/test/pleroma/signature_test.exs +++ b/test/pleroma/signature_test.exs @@ -43,10 +43,7 @@ test "it returns key" do end test "it returns error when not found user" do - assert capture_log(fn -> - assert Signature.fetch_public_key(make_fake_conn("https://test-ap-id")) == - {:error, :error} - end) =~ "[error] Could not decode user" + assert Signature.fetch_public_key(make_fake_conn("https://test-ap-id")) == {:error, :error} end test "it returns error if public key is nil" do From ce5acd4158a351c59a79a4a0692eb2ef9775b554 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Fri, 8 Dec 2023 18:10:22 -0500 Subject: [PATCH 03/13] get_cached_by_ap_id/1 returns a single result, not a tuple --- lib/pleroma/user.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 4c9aeffcb..3f729fdcc 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -2145,7 +2145,7 @@ def get_or_fetch_public_key_for_ap_id(ap_id) do end def get_public_key_for_ap_id(ap_id) do - with {:ok, %User{} = user} <- get_cached_by_ap_id(ap_id), + with %User{} = user <- get_cached_by_ap_id(ap_id), {:ok, public_key} <- public_key(user) do {:ok, public_key} else From 1b5964979feb7bc105c5b96d3be6d50a5968740a Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Fri, 8 Dec 2023 18:13:43 -0500 Subject: [PATCH 04/13] Optimistic Inbox --- changelog.d/optimistic-inbox.change | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/optimistic-inbox.change diff --git a/changelog.d/optimistic-inbox.change b/changelog.d/optimistic-inbox.change new file mode 100644 index 000000000..2cf1ce92c --- /dev/null +++ b/changelog.d/optimistic-inbox.change @@ -0,0 +1 @@ +Optimistic Inbox reduces the processing overhead of incoming activities without instantly verifiable signatures. From 97cf78f63d312d0475ac8908d0b093cb5eff18d5 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Fri, 8 Dec 2023 18:24:30 -0500 Subject: [PATCH 05/13] Remove unnecessary forced refresh of user --- lib/pleroma/user.ex | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 3f729fdcc..c7753ca5d 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -1037,16 +1037,6 @@ defp maybe_send_registration_email(%User{email: email} = user) when is_binary(em defp maybe_send_registration_email(_), do: {:ok, :noop} - def needs_update?(%User{local: true}), do: false - - def needs_update?(%User{local: false, last_refreshed_at: nil}), do: true - - def needs_update?(%User{local: false} = user) do - NaiveDateTime.diff(NaiveDateTime.utc_now(), user.last_refreshed_at) >= 86_400 - end - - def needs_update?(_), do: true - @spec maybe_direct_follow(User.t(), User.t()) :: {:ok, User.t()} | {:error, String.t()} # "Locked" (self-locked) users demand explicit authorization of follow requests @@ -2059,15 +2049,13 @@ def html_filter_policy(_), do: Config.get([:markup, :scrub_policy]) def fetch_by_ap_id(ap_id), do: ActivityPub.make_user_from_ap_id(ap_id) def get_or_fetch_by_ap_id(ap_id) do - cached_user = get_cached_by_ap_id(ap_id) + user = get_cached_by_ap_id(ap_id) || fetch_by_ap_id(ap_id) - maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id) - - case {cached_user, maybe_fetched_user} do - {_, {:ok, %User{} = user}} -> + case user do + %User{} = user -> {:ok, user} - {%User{} = user, _} -> + {:ok, %User{} = user} -> {:ok, user} _ -> From 1d417d2a364eb8ae2035275927f6284951fe128b Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Fri, 8 Dec 2023 21:49:25 -0500 Subject: [PATCH 06/13] Our version of Oban only supports priorities 0-3 --- lib/pleroma/web/federator.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/web/federator.ex b/lib/pleroma/web/federator.ex index 1b8b22b7e..669a5bb0b 100644 --- a/lib/pleroma/web/federator.ex +++ b/lib/pleroma/web/federator.ex @@ -39,7 +39,7 @@ def incoming_ap_doc(%{params: params, req_headers: req_headers}) do ReceiverWorker.enqueue( "incoming_ap_doc", %{"req_headers" => req_headers, "params" => params, "timeout" => :timer.seconds(20)}, - priority: 5 + priority: 2 ) end From 4039106500ed0adba3ac6dbd480954aa76ce6bc3 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Fri, 8 Dec 2023 21:51:36 -0500 Subject: [PATCH 07/13] Fix the req_headers formatting --- lib/pleroma/workers/receiver_worker.ex | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index b04e2974a..562112dd2 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -14,6 +14,10 @@ defmodule Pleroma.Workers.ReceiverWorker do def perform(%Job{ args: %{"op" => "incoming_ap_doc", "req_headers" => req_headers, "params" => params} }) do + # Oban's serialization converts our tuple headers to lists. + # Revert it for the signature validation. + req_headers = Enum.into(req_headers, [], &List.to_tuple(&1)) + conn_data = %{params: params, req_headers: req_headers} with {:ok, %User{} = _actor} <- User.get_or_fetch_by_ap_id(conn_data.params["actor"]), From 82724f6664c05e746eb241fb60aefd9a931b372d Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Sat, 9 Dec 2023 17:43:54 -0500 Subject: [PATCH 08/13] Do not retry fetching deleted objects --- lib/pleroma/workers/receiver_worker.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index 562112dd2..d4e8e715e 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -51,6 +51,7 @@ defp process_errors(errors) do {:error, {:error, {:validate, reason}}} -> {:cancel, reason} {:error, {:reject, reason}} -> {:cancel, reason} {:signature, false} -> {:error, :invalid_signature} + {:error, {:error, reason = "Object has been deleted"}} -> {:cancel, reason} e -> e end end From 94daa3e8c1ec42f40214dc13f061f4319764f715 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Sat, 9 Dec 2023 18:21:00 -0500 Subject: [PATCH 09/13] Revert "Remove unnecessary forced refresh of user" This reverts commit 97cf78f63d312d0475ac8908d0b093cb5eff18d5. --- lib/pleroma/user.ex | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index c7753ca5d..3f729fdcc 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -1037,6 +1037,16 @@ defp maybe_send_registration_email(%User{email: email} = user) when is_binary(em defp maybe_send_registration_email(_), do: {:ok, :noop} + def needs_update?(%User{local: true}), do: false + + def needs_update?(%User{local: false, last_refreshed_at: nil}), do: true + + def needs_update?(%User{local: false} = user) do + NaiveDateTime.diff(NaiveDateTime.utc_now(), user.last_refreshed_at) >= 86_400 + end + + def needs_update?(_), do: true + @spec maybe_direct_follow(User.t(), User.t()) :: {:ok, User.t()} | {:error, String.t()} # "Locked" (self-locked) users demand explicit authorization of follow requests @@ -2049,13 +2059,15 @@ def html_filter_policy(_), do: Config.get([:markup, :scrub_policy]) def fetch_by_ap_id(ap_id), do: ActivityPub.make_user_from_ap_id(ap_id) def get_or_fetch_by_ap_id(ap_id) do - user = get_cached_by_ap_id(ap_id) || fetch_by_ap_id(ap_id) + cached_user = get_cached_by_ap_id(ap_id) - case user do - %User{} = user -> + maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id) + + case {cached_user, maybe_fetched_user} do + {_, {:ok, %User{} = user}} -> {:ok, user} - {:ok, %User{} = user} -> + {%User{} = user, _} -> {:ok, user} _ -> From d417f7321801890381de4daa53c1a78ef4650d2c Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Sat, 9 Dec 2023 18:40:29 -0500 Subject: [PATCH 10/13] Process inbound Delete activities at lowest priority --- lib/pleroma/web/federator.ex | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/pleroma/web/federator.ex b/lib/pleroma/web/federator.ex index 669a5bb0b..8621d984c 100644 --- a/lib/pleroma/web/federator.ex +++ b/lib/pleroma/web/federator.ex @@ -43,6 +43,10 @@ def incoming_ap_doc(%{params: params, req_headers: req_headers}) do ) end + def incoming_ap_doc(%{"type" => "Delete"} = params) do + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}, priority: 3) + end + def incoming_ap_doc(params) do ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end From 223c1bac8dcbfd6b2454b79077545d3465688d7f Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Sun, 10 Dec 2023 12:55:41 -0500 Subject: [PATCH 11/13] Cancel the job if the signature is still invalid after a refetch of the public key --- lib/pleroma/workers/receiver_worker.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index d4e8e715e..1dddd8d2e 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -50,7 +50,7 @@ defp process_errors(errors) do {:error, {:validate_object, reason}} -> {:cancel, reason} {:error, {:error, {:validate, reason}}} -> {:cancel, reason} {:error, {:reject, reason}} -> {:cancel, reason} - {:signature, false} -> {:error, :invalid_signature} + {:signature, false} -> {:cancel, :invalid_signature} {:error, {:error, reason = "Object has been deleted"}} -> {:cancel, reason} e -> e end From 18deea59b441a89c1e6870d29dd0a6c0f3070f55 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Sun, 10 Dec 2023 13:22:55 -0500 Subject: [PATCH 12/13] ActivityPub.make_user_from_ap_id/1 fetches the whole actor object including updating the public key for us --- lib/pleroma/signature.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/signature.ex b/lib/pleroma/signature.ex index 42cceba28..5cfdae051 100644 --- a/lib/pleroma/signature.ex +++ b/lib/pleroma/signature.ex @@ -58,7 +58,7 @@ def refetch_public_key(conn) do with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn), {:ok, actor_id} <- key_id_to_actor_id(kid), {:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id), - {:ok, public_key} <- User.get_or_fetch_public_key_for_ap_id(actor_id) do + {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do {:ok, public_key} else e -> From c0a50b7c3e340cd621827922200daa0f29dc6e15 Mon Sep 17 00:00:00 2001 From: Mark Felder Date: Sun, 10 Dec 2023 13:24:04 -0500 Subject: [PATCH 13/13] User.get_or_fetch_public_key_for_ap_id/1 is no longer required. --- lib/pleroma/user.ex | 9 --------- test/pleroma/user_test.exs | 5 ----- 2 files changed, 14 deletions(-) diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 3f729fdcc..0706f5607 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -2135,15 +2135,6 @@ def public_key(%{public_key: public_key_pem}) when is_binary(public_key_pem) do def public_key(_), do: {:error, "key not found"} - def get_or_fetch_public_key_for_ap_id(ap_id) do - with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id), - {:ok, public_key} <- public_key(user) do - {:ok, public_key} - else - _ -> :error - end - end - def get_public_key_for_ap_id(ap_id) do with %User{} = user <- get_cached_by_ap_id(ap_id), {:ok, public_key} <- public_key(user) do diff --git a/test/pleroma/user_test.exs b/test/pleroma/user_test.exs index c0b576c3c..77ca9198b 100644 --- a/test/pleroma/user_test.exs +++ b/test/pleroma/user_test.exs @@ -1951,11 +1951,6 @@ test "unsuggests a user" do end end - test "get_or_fetch_public_key_for_ap_id fetches a user that's not in the db" do - assert {:ok, _key} = - User.get_or_fetch_public_key_for_ap_id("http://mastodon.example.org/users/admin") - end - test "get_public_key_for_ap_id returns correctly for user that's not in the db" do assert :error = User.get_public_key_for_ap_id("http://mastodon.example.org/users/admin") end