From 273cda63ad79b61f4d37e4b7603694908e894e4f Mon Sep 17 00:00:00 2001 From: tusooa Date: Fri, 31 Mar 2023 22:55:52 -0400 Subject: [PATCH] Allow subscribing to streams --- .../web/mastodon_api/websocket_handler.ex | 74 +++++++++++++++ lib/pleroma/web/views/streamer_view.ex | 18 ++++ .../integration/mastodon_websocket_test.exs | 90 +++++++++++++++++++ 3 files changed, 182 insertions(+) diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index 97a1f1401..a42a9a63c 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do alias Pleroma.User alias Pleroma.Web.OAuth.Token alias Pleroma.Web.Streamer + alias Pleroma.Web.StreamerView @behaviour :cowboy_websocket @@ -73,6 +74,16 @@ def websocket_handle(:pong, state) do # We only receive pings for now def websocket_handle(:ping, state), do: {:ok, state} + def websocket_handle({:text, text}, state) do + with {:ok, %{} = event} <- Jason.decode(text) do + handle_client_event(event, state) + else + _ -> + Logger.error("#{__MODULE__} received non-JSON event: #{inspect(text)}") + {:ok, state} + end + end + def websocket_handle(frame, state) do Logger.error("#{__MODULE__} received frame: #{inspect(frame)}") {:ok, state} @@ -144,4 +155,67 @@ defp authenticate_request(access_token, sec_websocket) do defp timer do Process.send_after(self(), :tick, @tick) end + + defp handle_client_event(%{"type" => "subscribe", "stream" => _topic} = params, state) do + with {_, {:ok, topic}} <- + {:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)}, + {_, false} <- {:subscribed, topic in state.topics} do + Streamer.add_socket(topic, state.oauth_token) + + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "success"})} + ], %{state | topics: [topic | state.topics]}} + else + {:subscribed, true} -> + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{type: "subscribe", result: "ignored"})} + ], state} + + {:topic, {:error, error}} -> + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{ + type: "subscribe", + result: "error", + error: error + })} + ], state} + end + end + + defp handle_client_event(%{"type" => "unsubscribe", "stream" => _topic} = params, state) do + with {_, {:ok, topic}} <- + {:topic, Streamer.get_topic(params["stream"], state.user, state.oauth_token, params)}, + {_, true} <- {:subscribed, topic in state.topics} do + Streamer.remove_socket(topic) + + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "success"})} + ], %{state | topics: List.delete(state.topics, topic)}} + else + {:subscribed, false} -> + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{type: "unsubscribe", result: "ignored"})} + ], state} + + {:topic, {:error, error}} -> + {[ + {:text, + StreamerView.render("pleroma_respond.json", %{ + type: "unsubscribe", + result: "error", + error: error + })} + ], state} + end + end + + defp handle_client_event(params, state) do + Logger.error("#{__MODULE__} received unknown event: #{inspect(params)}") + {[], state} + end end diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex index 6a55242b0..19f098783 100644 --- a/lib/pleroma/web/views/streamer_view.ex +++ b/lib/pleroma/web/views/streamer_view.ex @@ -135,4 +135,22 @@ def render("conversation.json", %Participation{} = participation) do } |> Jason.encode!() end + + def render("pleroma_respond.json", %{type: type, result: result} = params) do + %{ + event: "pleroma.respond", + payload: + %{ + result: result, + type: type + } + |> Map.merge(maybe_error(params)) + |> Jason.encode!() + } + |> Jason.encode!() + end + + defp maybe_error(%{error: :bad_topic}), do: %{error: "bad_topic"} + defp maybe_error(%{error: :unauthorized}), do: %{error: "unauthorized"} + defp maybe_error(_), do: %{} end diff --git a/test/pleroma/integration/mastodon_websocket_test.exs b/test/pleroma/integration/mastodon_websocket_test.exs index 3b120c10e..9db0f714f 100644 --- a/test/pleroma/integration/mastodon_websocket_test.exs +++ b/test/pleroma/integration/mastodon_websocket_test.exs @@ -31,6 +31,13 @@ def start_socket(qs \\ nil, headers \\ []) do WebsocketClient.start_link(self(), path, headers) end + defp decode_json(json) do + with {:ok, %{"event" => event, "payload" => payload_text}} <- Jason.decode(json), + {:ok, payload} <- Jason.decode(payload_text) do + {:ok, %{"event" => event, "payload" => payload}} + end + end + test "refuses invalid requests" do capture_log(fn -> assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk") @@ -79,6 +86,89 @@ test "receives well formatted events" do assert json == view_json end + describe "subscribing via WebSocket" do + test "can subscribe" do + user = insert(:user) + {:ok, pid} = start_socket() + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "subscribe", "result" => "success"} + }} = decode_json(raw_json) + + {:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"}) + + assert_receive {:text, raw_json}, 1_000 + assert {:ok, json} = Jason.decode(raw_json) + + assert "update" == json["event"] + assert json["payload"] + assert {:ok, json} = Jason.decode(json["payload"]) + + view_json = + Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil) + |> Jason.encode!() + |> Jason.decode!() + + assert json == view_json + end + + test "won't double subscribe" do + user = insert(:user) + {:ok, pid} = start_socket() + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "subscribe", "result" => "success"} + }} = decode_json(raw_json) + + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "subscribe", "result" => "ignored"} + }} = decode_json(raw_json) + + {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"}) + + assert_receive {:text, _}, 1_000 + refute_receive {:text, _}, 1_000 + end + + test "can unsubscribe" do + user = insert(:user) + {:ok, pid} = start_socket() + WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "subscribe", "result" => "success"} + }} = decode_json(raw_json) + + WebsocketClient.send_text(pid, %{type: "unsubscribe", stream: "public"} |> Jason.encode!()) + assert_receive {:text, raw_json}, 1_000 + + assert {:ok, + %{ + "event" => "pleroma.respond", + "payload" => %{"type" => "unsubscribe", "result" => "success"} + }} = decode_json(raw_json) + + {:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"}) + refute_receive {:text, _}, 1_000 + end + end + describe "with a valid user token" do setup do {:ok, app} =