186 lines
6.3 KiB
Elixir
186 lines
6.3 KiB
Elixir
# Pleroma: A lightweight social networking server
|
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
|
# SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
defmodule Pleroma.Web.FedSockets do
|
|
@moduledoc """
|
|
This documents the FedSockets framework. A framework for federating
|
|
ActivityPub objects between servers via persistant WebSocket connections.
|
|
|
|
FedSockets allow servers to authenticate on first contact and maintain that
|
|
connection, eliminating the need to authenticate every time data needs to be shared.
|
|
|
|
## Protocol
|
|
FedSockets currently support 2 types of data transfer:
|
|
* `publish` method which doesn't require a response
|
|
* `fetch` method requires a response be sent
|
|
|
|
### Publish
|
|
The publish operation sends a json encoded map of the shape:
|
|
%{action: :publish, data: json}
|
|
and accepts (but does not require) a reply of form:
|
|
%{"action" => "publish_reply"}
|
|
|
|
The outgoing params represent
|
|
* data: ActivityPub object encoded into json
|
|
|
|
|
|
### Fetch
|
|
The fetch operation sends a json encoded map of the shape:
|
|
%{action: :fetch, data: id, uuid: fetch_uuid}
|
|
and requires a reply of form:
|
|
%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}
|
|
|
|
The outgoing params represent
|
|
* id: an ActivityPub object URI
|
|
* uuid: a unique uuid generated by the sender
|
|
|
|
The reply params represent
|
|
* data: an ActivityPub object encoded into json
|
|
* uuid: the uuid sent along with the fetch request
|
|
|
|
## Examples
|
|
Clients of FedSocket transfers shouldn't need to use any of the functions outside of this module.
|
|
|
|
A typical publish operation can be performed through the following code, and a fetch operation in a similar manner.
|
|
|
|
case FedSockets.get_or_create_fed_socket(inbox) do
|
|
{:ok, fedsocket} ->
|
|
FedSockets.publish(fedsocket, json)
|
|
|
|
_ ->
|
|
alternative_publish(inbox, actor, json, params)
|
|
end
|
|
|
|
## Configuration
|
|
FedSockets have the following config settings
|
|
|
|
config :pleroma, :fed_sockets,
|
|
enabled: true,
|
|
ping_interval: :timer.seconds(15),
|
|
connection_duration: :timer.hours(1),
|
|
rejection_duration: :timer.hours(1),
|
|
fed_socket_fetches: [
|
|
default: 12_000,
|
|
interval: 3_000,
|
|
lazy: false
|
|
]
|
|
* enabled - turn FedSockets on or off with this flag. Can be toggled at runtime.
|
|
* connection_duration - How long a FedSocket can sit idle before it's culled.
|
|
* rejection_duration - After failing to make a FedSocket connection a host will be excluded
|
|
from further connections for this amount of time
|
|
* fed_socket_fetches - Use these parameters to pass options to the Cachex queue backing the FetchRegistry
|
|
* fed_socket_rejections - Use these parameters to pass options to the Cachex queue backing the FedRegistry
|
|
|
|
Cachex options are
|
|
* default: the minimum amount of time a fetch can wait before it times out.
|
|
* interval: the interval between checks for timed out entries. This plus the default represent the maximum time allowed
|
|
* lazy: leave at false for consistant and fast lookups, set to true for stricter timeout enforcement
|
|
|
|
"""
|
|
require Logger
|
|
|
|
alias Pleroma.Web.FedSockets.FedRegistry
|
|
alias Pleroma.Web.FedSockets.FedSocket
|
|
alias Pleroma.Web.FedSockets.SocketInfo
|
|
|
|
@doc """
|
|
returns a FedSocket for the given origin. Will reuse an existing one or create a new one.
|
|
|
|
address is expected to be a fully formed URL such as:
|
|
"http://www.example.com" or "http://www.example.com:8080"
|
|
|
|
It can and usually does include additional path parameters,
|
|
but these are ignored as the FedSockets are organized by host and port info alone.
|
|
"""
|
|
def get_or_create_fed_socket(address) do
|
|
with {:cache, {:error, :missing}} <- {:cache, get_fed_socket(address)},
|
|
{:connect, {:ok, _pid}} <- {:connect, FedSocket.connect_to_host(address)},
|
|
{:cache, {:ok, fed_socket}} <- {:cache, get_fed_socket(address)} do
|
|
Logger.debug("fedsocket created for - #{inspect(address)}")
|
|
{:ok, fed_socket}
|
|
else
|
|
{:cache, {:ok, socket}} ->
|
|
Logger.debug("fedsocket found in cache - #{inspect(address)}")
|
|
{:ok, socket}
|
|
|
|
{:cache, {:error, :rejected} = e} ->
|
|
e
|
|
|
|
{:connect, {:error, _host}} ->
|
|
Logger.debug("set host rejected for - #{inspect(address)}")
|
|
FedRegistry.set_host_rejected(address)
|
|
{:error, :rejected}
|
|
|
|
{_, {:error, :disabled}} ->
|
|
{:error, :disabled}
|
|
|
|
{_, {:error, reason}} ->
|
|
Logger.warn("get_or_create_fed_socket error - #{inspect(reason)}")
|
|
{:error, reason}
|
|
end
|
|
end
|
|
|
|
@doc """
|
|
returns a FedSocket for the given origin. Will not create a new FedSocket if one does not exist.
|
|
|
|
address is expected to be a fully formed URL such as:
|
|
"http://www.example.com" or "http://www.example.com:8080"
|
|
"""
|
|
def get_fed_socket(address) do
|
|
origin = SocketInfo.origin(address)
|
|
|
|
with {:config, true} <- {:config, Pleroma.Config.get([:fed_sockets, :enabled], false)},
|
|
{:ok, socket} <- FedRegistry.get_fed_socket(origin) do
|
|
{:ok, socket}
|
|
else
|
|
{:config, _} ->
|
|
{:error, :disabled}
|
|
|
|
{:error, :rejected} ->
|
|
Logger.debug("FedSocket previously rejected - #{inspect(origin)}")
|
|
{:error, :rejected}
|
|
|
|
{:error, reason} ->
|
|
{:error, reason}
|
|
end
|
|
end
|
|
|
|
@doc """
|
|
Sends the supplied data via the publish protocol.
|
|
It will not block waiting for a reply.
|
|
Returns :ok but this is not an indication of a successful transfer.
|
|
|
|
the data is expected to be JSON encoded binary data.
|
|
"""
|
|
def publish(%SocketInfo{} = fed_socket, json) do
|
|
FedSocket.publish(fed_socket, json)
|
|
end
|
|
|
|
@doc """
|
|
Sends the supplied data via the fetch protocol.
|
|
It will block waiting for a reply or timeout.
|
|
|
|
Returns {:ok, object} where object is the requested object (or nil)
|
|
{:error, :timeout} in the event the message was not responded to
|
|
|
|
the id is expected to be the URI of an ActivityPub object.
|
|
"""
|
|
def fetch(%SocketInfo{} = fed_socket, id) do
|
|
FedSocket.fetch(fed_socket, id)
|
|
end
|
|
|
|
@doc """
|
|
Disconnect all and restart FedSockets.
|
|
This is mainly used in development and testing but could be useful in production.
|
|
"""
|
|
def reset do
|
|
FedRegistry
|
|
|> Process.whereis()
|
|
|> Process.exit(:testing)
|
|
end
|
|
|
|
def uri_for_origin(origin),
|
|
do: "ws://#{origin}/api/fedsocket/v1"
|
|
end
|