Refactor streamer test

This commit is contained in:
Tusooa Zhu 2022-08-19 19:58:16 -04:00
parent a31d6bb52c
commit 5a2c8ef4cc
No known key found for this signature in database
GPG Key ID: 7B467EDE43A08224
1 changed files with 64 additions and 17 deletions

View File

@ -815,7 +815,47 @@ test "it sends conversation update to the 'direct' stream when a message is dele
end end
describe "stop streaming if token got revoked" do describe "stop streaming if token got revoked" do
test "do not revoke other tokens" do setup do
child_proc = fn start, finalize ->
fn ->
start.()
receive do
{StreamerTest, :ready} ->
assert_receive {:render_with_user, _, "update.json", _}
receive do
{StreamerTest, :revoked} -> finalize.()
end
end
end
end
starter = fn user, token ->
fn -> Streamer.get_topic_and_add_socket("user", user, token) end
end
hit = fn -> assert_receive :close end
miss = fn -> refute_receive :close end
send_all = fn tasks, thing -> Enum.each(tasks, &send(&1.pid, thing)) end
%{
child_proc: child_proc,
starter: starter,
hit: hit,
miss: miss,
send_all: send_all
}
end
test "do not revoke other tokens", %{
child_proc: child_proc,
starter: starter,
hit: hit,
miss: miss,
send_all: send_all
} do
%{user: user, token: token} = oauth_access(["read"]) %{user: user, token: token} = oauth_access(["read"])
%{token: token2} = oauth_access(["read"], user: user) %{token: token2} = oauth_access(["read"], user: user)
%{user: user2, token: user2_token} = oauth_access(["read"]) %{user: user2, token: user2_token} = oauth_access(["read"])
@ -824,47 +864,54 @@ test "do not revoke other tokens" do
CommonAPI.follow(user, post_user) CommonAPI.follow(user, post_user)
CommonAPI.follow(user2, post_user) CommonAPI.follow(user2, post_user)
Streamer.get_topic_and_add_socket("user", user, token) tasks = [
Streamer.get_topic_and_add_socket("user", user, token2) Task.async(child_proc.(starter.(user, token), hit)),
Streamer.get_topic_and_add_socket("user", user2, user2_token) Task.async(child_proc.(starter.(user, token2), miss)),
Task.async(child_proc.(starter.(user2, user2_token), miss))
]
{:ok, _} = {:ok, _} =
CommonAPI.post(post_user, %{ CommonAPI.post(post_user, %{
status: "hi" status: "hi"
}) })
assert_receive {:render_with_user, _, "update.json", _} send_all.(tasks, {StreamerTest, :ready})
assert_receive {:render_with_user, _, "update.json", _}
assert_receive {:render_with_user, _, "update.json", _}
Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token) Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)
assert_receive :close send_all.(tasks, {StreamerTest, :revoked})
refute_receive :close
Enum.each(tasks, &Task.await/1)
end end
test "revoke all streams for this token" do test "revoke all streams for this token", %{
child_proc: child_proc,
starter: starter,
hit: hit,
send_all: send_all
} do
%{user: user, token: token} = oauth_access(["read"]) %{user: user, token: token} = oauth_access(["read"])
post_user = insert(:user) post_user = insert(:user)
CommonAPI.follow(user, post_user) CommonAPI.follow(user, post_user)
Streamer.get_topic_and_add_socket("user", user, token) tasks = [
Streamer.get_topic_and_add_socket("user", user, token) Task.async(child_proc.(starter.(user, token), hit)),
Task.async(child_proc.(starter.(user, token), hit))
]
{:ok, _} = {:ok, _} =
CommonAPI.post(post_user, %{ CommonAPI.post(post_user, %{
status: "hi" status: "hi"
}) })
assert_receive {:render_with_user, _, "update.json", _} send_all.(tasks, {StreamerTest, :ready})
assert_receive {:render_with_user, _, "update.json", _}
Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token) Pleroma.Web.OAuth.Token.Strategy.Revoke.revoke(token)
assert_receive :close send_all.(tasks, {StreamerTest, :revoked})
assert_receive :close
refute_receive :close Enum.each(tasks, &Task.await/1)
end end
end end
end end