MastoAPI: Websocket streaming for federated timeline.

This commit is contained in:
Roger Braun 2017-11-11 20:00:11 +01:00
parent bd5bdc4c24
commit 414c52509b
2 changed files with 44 additions and 6 deletions

View File

@ -1,12 +1,18 @@
defmodule Pleroma.Web.MastodonAPI.MastodonSocket do defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
use Phoenix.Socket use Phoenix.Socket
transport :streaming, Phoenix.Transports.WebSocket.Raw transport :streaming, Phoenix.Transports.WebSocket.Raw,
timeout: :infinity # We never receive data.
def connect(params, socket) do def connect(params, socket) do
IO.inspect(params) if params["stream"] == "public" do
Pleroma.Web.Streamer.add_socket(params["stream"], socket) socket = socket
{:ok, socket} |> assign(:topic, params["stream"])
Pleroma.Web.Streamer.add_socket(params["stream"], socket)
{:ok, socket}
else
:error
end
end end
def id(socket), do: nil def id(socket), do: nil
@ -21,7 +27,8 @@ def handle(:text, message, state) do
{:text, message} {:text, message}
end end
def handle(:closed, reason, _state) do def handle(:closed, reason, %{socket: socket}) do
IO.inspect reason topic = socket.assigns[:topic]
Pleroma.Web.Streamer.remove_socket(topic, socket)
end end
end end

View File

@ -4,6 +4,10 @@ defmodule Pleroma.Web.Streamer do
import Plug.Conn import Plug.Conn
def start_link do def start_link do
spawn(fn ->
Process.sleep(1000 * 30) # 30 seconds
GenServer.cast(__MODULE__, %{action: :ping})
end)
GenServer.start_link(__MODULE__, %{}, name: __MODULE__) GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end end
@ -11,10 +15,28 @@ def add_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
end end
def remove_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
end
def stream(topic, item) do def stream(topic, item) do
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
end end
def handle_cast(%{action: :ping}, topics) do
Map.values(topics)
|> List.flatten
|> Enum.each(fn (socket) ->
Logger.debug("Sending keepalive ping")
send socket.transport_pid, {:text, ""}
end)
spawn(fn ->
Process.sleep(1000 * 30) # 30 seconds
GenServer.cast(__MODULE__, %{action: :ping})
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
Logger.debug("Trying to push to #{topic}") Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}") Logger.debug("Pushing item to #{topic}")
@ -38,6 +60,15 @@ def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
{:noreply, sockets} {:noreply, sockets}
end end
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
sockets_for_topic = sockets[topic] || []
sockets_for_topic = List.delete(sockets_for_topic, socket)
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Removed conn for #{topic}")
IO.inspect(sockets)
{:noreply, sockets}
end
def handle_cast(m, state) do def handle_cast(m, state) do
IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}") IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
{:noreply, state} {:noreply, state}