Merge branch 'handle_object_fetch_failures' into 'develop'
Handle object fetch failures gracefully See merge request pleroma/pleroma!4015
This commit is contained in:
commit
3c65a2899d
|
@ -0,0 +1 @@
|
|||
Remote object fetch failures will prevent the object fetch job from retrying if the object request returns 401, 403, 404, 410, or exceeds the maximum thread depth.
|
|
@ -177,7 +177,10 @@ def normalize(ap_id, options) when is_binary(ap_id) do
|
|||
ap_id
|
||||
|
||||
Keyword.get(options, :fetch) ->
|
||||
Fetcher.fetch_object_from_id!(ap_id, options)
|
||||
case Fetcher.fetch_object_from_id(ap_id, options) do
|
||||
{:ok, object} -> object
|
||||
_ -> nil
|
||||
end
|
||||
|
||||
true ->
|
||||
get_cached_by_ap_id(ap_id)
|
||||
|
|
|
@ -72,20 +72,25 @@ def fetch_object_from_id(id, options \\ []) do
|
|||
{:object, data, Object.normalize(activity, fetch: false)} do
|
||||
{:ok, object}
|
||||
else
|
||||
{:allowed_depth, false} ->
|
||||
{:error, "Max thread distance exceeded."}
|
||||
{:allowed_depth, false} = e ->
|
||||
log_fetch_error(id, e)
|
||||
{:error, :allowed_depth}
|
||||
|
||||
{:containment, _} ->
|
||||
{:error, "Object containment failed."}
|
||||
{:containment, reason} = e ->
|
||||
log_fetch_error(id, e)
|
||||
{:error, reason}
|
||||
|
||||
{:transmogrifier, {:error, {:reject, e}}} ->
|
||||
{:reject, e}
|
||||
{:transmogrifier, {:error, {:reject, reason}}} = e ->
|
||||
log_fetch_error(id, e)
|
||||
{:reject, reason}
|
||||
|
||||
{:transmogrifier, {:reject, e}} ->
|
||||
{:reject, e}
|
||||
{:transmogrifier, {:reject, reason}} = e ->
|
||||
log_fetch_error(id, e)
|
||||
{:reject, reason}
|
||||
|
||||
{:transmogrifier, _} = e ->
|
||||
{:error, e}
|
||||
{:transmogrifier, reason} = e ->
|
||||
log_fetch_error(id, e)
|
||||
{:error, reason}
|
||||
|
||||
{:object, data, nil} ->
|
||||
reinject_object(%Object{}, data)
|
||||
|
@ -96,14 +101,21 @@ def fetch_object_from_id(id, options \\ []) do
|
|||
{:fetch_object, %Object{} = object} ->
|
||||
{:ok, object}
|
||||
|
||||
{:fetch, {:error, error}} ->
|
||||
{:error, error}
|
||||
{:fetch, {:error, reason}} = e ->
|
||||
log_fetch_error(id, e)
|
||||
{:error, reason}
|
||||
|
||||
e ->
|
||||
e
|
||||
log_fetch_error(id, e)
|
||||
{:error, e}
|
||||
end
|
||||
end
|
||||
|
||||
defp log_fetch_error(id, error) do
|
||||
Logger.metadata(object: id)
|
||||
Logger.error("Object rejected while fetching #{id} #{inspect(error)}")
|
||||
end
|
||||
|
||||
defp prepare_activity_params(data) do
|
||||
%{
|
||||
"type" => "Create",
|
||||
|
@ -117,26 +129,6 @@ defp prepare_activity_params(data) do
|
|||
|> Maps.put_if_present("bcc", data["bcc"])
|
||||
end
|
||||
|
||||
def fetch_object_from_id!(id, options \\ []) do
|
||||
with {:ok, object} <- fetch_object_from_id(id, options) do
|
||||
object
|
||||
else
|
||||
{:error, %Tesla.Mock.Error{}} ->
|
||||
nil
|
||||
|
||||
{:error, "Object has been deleted"} ->
|
||||
nil
|
||||
|
||||
{:reject, reason} ->
|
||||
Logger.info("Rejected #{id} while fetching: #{inspect(reason)}")
|
||||
nil
|
||||
|
||||
e ->
|
||||
Logger.error("Error while fetching #{id}: #{inspect(e)}")
|
||||
nil
|
||||
end
|
||||
end
|
||||
|
||||
defp make_signature(id, date) do
|
||||
uri = URI.parse(id)
|
||||
|
||||
|
@ -227,8 +219,11 @@ defp get_object(id) do
|
|||
{:error, {:content_type, nil}}
|
||||
end
|
||||
|
||||
{:ok, %{status: code}} when code in [401, 403] ->
|
||||
{:error, :forbidden}
|
||||
|
||||
{:ok, %{status: code}} when code in [404, 410] ->
|
||||
{:error, "Object has been deleted"}
|
||||
{:error, :not_found}
|
||||
|
||||
{:error, e} ->
|
||||
{:error, e}
|
||||
|
|
|
@ -1698,9 +1698,7 @@ defp collection_private(%{"first" => first}) do
|
|||
Fetcher.fetch_and_contain_remote_object_from_id(first) do
|
||||
{:ok, false}
|
||||
else
|
||||
{:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true}
|
||||
{:error, _} = e -> e
|
||||
e -> {:error, e}
|
||||
{:error, _} -> {:ok, true}
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
|
|||
|
||||
import Ecto.Query
|
||||
|
||||
require Logger
|
||||
require Pleroma.Constants
|
||||
|
||||
@doc """
|
||||
|
@ -155,8 +154,7 @@ def fix_in_reply_to(%{"inReplyTo" => in_reply_to} = object, options)
|
|||
|> Map.put("context", replied_object.data["context"] || object["conversation"])
|
||||
|> Map.drop(["conversation", "inReplyToAtomUri"])
|
||||
else
|
||||
e ->
|
||||
Logger.warning("Couldn't fetch #{inspect(in_reply_to_id)}, error: #{inspect(e)}")
|
||||
_ ->
|
||||
object
|
||||
end
|
||||
else
|
||||
|
@ -181,8 +179,7 @@ def fix_quote_url_and_maybe_fetch(object, options \\ []) do
|
|||
{:quoting?, _} ->
|
||||
object
|
||||
|
||||
e ->
|
||||
Logger.warning("Couldn't fetch #{inspect(quote_url)}, error: #{inspect(e)}")
|
||||
_ ->
|
||||
object
|
||||
end
|
||||
end
|
||||
|
@ -856,8 +853,7 @@ def maybe_fix_object_url(%{"object" => object} = data) when is_binary(object) do
|
|||
relative_object do
|
||||
Map.put(data, "object", external_url)
|
||||
else
|
||||
{:fetch, e} ->
|
||||
Logger.error("Couldn't fetch #{object} #{inspect(e)}")
|
||||
{:fetch, _} ->
|
||||
data
|
||||
|
||||
_ ->
|
||||
|
|
|
@ -9,7 +9,22 @@ defmodule Pleroma.Workers.RemoteFetcherWorker do
|
|||
|
||||
@impl Oban.Worker
|
||||
def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
|
||||
{:ok, _object} = Fetcher.fetch_object_from_id(id, depth: args["depth"])
|
||||
case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do
|
||||
{:ok, _object} ->
|
||||
:ok
|
||||
|
||||
{:error, :forbidden} ->
|
||||
{:discard, :forbidden}
|
||||
|
||||
{:error, :not_found} ->
|
||||
{:discard, :not_found}
|
||||
|
||||
{:error, :allowed_depth} ->
|
||||
{:discard, :allowed_depth}
|
||||
|
||||
_ ->
|
||||
:error
|
||||
end
|
||||
end
|
||||
|
||||
@impl Oban.Worker
|
||||
|
|
|
@ -101,8 +101,7 @@ test "it works when fetching the OP actor errors out" do
|
|||
test "it returns thread depth exceeded error if thread depth is exceeded" do
|
||||
clear_config([:instance, :federation_incoming_replies_max_depth], 0)
|
||||
|
||||
assert {:error, "Max thread distance exceeded."} =
|
||||
Fetcher.fetch_object_from_id(@ap_id, depth: 1)
|
||||
assert {:error, :allowed_depth} = Fetcher.fetch_object_from_id(@ap_id, depth: 1)
|
||||
end
|
||||
|
||||
test "it fetches object if max thread depth is restricted to 0 and depth is not specified" do
|
||||
|
@ -220,14 +219,14 @@ test "all objects with fake directions are rejected by the object fetcher" do
|
|||
end
|
||||
|
||||
test "handle HTTP 410 Gone response" do
|
||||
assert {:error, "Object has been deleted"} ==
|
||||
assert {:error, :not_found} ==
|
||||
Fetcher.fetch_and_contain_remote_object_from_id(
|
||||
"https://mastodon.example.org/users/userisgone"
|
||||
)
|
||||
end
|
||||
|
||||
test "handle HTTP 404 response" do
|
||||
assert {:error, "Object has been deleted"} ==
|
||||
assert {:error, :not_found} ==
|
||||
Fetcher.fetch_and_contain_remote_object_from_id(
|
||||
"https://mastodon.example.org/users/userisgone404"
|
||||
)
|
||||
|
|
|
@ -132,7 +132,7 @@ test "it keeps link tags" do
|
|||
assert {:ok, activity} = Transmogrifier.handle_incoming(message)
|
||||
object = Object.normalize(activity)
|
||||
assert [%{"type" => "Mention"}, %{"type" => "Link"}] = object.data["tag"]
|
||||
end) =~ "Error while fetching"
|
||||
end) =~ "Object rejected while fetching"
|
||||
end
|
||||
|
||||
test "it accepts quote posts" do
|
||||
|
@ -410,7 +410,7 @@ test "it rejects activities which reference objects with bogus origins" do
|
|||
|
||||
assert capture_log(fn ->
|
||||
{:error, _} = Transmogrifier.handle_incoming(data)
|
||||
end) =~ "Object containment failed"
|
||||
end) =~ "Object rejected while fetching"
|
||||
end
|
||||
|
||||
test "it rejects activities which reference objects that have an incorrect attribution (variant 1)" do
|
||||
|
@ -425,7 +425,7 @@ test "it rejects activities which reference objects that have an incorrect attri
|
|||
|
||||
assert capture_log(fn ->
|
||||
{:error, _} = Transmogrifier.handle_incoming(data)
|
||||
end) =~ "Object containment failed"
|
||||
end) =~ "Object rejected while fetching"
|
||||
end
|
||||
|
||||
test "it rejects activities which reference objects that have an incorrect attribution (variant 2)" do
|
||||
|
@ -440,7 +440,7 @@ test "it rejects activities which reference objects that have an incorrect attri
|
|||
|
||||
assert capture_log(fn ->
|
||||
{:error, _} = Transmogrifier.handle_incoming(data)
|
||||
end) =~ "Object containment failed"
|
||||
end) =~ "Object rejected while fetching"
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -137,7 +137,7 @@ test "show follow page with error when user can not be fetched by `acct` link",
|
|||
|> html_response(200)
|
||||
|
||||
assert response =~ "Error fetching user"
|
||||
end) =~ "Object has been deleted"
|
||||
end) =~ ":not_found"
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2023 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Workers.RemoteFetcherWorkerTest do
|
||||
use Pleroma.DataCase
|
||||
use Oban.Testing, repo: Pleroma.Repo
|
||||
|
||||
alias Pleroma.Workers.RemoteFetcherWorker
|
||||
|
||||
@deleted_object_one "https://deleted-404.example.com/"
|
||||
@deleted_object_two "https://deleted-410.example.com/"
|
||||
@unauthorized_object "https://unauthorized.example.com/"
|
||||
@depth_object "https://depth.example.com/"
|
||||
|
||||
describe "RemoteFetcherWorker" do
|
||||
setup do
|
||||
Tesla.Mock.mock(fn
|
||||
%{method: :get, url: @deleted_object_one} ->
|
||||
%Tesla.Env{
|
||||
status: 404
|
||||
}
|
||||
|
||||
%{method: :get, url: @deleted_object_two} ->
|
||||
%Tesla.Env{
|
||||
status: 410
|
||||
}
|
||||
|
||||
%{method: :get, url: @unauthorized_object} ->
|
||||
%Tesla.Env{
|
||||
status: 403
|
||||
}
|
||||
|
||||
%{method: :get, url: @depth_object} ->
|
||||
%Tesla.Env{
|
||||
status: 200
|
||||
}
|
||||
end)
|
||||
end
|
||||
|
||||
test "does not requeue a deleted object" do
|
||||
assert {:discard, _} =
|
||||
RemoteFetcherWorker.perform(%Oban.Job{
|
||||
args: %{"op" => "fetch_remote", "id" => @deleted_object_one}
|
||||
})
|
||||
|
||||
assert {:discard, _} =
|
||||
RemoteFetcherWorker.perform(%Oban.Job{
|
||||
args: %{"op" => "fetch_remote", "id" => @deleted_object_two}
|
||||
})
|
||||
end
|
||||
|
||||
test "does not requeue an unauthorized object" do
|
||||
assert {:discard, _} =
|
||||
RemoteFetcherWorker.perform(%Oban.Job{
|
||||
args: %{"op" => "fetch_remote", "id" => @unauthorized_object}
|
||||
})
|
||||
end
|
||||
|
||||
test "does not requeue an object that exceeded depth" do
|
||||
clear_config([:instance, :federation_incoming_replies_max_depth], 0)
|
||||
|
||||
assert {:discard, _} =
|
||||
RemoteFetcherWorker.perform(%Oban.Job{
|
||||
args: %{"op" => "fetch_remote", "id" => @depth_object, "depth" => 1}
|
||||
})
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue