sending notes works now
This commit is contained in:
parent
8257c07d26
commit
1059718f3f
|
@ -0,0 +1,56 @@
|
|||
defmodule Vonbraun.ActivityPub.Deliverator do
|
||||
alias Vonbraun.Util
|
||||
alias Vonbraun.Ecto.Schema.Actor
|
||||
alias Vonbraun.ActivityPubReq
|
||||
|
||||
require Logger
|
||||
|
||||
# FIXME make actually queue.
|
||||
@spec queue(map()) :: {:ok, pid()}
|
||||
def queue(activity = %{}), do: Task.start(fn -> send(activity) end)
|
||||
|
||||
def send(activity = %{}) do
|
||||
inboxes = get_all_inboxes(activity)
|
||||
|
||||
Logger.debug("Inboxes to send to: #{inspect(inboxes)}")
|
||||
|
||||
inboxes = Enum.map(inboxes, &URI.parse/1)
|
||||
|
||||
payload = Jason.encode!(activity)
|
||||
|
||||
inboxes
|
||||
|> Task.async_stream(
|
||||
fn inbox ->
|
||||
ActivityPubReq.post(inbox, payload)
|
||||
end,
|
||||
timeout: 60_000
|
||||
)
|
||||
|> Enum.to_list()
|
||||
end
|
||||
|
||||
defp all_recipients(activity = %{}) do
|
||||
Enum.reduce(["to", "cc", "bcc", "bto"], [], fn
|
||||
target, all ->
|
||||
case Map.get(activity, target, []) do
|
||||
recip_list when is_list(recip_list) -> all ++ recip_list
|
||||
recip when is_binary(recip) -> [recip | all]
|
||||
end
|
||||
end)
|
||||
|> Enum.sort()
|
||||
|> Enum.uniq()
|
||||
end
|
||||
|
||||
defp get_all_inboxes(activity = %{}) do
|
||||
recipients = all_recipients(activity)
|
||||
my_followers_id = Util.my_id() <> "/followers"
|
||||
|
||||
(if my_followers_id in recipients do
|
||||
Actor.get_my_followers_inboxes()
|
||||
else
|
||||
[]
|
||||
end ++
|
||||
Actor.get_recipient_inboxes(recipients))
|
||||
|> Enum.sort()
|
||||
|> Enum.uniq()
|
||||
end
|
||||
end
|
|
@ -47,8 +47,10 @@ defmodule Vonbraun.ActivityPub.Handler.Follow do
|
|||
Logger.debug("Replying to follow request with: #{activity_type}")
|
||||
Logger.debug("And payload: `#{payload}`")
|
||||
|
||||
with {:inbox, {:ok, inbox}} <- {:inbox, ActivityPubReq.extract_actor_inbox(actor)},
|
||||
{:inbox_uri, inbox = %URI{}} <- {:inbox_uri, URI.parse(inbox)} do
|
||||
with {:inbox, {raw_shared_inbox, raw_inbox}} <-
|
||||
{:inbox, ActivityPubReq.extract_actor_inboxes(actor)},
|
||||
{:inbox_uri, inbox = %URI{}} <-
|
||||
{:inbox_uri, URI.parse(raw_shared_inbox || raw_inbox)} do
|
||||
Task.start(fn ->
|
||||
case ActivityPubReq.post(inbox, payload) do
|
||||
{:ok, %{:status => status, :body => body}} ->
|
||||
|
@ -61,7 +63,7 @@ defmodule Vonbraun.ActivityPub.Handler.Follow do
|
|||
|
||||
{:ok, String.to_atom(follows_me_state)}
|
||||
else
|
||||
{:inbox, {:error, _}} ->
|
||||
{:inbox, :not_found} ->
|
||||
{:error, :inbox}
|
||||
end
|
||||
else
|
||||
|
|
|
@ -86,8 +86,25 @@ defmodule Vonbraun.ActivityPub.Object do
|
|||
if to_public? || cc_public? do
|
||||
object
|
||||
else
|
||||
to = [@public_to | listify(to)]
|
||||
Map.put(object, "to", to)
|
||||
cc = [@public_to | listify(cc)]
|
||||
Map.put(object, "cc", cc)
|
||||
end
|
||||
end
|
||||
|
||||
def mark_to_followers(object = %{}) do
|
||||
followers_id = my_id() <> "/followers"
|
||||
|
||||
to = Map.get(object, "to", [])
|
||||
cc = Map.get(object, "cc", [])
|
||||
|
||||
to_followers? = to == followers_id or (is_list(to) && followers_id in to)
|
||||
cc_followers? = cc == followers_id or (is_list(cc) && followers_id in cc)
|
||||
|
||||
if to_followers? || cc_followers? do
|
||||
object
|
||||
else
|
||||
cc = [followers_id | listify(cc)]
|
||||
Map.put(object, "cc", cc)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ defmodule Vonbraun.ActivityPubReq do
|
|||
alias Vonbraun.Cache
|
||||
alias Vonbraun.Util
|
||||
alias Vonbraun.HTTPSignature
|
||||
alias Vonbraun.Ecto.Schema.Actor
|
||||
|
||||
@ttl :timer.minutes(1)
|
||||
|
||||
|
@ -65,27 +66,52 @@ defmodule Vonbraun.ActivityPubReq do
|
|||
|
||||
def get_cached_actor(id = "https://" <> _) when is_binary(id) do
|
||||
with {:cache, nil} <- {:cache, Cache.get(id)},
|
||||
{:actor, {:ok, actor = %{}}} <- {:actor, get_actor(id)} do
|
||||
{:actor, {:ok, actor = %{}}} <- {:actor, get_actor(id)},
|
||||
{:props, {:ok, actor_props}} <- {:props, Actor.maybe_insert(id)},
|
||||
{:update, {:ok, _}} <- {:update, maybe_update_inboxes(actor_props, actor)} do
|
||||
Cache.put(id, actor, ttl: @ttl)
|
||||
{:ok, actor}
|
||||
else
|
||||
{:cache, actor = %{}} ->
|
||||
{:ok, actor}
|
||||
|
||||
{:actor, {:error, error}} ->
|
||||
{:error, error}
|
||||
{:actor, error = {:error, _}} ->
|
||||
error
|
||||
|
||||
{:props, error = {:error, _}} ->
|
||||
error
|
||||
|
||||
{:update, error = {:error, _}} ->
|
||||
error
|
||||
end
|
||||
end
|
||||
|
||||
def extract_actor_inbox(%{"inbox" => inbox}) when is_binary(inbox) do
|
||||
{:ok, inbox}
|
||||
# TODO: convert to JSON-LD expanded form.
|
||||
|
||||
def extract_actor_inboxes(%{"inbox" => inbox, "endpoints" => %{"sharedInbox" => shared_inbox}})
|
||||
when is_binary(inbox) and is_binary(shared_inbox) do
|
||||
{shared_inbox, inbox}
|
||||
end
|
||||
|
||||
def extract_actor_inbox(%{"endpoints" => %{"sharedInbox" => inbox}}) when is_binary(inbox) do
|
||||
{:ok, inbox}
|
||||
def extract_actor_inboxes(%{"endpoints" => %{"sharedInbox" => shared_inbox}})
|
||||
when is_binary(shared_inbox) do
|
||||
{shared_inbox, nil}
|
||||
end
|
||||
|
||||
def extract_actor_inbox(%{}) do
|
||||
{:error, :not_found}
|
||||
def extract_actor_inboxes(%{"inbox" => inbox}) when is_binary(inbox) do
|
||||
{nil, inbox}
|
||||
end
|
||||
|
||||
def extract_actor_inboxes(%{}) do
|
||||
:not_found
|
||||
end
|
||||
|
||||
defp maybe_update_inboxes(actor_props = %Actor{}, actor = %{}) do
|
||||
with false <- Actor.has_inboxes?(actor_props),
|
||||
{new_shared_inbox, new_inbox} <- extract_actor_inboxes(actor) do
|
||||
Actor.set_inboxes(actor_props, new_shared_inbox, new_inbox)
|
||||
else
|
||||
_ -> {:ok, actor_props}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -4,6 +4,8 @@ defmodule Vonbraun.Control do
|
|||
alias Vonbraun.ActivityPubReq
|
||||
alias Vonbraun.Ecto.Schema.Actor
|
||||
alias Vonbraun.CryptoID
|
||||
alias Vonbraun.ActivityPub.Deliverator
|
||||
alias Vonbraun.Util
|
||||
|
||||
@spec follow(String.t()) ::
|
||||
:ok
|
||||
|
@ -15,8 +17,10 @@ defmodule Vonbraun.Control do
|
|||
| {:status, non_neg_integer()}}
|
||||
def follow(followee_actor_id) when is_binary(followee_actor_id) do
|
||||
with {:actor, {:ok, actor}} <- {:actor, ActivityPubReq.get_cached_actor(followee_actor_id)},
|
||||
{:inbox, {:ok, raw_inbox}} <- {:inbox, ActivityPubReq.extract_actor_inbox(actor)},
|
||||
{:parse_inbox, inbox = %URI{}} <- {:parse_inbox, URI.parse(raw_inbox)},
|
||||
{:inbox, {raw_shared_inbox, raw_inbox}} <-
|
||||
{:inbox, ActivityPubReq.extract_actor_inboxes(actor)},
|
||||
{:parse_inbox, inbox = %URI{}} <-
|
||||
{:parse_inbox, URI.parse(raw_shared_inbox || raw_inbox)},
|
||||
{:pending, {:ok, %{:blocks_me => nil, :following_state => "pending"}}} <-
|
||||
{:pending, Actor.mark_pending_follow(followee_actor_id, "pending")} do
|
||||
payload = Object.follow_activity(followee_actor_id) |> Jason.encode!()
|
||||
|
@ -53,7 +57,7 @@ defmodule Vonbraun.Control do
|
|||
Logger.info("This user blocks you.")
|
||||
{:error, :blocked}
|
||||
|
||||
{:inbox, {:error, _}} ->
|
||||
{:inbox, :not_found} ->
|
||||
Logger.warning("Actor had an invalid inbox.")
|
||||
{:error, :invalid_inbox}
|
||||
end
|
||||
|
@ -61,8 +65,10 @@ defmodule Vonbraun.Control do
|
|||
|
||||
def unfollow(followee_actor_id) when is_binary(followee_actor_id) do
|
||||
with {:actor, {:ok, actor}} <- {:actor, ActivityPubReq.get_cached_actor(followee_actor_id)},
|
||||
{:inbox, {:ok, raw_inbox}} <- {:inbox, ActivityPubReq.extract_actor_inbox(actor)},
|
||||
{:parse_inbox, inbox = %URI{}} <- {:parse_inbox, URI.parse(raw_inbox)},
|
||||
{:inbox, {raw_shared_inbox, raw_inbox}} <-
|
||||
{:inbox, ActivityPubReq.extract_actor_inboxes(actor)},
|
||||
{:parse_inbox, inbox = %URI{}} <-
|
||||
{:parse_inbox, URI.parse(raw_shared_inbox || raw_inbox)},
|
||||
{:state, {:ok, %Actor{}}} <-
|
||||
{:state, Actor.remove_followee(followee_actor_id)} do
|
||||
object = Object.follow_activity(followee_actor_id)
|
||||
|
@ -86,7 +92,7 @@ defmodule Vonbraun.Control do
|
|||
{:error, {:post, error}}
|
||||
end
|
||||
else
|
||||
{:inbox, {:error, _}} ->
|
||||
{:inbox, :not_found} ->
|
||||
Logger.warning("Actor had an invalid inbox.")
|
||||
{:error, :invalid_inbox}
|
||||
|
||||
|
@ -104,18 +110,24 @@ defmodule Vonbraun.Control do
|
|||
end
|
||||
end
|
||||
|
||||
def post_note(content, public?, to \\ [])
|
||||
when is_binary(content) and (is_list(to) or is_binary(to)) and is_boolean(public?) do
|
||||
def post_note(content, public?, to_followers?, to \\ [])
|
||||
when is_binary(content) and (is_list(to) or is_binary(to)) and is_boolean(public?) and
|
||||
is_boolean(to_followers?) do
|
||||
to = Object.listify(to)
|
||||
{activity_id, now_ms} = CryptoID.now!()
|
||||
object_id = CryptoID.encrypt_id(now_ms)
|
||||
|
||||
now = DateTime.now!("Etc/UTC") |> DateTime.to_iso8601()
|
||||
|
||||
object =
|
||||
%{
|
||||
"@id" => object_id,
|
||||
"@type" => "Note",
|
||||
"id" => object_id,
|
||||
"context" => object_id,
|
||||
"type" => "Note",
|
||||
"to" => to,
|
||||
"content" => content
|
||||
"content" => content,
|
||||
"attributedTo" => Util.my_id(),
|
||||
"published" => now
|
||||
}
|
||||
|> Object.add_context()
|
||||
|
||||
|
@ -126,17 +138,21 @@ defmodule Vonbraun.Control do
|
|||
object
|
||||
end
|
||||
|
||||
activity = Object.activity("Create", activity_id, object, to: to)
|
||||
object =
|
||||
if to_followers? do
|
||||
Object.mark_to_followers(object)
|
||||
else
|
||||
object
|
||||
end
|
||||
|
||||
activity = Object.activity("Create", activity_id, object)
|
||||
activity = Object.copy_recipients(activity, object)
|
||||
Logger.debug("Here is the raw activity:\n#{Jason.encode!(activity)}")
|
||||
|
||||
expanded_activity =
|
||||
JSON.LD.expand(activity,
|
||||
compact_arrays: false,
|
||||
document_loader: Vonbraun.JSONLD.DocumentLoaderAgent
|
||||
)
|
||||
Deliverator.send(activity)
|
||||
end
|
||||
|
||||
Logger.debug("Here is the expanded activity:\n#{Jason.encode!(expanded_activity)}")
|
||||
|
||||
activity
|
||||
def refresh_actor(id) when is_binary(id) do
|
||||
ActivityPubReq.get_cached_actor(id)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -14,6 +14,8 @@ defmodule Vonbraun.Ecto.Schema.Actor do
|
|||
field(:follows_me_ts, :naive_datetime)
|
||||
field(:following_state, :string)
|
||||
field(:following_ts, :naive_datetime)
|
||||
field(:shared_inbox, :string)
|
||||
field(:inbox, :string)
|
||||
timestamps()
|
||||
end
|
||||
|
||||
|
@ -23,7 +25,9 @@ defmodule Vonbraun.Ecto.Schema.Actor do
|
|||
follows_me_state: String.t() | nil,
|
||||
follows_me_ts: DateTime.t() | nil,
|
||||
following_state: String.t() | nil,
|
||||
following_ts: String.t() | nil
|
||||
following_ts: String.t() | nil,
|
||||
shared_inbox: String.t() | nil,
|
||||
inbox: String.t() | nil
|
||||
}
|
||||
|
||||
def changeset(struct, params \\ %{}) do
|
||||
|
@ -34,7 +38,9 @@ defmodule Vonbraun.Ecto.Schema.Actor do
|
|||
:follows_me_state,
|
||||
:follows_me_ts,
|
||||
:following_state,
|
||||
:following_ts
|
||||
:following_ts,
|
||||
:shared_inbox,
|
||||
:inbox
|
||||
])
|
||||
end
|
||||
|
||||
|
@ -49,6 +55,19 @@ defmodule Vonbraun.Ecto.Schema.Actor do
|
|||
end
|
||||
end
|
||||
|
||||
@spec set_inboxes(Vonbraun.Ecto.Schema.Actor.t(), nil | binary(), nil | binary()) ::
|
||||
{:ok, __MODULE__.t()} | {:error, any()}
|
||||
def set_inboxes(actor = %__MODULE__{}, shared_inbox, inbox)
|
||||
when (is_binary(shared_inbox) or is_nil(shared_inbox)) and
|
||||
(is_binary(inbox) or is_nil(inbox)) do
|
||||
actor
|
||||
|> changeset(%{shared_inbox: shared_inbox, inbox: inbox, updated_at: DateTime.now!("Etc/UTC")})
|
||||
|> Repo.update()
|
||||
end
|
||||
|
||||
def has_inboxes?(%__MODULE__{:shared_inbox => shared_inbox, :inbox => inbox}),
|
||||
do: shared_inbox != nil || inbox != nil
|
||||
|
||||
@doc """
|
||||
Mark a remote actor as following me, or pending follow, depending on if
|
||||
approval of followers is enabled. Tries to intelligently ignore repeat and
|
||||
|
@ -226,4 +245,36 @@ defmodule Vonbraun.Ecto.Schema.Actor do
|
|||
|
||||
Repo.all(query)
|
||||
end
|
||||
|
||||
@spec get_my_followers_inboxes() :: list(binary())
|
||||
def get_my_followers_inboxes() do
|
||||
query =
|
||||
from(u in __MODULE__,
|
||||
where:
|
||||
u.follows_me_state == "accepted" and (not is_nil(u.inbox) or not is_nil(u.shared_inbox)),
|
||||
select: {u.shared_inbox, u.inbox}
|
||||
)
|
||||
|
||||
Repo.all(query)
|
||||
|> Enum.reduce(MapSet.new(), fn {shared_inbox, inbox}, set ->
|
||||
box = shared_inbox || inbox
|
||||
MapSet.put(set, box)
|
||||
end)
|
||||
|> MapSet.to_list()
|
||||
end
|
||||
|
||||
def get_recipient_inboxes(recipients) when is_list(recipients) do
|
||||
query =
|
||||
from(u in __MODULE__,
|
||||
where: u.id in ^recipients and (not is_nil(u.inbox) or not is_nil(u.shared_inbox)),
|
||||
select: {u.shared_inbox, u.inbox}
|
||||
)
|
||||
|
||||
Repo.all(query)
|
||||
|> Enum.reduce(MapSet.new(), fn {shared_inbox, inbox}, set ->
|
||||
box = shared_inbox || inbox
|
||||
MapSet.put(set, box)
|
||||
end)
|
||||
|> MapSet.to_list()
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
defmodule Vonbraun.Repo.Migrations.Inboxes do
|
||||
use Ecto.Migration
|
||||
|
||||
def change do
|
||||
alter table(:actors) do
|
||||
add(:shared_inbox, :string)
|
||||
add(:inbox, :string)
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue