It is not allowed to use the Sec-WebSocket-Protocol header for arbitrary values. This was possible due to the raw websocket handling we were doing with Cowboy, but Phoenix.Socket.Transport does not allow this as the value of this header is compared against a static list of subprotocols. https://hexdocs.pm/phoenix/Phoenix.Endpoint.html#socket/3-websocket-configuration Additionally I cannot find anywhere that we depended on this behavior. Setting the Sec-WebSocket-Protocol header does not appear to be a part of PleromaFE.
486 lines
15 KiB
Elixir
486 lines
15 KiB
Elixir
# Pleroma: A lightweight social networking server
|
|
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
|
|
# SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
defmodule Pleroma.Integration.MastodonWebsocketTest do
|
|
# Needs a streamer, needs to stay synchronous
|
|
use Pleroma.DataCase
|
|
|
|
import ExUnit.CaptureLog
|
|
import Pleroma.Factory
|
|
|
|
alias Pleroma.Integration.WebsocketClient
|
|
alias Pleroma.Web.CommonAPI
|
|
alias Pleroma.Web.OAuth
|
|
|
|
@moduletag needs_streamer: true, capture_log: true
|
|
|
|
@path Pleroma.Web.Endpoint.url()
|
|
|> URI.parse()
|
|
|> Map.put(:scheme, "ws")
|
|
|> Map.put(:path, "/api/v1/streaming")
|
|
|> URI.to_string()
|
|
|
|
def start_socket(qs \\ nil, headers \\ []) do
|
|
path =
|
|
case qs do
|
|
nil -> @path
|
|
qs -> @path <> qs
|
|
end
|
|
|
|
WebsocketClient.start_link(self(), path, headers)
|
|
end
|
|
|
|
defp decode_json(json) do
|
|
with {:ok, %{"event" => event, "payload" => payload_text}} <- Jason.decode(json),
|
|
{:ok, payload} <- Jason.decode(payload_text) do
|
|
{:ok, %{"event" => event, "payload" => payload}}
|
|
end
|
|
end
|
|
|
|
# Turns atom keys to strings
|
|
defp atom_key_to_string(json) do
|
|
json
|
|
|> Jason.encode!()
|
|
|> Jason.decode!()
|
|
end
|
|
|
|
test "refuses invalid requests" do
|
|
capture_log(fn ->
|
|
assert {:error, %WebSockex.RequestError{code: 404}} = start_socket("?stream=ncjdk")
|
|
Process.sleep(30)
|
|
end)
|
|
end
|
|
|
|
test "requires authentication and a valid token for protected streams" do
|
|
capture_log(fn ->
|
|
assert {:error, %WebSockex.RequestError{code: 401}} =
|
|
start_socket("?stream=user&access_token=aaaaaaaaaaaa")
|
|
|
|
assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user")
|
|
Process.sleep(30)
|
|
end)
|
|
end
|
|
|
|
test "allows unified stream" do
|
|
assert {:ok, _} = start_socket()
|
|
end
|
|
|
|
test "allows public streams without authentication" do
|
|
assert {:ok, _} = start_socket("?stream=public")
|
|
assert {:ok, _} = start_socket("?stream=public:local")
|
|
assert {:ok, _} = start_socket("?stream=public:remote&instance=lain.com")
|
|
assert {:ok, _} = start_socket("?stream=hashtag&tag=lain")
|
|
end
|
|
|
|
test "receives well formatted events" do
|
|
user = insert(:user)
|
|
{:ok, _} = start_socket("?stream=public")
|
|
{:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
assert {:ok, json} = Jason.decode(raw_json)
|
|
|
|
assert "update" == json["event"]
|
|
assert json["payload"]
|
|
assert {:ok, json} = Jason.decode(json["payload"])
|
|
|
|
view_json =
|
|
Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
|
|
|> atom_key_to_string()
|
|
|
|
assert json == view_json
|
|
end
|
|
|
|
describe "subscribing via WebSocket" do
|
|
test "can subscribe" do
|
|
user = insert(:user)
|
|
{:ok, pid} = start_socket()
|
|
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "subscribe", "result" => "success"}
|
|
}} = decode_json(raw_json)
|
|
|
|
{:ok, activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
assert {:ok, json} = Jason.decode(raw_json)
|
|
|
|
assert "update" == json["event"]
|
|
assert json["payload"]
|
|
assert {:ok, json} = Jason.decode(json["payload"])
|
|
|
|
view_json =
|
|
Pleroma.Web.MastodonAPI.StatusView.render("show.json", activity: activity, for: nil)
|
|
|> Jason.encode!()
|
|
|> Jason.decode!()
|
|
|
|
assert json == view_json
|
|
end
|
|
|
|
test "can subscribe to multiple streams" do
|
|
user = insert(:user)
|
|
{:ok, pid} = start_socket()
|
|
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "subscribe", "result" => "success"}
|
|
}} = decode_json(raw_json)
|
|
|
|
WebsocketClient.send_text(
|
|
pid,
|
|
%{type: "subscribe", stream: "hashtag", tag: "mew"} |> Jason.encode!()
|
|
)
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "subscribe", "result" => "success"}
|
|
}} = decode_json(raw_json)
|
|
|
|
{:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber #mew"})
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
assert {:ok, %{"stream" => stream1}} = Jason.decode(raw_json)
|
|
assert_receive {:text, raw_json}, 1_000
|
|
assert {:ok, %{"stream" => stream2}} = Jason.decode(raw_json)
|
|
|
|
streams = [stream1, stream2]
|
|
assert ["hashtag", "mew"] in streams
|
|
assert ["public"] in streams
|
|
end
|
|
|
|
test "won't double subscribe" do
|
|
user = insert(:user)
|
|
{:ok, pid} = start_socket()
|
|
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "subscribe", "result" => "success"}
|
|
}} = decode_json(raw_json)
|
|
|
|
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "subscribe", "result" => "ignored"}
|
|
}} = decode_json(raw_json)
|
|
|
|
{:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
|
|
|
|
assert_receive {:text, _}, 1_000
|
|
refute_receive {:text, _}, 1_000
|
|
end
|
|
|
|
test "rejects invalid streams" do
|
|
{:ok, pid} = start_socket()
|
|
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "nonsense"} |> Jason.encode!())
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "subscribe", "result" => "error", "error" => "bad_topic"}
|
|
}} = decode_json(raw_json)
|
|
end
|
|
|
|
test "can unsubscribe" do
|
|
user = insert(:user)
|
|
{:ok, pid} = start_socket()
|
|
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "public"} |> Jason.encode!())
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "subscribe", "result" => "success"}
|
|
}} = decode_json(raw_json)
|
|
|
|
WebsocketClient.send_text(pid, %{type: "unsubscribe", stream: "public"} |> Jason.encode!())
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "unsubscribe", "result" => "success"}
|
|
}} = decode_json(raw_json)
|
|
|
|
{:ok, _activity} = CommonAPI.post(user, %{status: "nice echo chamber"})
|
|
refute_receive {:text, _}, 1_000
|
|
end
|
|
end
|
|
|
|
describe "with a valid user token" do
|
|
setup do
|
|
{:ok, app} =
|
|
Pleroma.Repo.insert(
|
|
OAuth.App.register_changeset(%OAuth.App{}, %{
|
|
client_name: "client",
|
|
scopes: ["read"],
|
|
redirect_uris: "url"
|
|
})
|
|
)
|
|
|
|
user = insert(:user)
|
|
|
|
{:ok, auth} = OAuth.Authorization.create_authorization(app, user)
|
|
|
|
{:ok, token} = OAuth.Token.exchange_token(app, auth)
|
|
|
|
%{app: app, user: user, token: token}
|
|
end
|
|
|
|
test "accepts valid tokens", state do
|
|
assert {:ok, _} = start_socket("?stream=user&access_token=#{state.token.token}")
|
|
end
|
|
|
|
test "accepts the 'user' stream", %{token: token} = _state do
|
|
assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
|
|
|
|
capture_log(fn ->
|
|
assert {:error, %WebSockex.RequestError{code: 401}} = start_socket("?stream=user")
|
|
Process.sleep(30)
|
|
end)
|
|
end
|
|
|
|
test "accepts the 'user:notification' stream", %{token: token} = _state do
|
|
assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
|
|
|
|
capture_log(fn ->
|
|
assert {:error, %WebSockex.RequestError{code: 401}} =
|
|
start_socket("?stream=user:notification")
|
|
|
|
Process.sleep(30)
|
|
end)
|
|
end
|
|
|
|
test "accepts valid token on client-sent event", %{token: token} do
|
|
assert {:ok, pid} = start_socket()
|
|
|
|
WebsocketClient.send_text(
|
|
pid,
|
|
%{type: "pleroma:authenticate", token: token.token} |> Jason.encode!()
|
|
)
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "pleroma:authenticate", "result" => "success"}
|
|
}} = decode_json(raw_json)
|
|
|
|
WebsocketClient.send_text(pid, %{type: "subscribe", stream: "user"} |> Jason.encode!())
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "subscribe", "result" => "success"}
|
|
}} = decode_json(raw_json)
|
|
end
|
|
|
|
test "rejects invalid token on client-sent event" do
|
|
assert {:ok, pid} = start_socket()
|
|
|
|
WebsocketClient.send_text(
|
|
pid,
|
|
%{type: "pleroma:authenticate", token: "Something else"} |> Jason.encode!()
|
|
)
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{
|
|
"type" => "pleroma:authenticate",
|
|
"result" => "error",
|
|
"error" => "unauthorized"
|
|
}
|
|
}} = decode_json(raw_json)
|
|
end
|
|
|
|
test "rejects new authenticate request if already logged-in", %{token: token} do
|
|
assert {:ok, pid} = start_socket()
|
|
|
|
WebsocketClient.send_text(
|
|
pid,
|
|
%{type: "pleroma:authenticate", token: token.token} |> Jason.encode!()
|
|
)
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "pleroma:authenticate", "result" => "success"}
|
|
}} = decode_json(raw_json)
|
|
|
|
WebsocketClient.send_text(
|
|
pid,
|
|
%{type: "pleroma:authenticate", token: "Something else"} |> Jason.encode!()
|
|
)
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{
|
|
"type" => "pleroma:authenticate",
|
|
"result" => "error",
|
|
"error" => "already_authenticated"
|
|
}
|
|
}} = decode_json(raw_json)
|
|
end
|
|
|
|
test "accepts the 'list' stream", %{token: token, user: user} do
|
|
posting_user = insert(:user)
|
|
|
|
{:ok, list} = Pleroma.List.create("test", user)
|
|
Pleroma.List.follow(list, posting_user)
|
|
|
|
assert {:ok, _} = start_socket("?stream=list&access_token=#{token.token}&list=#{list.id}")
|
|
|
|
assert {:ok, pid} = start_socket("?access_token=#{token.token}")
|
|
|
|
WebsocketClient.send_text(
|
|
pid,
|
|
%{type: "subscribe", stream: "list", list: list.id} |> Jason.encode!()
|
|
)
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "subscribe", "result" => "success"}
|
|
}} = decode_json(raw_json)
|
|
|
|
WebsocketClient.send_text(
|
|
pid,
|
|
%{type: "subscribe", stream: "list", list: to_string(list.id)} |> Jason.encode!()
|
|
)
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "pleroma:respond",
|
|
"payload" => %{"type" => "subscribe", "result" => "ignored"}
|
|
}} = decode_json(raw_json)
|
|
end
|
|
|
|
test "disconnect when token is revoked", %{app: app, user: user, token: token} do
|
|
assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
|
|
assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
|
|
|
|
{:ok, auth} = OAuth.Authorization.create_authorization(app, user)
|
|
|
|
{:ok, token2} = OAuth.Token.exchange_token(app, auth)
|
|
assert {:ok, _} = start_socket("?stream=user&access_token=#{token2.token}")
|
|
|
|
OAuth.Token.Strategy.Revoke.revoke(token)
|
|
|
|
assert_receive {:close, _}
|
|
assert_receive {:close, _}
|
|
refute_receive {:close, _}
|
|
end
|
|
|
|
test "receives private statuses", %{user: reading_user, token: token} do
|
|
user = insert(:user)
|
|
CommonAPI.follow(reading_user, user)
|
|
|
|
{:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
|
|
|
|
{:ok, activity} =
|
|
CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
assert {:ok, json} = Jason.decode(raw_json)
|
|
|
|
assert "update" == json["event"]
|
|
assert json["payload"]
|
|
assert {:ok, json} = Jason.decode(json["payload"])
|
|
|
|
view_json =
|
|
Pleroma.Web.MastodonAPI.StatusView.render("show.json",
|
|
activity: activity,
|
|
for: reading_user
|
|
)
|
|
|> Jason.encode!()
|
|
|> Jason.decode!()
|
|
|
|
assert json == view_json
|
|
end
|
|
|
|
test "receives edits", %{user: reading_user, token: token} do
|
|
user = insert(:user)
|
|
CommonAPI.follow(reading_user, user)
|
|
|
|
{:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
|
|
|
|
{:ok, activity} =
|
|
CommonAPI.post(user, %{status: "nice echo chamber", visibility: "private"})
|
|
|
|
assert_receive {:text, _raw_json}, 1_000
|
|
|
|
{:ok, _} = CommonAPI.update(user, activity, %{status: "mew mew", visibility: "private"})
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
activity = Pleroma.Activity.normalize(activity)
|
|
|
|
view_json =
|
|
Pleroma.Web.MastodonAPI.StatusView.render("show.json",
|
|
activity: activity,
|
|
for: reading_user
|
|
)
|
|
|> Jason.encode!()
|
|
|> Jason.decode!()
|
|
|
|
assert {:ok, %{"event" => "status.update", "payload" => ^view_json}} = decode_json(raw_json)
|
|
end
|
|
|
|
test "receives notifications", %{user: reading_user, token: token} do
|
|
user = insert(:user)
|
|
CommonAPI.follow(reading_user, user)
|
|
|
|
{:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
|
|
|
|
{:ok, %Pleroma.Activity{id: activity_id} = _activity} =
|
|
CommonAPI.post(user, %{
|
|
status: "nice echo chamber @#{reading_user.nickname}",
|
|
visibility: "private"
|
|
})
|
|
|
|
assert_receive {:text, raw_json}, 1_000
|
|
|
|
assert {:ok,
|
|
%{
|
|
"event" => "notification",
|
|
"payload" => %{
|
|
"status" => %{
|
|
"id" => ^activity_id
|
|
}
|
|
}
|
|
}} = decode_json(raw_json)
|
|
end
|
|
end
|
|
end
|