honkoma/lib/pleroma/web/streamer.ex

243 lines
6.8 KiB
Elixir
Raw Normal View History

# Pleroma: A lightweight social networking server
2018-12-31 15:41:47 +00:00
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
2017-11-11 13:59:25 +00:00
defmodule Pleroma.Web.Streamer do
use GenServer
require Logger
2019-02-09 15:16:26 +00:00
alias Pleroma.User
alias Pleroma.Notification
alias Pleroma.Activity
alias Pleroma.Object
alias Pleroma.Repo
2019-02-22 12:29:52 +00:00
alias Pleroma.Web.ActivityPub.Visibility
2017-11-11 13:59:25 +00:00
2018-12-17 13:39:59 +00:00
# Delay (30 seconds, in milliseconds) between two keepalive pings sent to the
# registered sockets.
@keepalive_interval :timer.seconds(30)
2018-05-07 16:11:37 +00:00
2017-11-11 13:59:25 +00:00
@doc """
Starts the streamer process, registered under this module's name, with an
empty map as its initial state.
"""
@spec start_link() :: GenServer.on_start()
def start_link, do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
@doc """
Asynchronously asks the streamer to register `socket` as a subscriber of
`topic`.
"""
def add_socket(topic, socket) do
  request = %{action: :add, topic: topic, socket: socket}
  GenServer.cast(__MODULE__, request)
end
@doc """
Asynchronously asks the streamer to drop `socket` from the subscribers of
`topic`.
"""
def remove_socket(topic, socket) do
  request = %{action: :remove, topic: topic, socket: socket}
  GenServer.cast(__MODULE__, request)
end
2017-11-11 13:59:25 +00:00
@doc """
Asynchronously hands `item` to the streamer for delivery to the sockets
subscribed to `topic`.
"""
def stream(topic, item) do
  request = %{action: :stream, topic: topic, item: item}
  GenServer.cast(__MODULE__, request)
end
2018-12-17 13:39:59 +00:00
# GenServer callback. Keeps `args` as the initial state (the topic => sockets
# map) and schedules the first keepalive ping.
def init(args) do
  # The sleeper is linked to the server so that it dies together with it.
  # An unlinked sleeper would survive a server crash and, once the server has
  # been restarted under the same name, cast its `:ping` to the new instance,
  # starting an additional keepalive chain next to the one started here.
  # The sleeper itself exits normally, which does not affect the server.
  spawn_link(fn ->
    Process.sleep(@keepalive_interval)
    GenServer.cast(__MODULE__, %{action: :ping})
  end)

  {:ok, args}
end
# Keepalive tick: sends an empty text frame to the transport process of every
# registered socket, then schedules the next tick. The state is left unchanged.
def handle_cast(%{action: :ping}, topics) do
  topics
  |> Map.values()
  |> List.flatten()
  |> Enum.each(fn socket ->
    Logger.debug("Sending keepalive ping")
    send(socket.transport_pid, {:text, ""})
  end)

  # Linked so the pending tick is discarded if the server crashes; an unlinked
  # sleeper would outlive the crash and duplicate the keepalive chain of the
  # restarted server.
  spawn_link(fn ->
    Process.sleep(@keepalive_interval)
    GenServer.cast(__MODULE__, %{action: :ping})
  end)

  {:noreply, topics}
end
# Direct messages: pushes `item` to the "direct:<id>" topic of every recipient
# returned by `User.get_recipients_from_activity/1`.
def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: recipient_id} -> "direct:#{recipient_id}" end)
  |> Enum.each(fn direct_topic ->
    Logger.debug("Trying to push direct message to #{direct_topic}\n\n")
    push_to_socket(topics, direct_topic, item)
  end)

  {:noreply, topics}
end
2018-05-30 13:33:37 +00:00
# List timelines: pushes `item` to the "list:<id>" topic of every list that
# contains the activity.
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
  public? = Visibility.is_public?(item) == true
  candidate_lists = Pleroma.List.get_lists_from_activity(item)

  # Filter the recipient list if the activity is not public, see #270:
  # only lists whose owner is allowed to see the activity receive it.
  visible_lists =
    if public? do
      candidate_lists
    else
      Enum.filter(candidate_lists, fn list ->
        owner = Repo.get(User, list.user_id)
        Visibility.visible_for_user?(item, owner)
      end)
    end

  visible_lists
  |> Enum.map(fn %{id: list_id} -> "list:#{list_id}" end)
  |> Enum.each(fn list_topic ->
    Logger.debug("Trying to push message to #{list_topic}\n\n")
    push_to_socket(topics, list_topic, item)
  end)

  {:noreply, topics}
end
# Notifications: renders `item` for every socket subscribed to the
# "user:<user_id>" topic of the notified user and sends it as a
# "notification" event.
def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
  user_topic = "user:#{item.user_id}"

  Enum.each(topics[user_topic] || [], fn socket ->
    # The socket's user is read from the atom key `:user`, the key used by
    # every other lookup in this module. The previous string key "user" did
    # not match it, so the renderer was handed `nil` instead of the user.
    rendered =
      Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(
        socket.assigns[:user],
        item
      )

    # The payload is encoded on its own first, so it is embedded in the outer
    # event as a JSON string rather than as a nested object.
    json = Jason.encode!(%{event: "notification", payload: Jason.encode!(rendered)})

    send(socket.transport_pid, {:text, json})
  end)

  {:noreply, topics}
end
# User timelines (anything that is not a notification): pushes `item` to the
# "user:<id>" topic of every recipient of the activity.
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
  Logger.debug("Trying to push to users")

  item
  |> User.get_recipients_from_activity()
  |> Enum.map(fn %{id: recipient_id} -> "user:#{recipient_id}" end)
  |> Enum.each(fn user_topic -> push_to_socket(topics, user_topic, item) end)

  {:noreply, topics}
end
# Any other topic: the item is pushed as-is to the sockets registered under
# exactly that topic name. The state is left unchanged.
def handle_cast(%{action: :stream, topic: topic, item: payload}, sockets_by_topic) do
  Logger.debug("Trying to push to #{topic}")
  Logger.debug("Pushing item to #{topic}")

  push_to_socket(sockets_by_topic, topic, payload)

  {:noreply, sockets_by_topic}
end
# Registers `socket` under the internal name of `topic`. A socket that is
# already registered for that topic is not added a second time.
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
  key = internal_topic(topic, socket)
  registered = sockets[key] || []
  updated = Map.put(sockets, key, Enum.uniq([socket | registered]))

  Logger.debug("Got new conn for #{key}")

  {:noreply, updated}
end
# Unregisters `socket` from the internal name of `topic`. The topic key stays
# in the state, possibly with an empty list.
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
  key = internal_topic(topic, socket)

  remaining =
    (sockets[key] || [])
    |> List.delete(socket)

  updated = Map.put(sockets, key, remaining)

  Logger.debug("Removed conn for #{key}")

  {:noreply, updated}
end
2017-11-11 13:59:25 +00:00
# Fallback for casts no other clause matched: the message and the current
# state are logged at info level and the state is kept as it is.
def handle_cast(unknown, current_state) do
  Logger.info("Unknown: #{inspect(unknown)}, #{inspect(current_state)}")

  {:noreply, current_state}
end
2017-11-19 01:22:07 +00:00
# Builds the JSON text of an "update" event for `activity`, rendered for
# `user`. The rendered status is encoded first, so the payload travels as a
# JSON string inside the outer event.
defp represent_update(%Activity{} = activity, %User{} = user) do
  status =
    Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: activity, for: user)

  Jason.encode!(%{event: "update", payload: Jason.encode!(status)})
end
# Builds the JSON text of an "update" event for `activity` without a viewing
# user. The rendered status is encoded first, so the payload travels as a
# JSON string inside the outer event.
defp represent_update(%Activity{} = activity) do
  status = Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: activity)

  Jason.encode!(%{event: "update", payload: Jason.encode!(status)})
end
# Announce activities: sent to every socket registered under `topic`. For a
# socket with a user, the announce is skipped when the announced object cannot
# be resolved, or when the announcing actor or the object's actor is blocked.
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
  subscribers = topics[topic] || []

  Enum.each(subscribers, fn socket ->
    case socket.assigns[:user] do
      absent when absent in [nil, false] ->
        send(socket.transport_pid, {:text, represent_update(item)})

      assigned ->
        # Get the current user so we have up-to-date blocks etc.
        user = User.get_cached_by_ap_id(assigned.ap_id)
        blocks = user.info.blocks || []
        parent = Object.normalize(item.data["object"])

        deliverable? =
          not is_nil(parent) and item.actor not in blocks and
            parent.data["actor"] not in blocks

        if deliverable? do
          send(socket.transport_pid, {:text, represent_update(item, user)})
        end
    end
  end)
end
# Delete activities: every socket registered under `topic` receives a "delete"
# event whose payload is the activity id as a string.
def push_to_socket(topics, topic, %Activity{id: id, data: %{"type" => "Delete"}}) do
  # The frame is the same for every subscriber, so it is encoded once instead
  # of once per socket.
  frame = {:text, Jason.encode!(%{event: "delete", payload: to_string(id)})}

  Enum.each(topics[topic] || [], fn socket ->
    send(socket.transport_pid, frame)
  end)
end
2017-11-19 01:22:07 +00:00
# Everything else: sent to every socket registered under `topic`. For a socket
# with a user, the item is skipped when its actor is blocked by that user.
def push_to_socket(topics, topic, item) do
  subscribers = topics[topic] || []

  Enum.each(subscribers, fn socket ->
    case socket.assigns[:user] do
      absent when absent in [nil, false] ->
        send(socket.transport_pid, {:text, represent_update(item)})

      assigned ->
        # Get the current user so we have up-to-date blocks etc.
        user = User.get_cached_by_ap_id(assigned.ap_id)
        blocks = user.info.blocks || []

        if item.actor not in blocks do
          send(socket.transport_pid, {:text, represent_update(item, user)})
        end
    end
  end)
end
2018-05-26 19:06:46 +00:00
# Maps a public topic name to the key used in the server state. The "user" and
# "direct" topics are suffixed with the id of the socket's user; every other
# topic is used unchanged.
defp internal_topic(topic, socket) when topic in ["user", "direct"] do
  topic <> ":" <> to_string(socket.assigns[:user].id)
end

defp internal_topic(topic, _socket), do: topic
2017-11-11 13:59:25 +00:00
end