[#534] Initial implementation of unreachable federation targets retirement.
This commit is contained in:
parent
4a278cd80a
commit
f161a92cb1
|
@ -0,0 +1,12 @@
|
||||||
|
defmodule Pleroma.Instances do
|
||||||
|
@moduledoc "Instances context."
|
||||||
|
|
||||||
|
@adapter Pleroma.Instances.Instance
|
||||||
|
|
||||||
|
defdelegate reachable?(url), to: @adapter
|
||||||
|
defdelegate set_reachable(url), to: @adapter
|
||||||
|
defdelegate set_unreachable(url, unreachable_since \\ nil), to: @adapter
|
||||||
|
|
||||||
|
def reachability_time_threshold,
|
||||||
|
do: NaiveDateTime.add(NaiveDateTime.utc_now(), -30 * 24 * 3600, :second)
|
||||||
|
end
|
|
@ -0,0 +1,77 @@
|
||||||
|
defmodule Pleroma.Instances.Instance do
|
||||||
|
@moduledoc "Instance."
|
||||||
|
|
||||||
|
alias Pleroma.Instances
|
||||||
|
alias Pleroma.Instances.Instance
|
||||||
|
|
||||||
|
use Ecto.Schema
|
||||||
|
|
||||||
|
import Ecto.{Query, Changeset}
|
||||||
|
|
||||||
|
alias Pleroma.Repo
|
||||||
|
|
||||||
|
schema "instances" do
|
||||||
|
field(:host, :string)
|
||||||
|
field(:unreachable_since, :naive_datetime)
|
||||||
|
field(:reachability_checked_at, :naive_datetime)
|
||||||
|
|
||||||
|
timestamps()
|
||||||
|
end
|
||||||
|
|
||||||
|
def update_changeset(struct, params \\ %{}) do
|
||||||
|
struct
|
||||||
|
|> cast(params, [:host, :unreachable_since, :reachability_checked_at])
|
||||||
|
|> unique_constraint(:host)
|
||||||
|
end
|
||||||
|
|
||||||
|
def reachable?(url) do
|
||||||
|
!Repo.one(
|
||||||
|
from(i in Instance,
|
||||||
|
where:
|
||||||
|
i.host == ^host(url) and i.unreachable_since <= ^Instances.reachability_time_threshold(),
|
||||||
|
select: true
|
||||||
|
)
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
def set_reachable(url) do
|
||||||
|
Repo.update_all(
|
||||||
|
from(i in Instance, where: i.host == ^host(url)),
|
||||||
|
set: [
|
||||||
|
unreachable_since: nil,
|
||||||
|
reachability_checked_at: DateTime.utc_now()
|
||||||
|
]
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
def set_unreachable(url, unreachable_since \\ nil) do
|
||||||
|
unreachable_since = unreachable_since || DateTime.utc_now()
|
||||||
|
host = host(url)
|
||||||
|
existing_record = Repo.get_by(Instance, %{host: host})
|
||||||
|
|
||||||
|
changes = %{
|
||||||
|
unreachable_since: unreachable_since,
|
||||||
|
reachability_checked_at: NaiveDateTime.utc_now()
|
||||||
|
}
|
||||||
|
|
||||||
|
if existing_record do
|
||||||
|
update_changes =
|
||||||
|
if existing_record.unreachable_since &&
|
||||||
|
NaiveDateTime.compare(existing_record.unreachable_since, unreachable_since) != :gt,
|
||||||
|
do: Map.delete(changes, :unreachable_since),
|
||||||
|
else: changes
|
||||||
|
|
||||||
|
{:ok, _instance} = Repo.update(update_changeset(existing_record, update_changes))
|
||||||
|
else
|
||||||
|
{:ok, _instance} = Repo.insert(update_changeset(%Instance{}, Map.put(changes, :host, host)))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp host(url_or_host) do
|
||||||
|
if url_or_host =~ ~r/^http/i do
|
||||||
|
URI.parse(url_or_host).host
|
||||||
|
else
|
||||||
|
url_or_host
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -3,7 +3,7 @@
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
defmodule Pleroma.Web.ActivityPub.ActivityPub do
|
defmodule Pleroma.Web.ActivityPub.ActivityPub do
|
||||||
alias Pleroma.{Activity, Repo, Object, Upload, User, Notification}
|
alias Pleroma.{Activity, Repo, Object, Upload, User, Notification, Instances}
|
||||||
alias Pleroma.Web.ActivityPub.{Transmogrifier, MRF}
|
alias Pleroma.Web.ActivityPub.{Transmogrifier, MRF}
|
||||||
alias Pleroma.Web.WebFinger
|
alias Pleroma.Web.WebFinger
|
||||||
alias Pleroma.Web.Federator
|
alias Pleroma.Web.Federator
|
||||||
|
@ -721,7 +721,15 @@ def publish(actor, activity) do
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
|
def publish_one(%{inbox: inbox} = activity) do
|
||||||
|
if Instances.reachable?(inbox) do
|
||||||
|
do_publish_one(activity)
|
||||||
|
else
|
||||||
|
{:error, :noop}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
|
||||||
Logger.info("Federating #{id} to #{inbox}")
|
Logger.info("Federating #{id} to #{inbox}")
|
||||||
host = URI.parse(inbox).host
|
host = URI.parse(inbox).host
|
||||||
|
|
||||||
|
@ -734,6 +742,8 @@ def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
|
||||||
digest: digest
|
digest: digest
|
||||||
})
|
})
|
||||||
|
|
||||||
|
with {:ok, _} <-
|
||||||
|
result =
|
||||||
@httpoison.post(
|
@httpoison.post(
|
||||||
inbox,
|
inbox,
|
||||||
json,
|
json,
|
||||||
|
@ -742,7 +752,14 @@ def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
|
||||||
{"signature", signature},
|
{"signature", signature},
|
||||||
{"digest", digest}
|
{"digest", digest}
|
||||||
]
|
]
|
||||||
)
|
) do
|
||||||
|
Instances.set_reachable(inbox)
|
||||||
|
result
|
||||||
|
else
|
||||||
|
e ->
|
||||||
|
Instances.set_unreachable(inbox)
|
||||||
|
e
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# TODO:
|
# TODO:
|
||||||
|
|
|
@ -6,6 +6,7 @@ defmodule Pleroma.Web.Salmon do
|
||||||
@httpoison Application.get_env(:pleroma, :httpoison)
|
@httpoison Application.get_env(:pleroma, :httpoison)
|
||||||
|
|
||||||
use Bitwise
|
use Bitwise
|
||||||
|
alias Pleroma.Instances
|
||||||
alias Pleroma.Web.XML
|
alias Pleroma.Web.XML
|
||||||
alias Pleroma.Web.OStatus.ActivityRepresenter
|
alias Pleroma.Web.OStatus.ActivityRepresenter
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
|
@ -167,15 +168,23 @@ defp send_to_user(%{info: %{salmon: salmon}}, feed, poster),
|
||||||
do: send_to_user(salmon, feed, poster)
|
do: send_to_user(salmon, feed, poster)
|
||||||
|
|
||||||
defp send_to_user(url, feed, poster) when is_binary(url) do
|
defp send_to_user(url, feed, poster) when is_binary(url) do
|
||||||
with {:ok, %{status: code}} <-
|
with {:reachable, true} <- {:reachable, Instances.reachable?(url)},
|
||||||
|
{:ok, %{status: code}} <-
|
||||||
poster.(
|
poster.(
|
||||||
url,
|
url,
|
||||||
feed,
|
feed,
|
||||||
[{"Content-Type", "application/magic-envelope+xml"}]
|
[{"Content-Type", "application/magic-envelope+xml"}]
|
||||||
) do
|
) do
|
||||||
|
Instances.set_reachable(url)
|
||||||
Logger.debug(fn -> "Pushed to #{url}, code #{code}" end)
|
Logger.debug(fn -> "Pushed to #{url}, code #{code}" end)
|
||||||
else
|
else
|
||||||
e -> Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end)
|
{:reachable, false} ->
|
||||||
|
Logger.debug(fn -> "Pushing Salmon to #{url} skipped as marked unreachable)" end)
|
||||||
|
:noop
|
||||||
|
|
||||||
|
e ->
|
||||||
|
Instances.set_unreachable(url)
|
||||||
|
Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
defmodule Pleroma.Web.Websub do
|
defmodule Pleroma.Web.Websub do
|
||||||
alias Ecto.Changeset
|
alias Ecto.Changeset
|
||||||
alias Pleroma.Repo
|
alias Pleroma.Repo
|
||||||
|
alias Pleroma.Instances
|
||||||
alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription}
|
alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription}
|
||||||
alias Pleroma.Web.OStatus.FeedRepresenter
|
alias Pleroma.Web.OStatus.FeedRepresenter
|
||||||
alias Pleroma.Web.{XML, Endpoint, OStatus}
|
alias Pleroma.Web.{XML, Endpoint, OStatus}
|
||||||
|
@ -267,7 +268,8 @@ def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) d
|
||||||
signature = sign(secret || "", xml)
|
signature = sign(secret || "", xml)
|
||||||
Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
|
Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
|
||||||
|
|
||||||
with {:ok, %{status: code}} <-
|
with {:reachable, true} <- {:reachable, Instances.reachable?(callback)},
|
||||||
|
{:ok, %{status: code}} <-
|
||||||
@httpoison.post(
|
@httpoison.post(
|
||||||
callback,
|
callback,
|
||||||
xml,
|
xml,
|
||||||
|
@ -276,10 +278,16 @@ def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) d
|
||||||
{"X-Hub-Signature", "sha1=#{signature}"}
|
{"X-Hub-Signature", "sha1=#{signature}"}
|
||||||
]
|
]
|
||||||
) do
|
) do
|
||||||
|
Instances.set_reachable(callback)
|
||||||
Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
|
Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
|
||||||
{:ok, code}
|
{:ok, code}
|
||||||
else
|
else
|
||||||
|
{:reachable, false} ->
|
||||||
|
Logger.debug(fn -> "Pushing to #{callback} skipped as marked unreachable)" end)
|
||||||
|
{:error, :noop}
|
||||||
|
|
||||||
e ->
|
e ->
|
||||||
|
Instances.set_unreachable(callback)
|
||||||
Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
|
Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
|
||||||
{:error, e}
|
{:error, e}
|
||||||
end
|
end
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
defmodule Pleroma.Repo.Migrations.CreateInstances do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def change do
|
||||||
|
create table(:instances) do
|
||||||
|
add :host, :string
|
||||||
|
add :unreachable_since, :naive_datetime
|
||||||
|
add :reachability_checked_at, :naive_datetime
|
||||||
|
|
||||||
|
timestamps()
|
||||||
|
end
|
||||||
|
|
||||||
|
create unique_index(:instances, [:host])
|
||||||
|
create index(:instances, [:unreachable_since])
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in New Issue