SideEffects: Move streaming of chats to after the transaction.
This commit is contained in:
parent
1a11f0e453
commit
2cdaac4330
|
@ -37,7 +37,7 @@ def handle(%{data: %{"type" => "Like"}} = object, meta) do
|
||||||
# - Rollback if we couldn't create it
|
# - Rollback if we couldn't create it
|
||||||
# - Set up notifications
|
# - Set up notifications
|
||||||
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
|
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
|
||||||
with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
|
with {:ok, _object, meta} <- handle_object_creation(meta[:object_data], meta) do
|
||||||
{:ok, notifications} = Notification.create_notifications(activity, do_send: false)
|
{:ok, notifications} = Notification.create_notifications(activity, do_send: false)
|
||||||
|
|
||||||
meta =
|
meta =
|
||||||
|
@ -142,24 +142,24 @@ def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
|
||||||
actor = User.get_cached_by_ap_id(object.data["actor"])
|
actor = User.get_cached_by_ap_id(object.data["actor"])
|
||||||
recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
|
recipient = User.get_cached_by_ap_id(hd(object.data["to"]))
|
||||||
|
|
||||||
|
streamables =
|
||||||
[[actor, recipient], [recipient, actor]]
|
[[actor, recipient], [recipient, actor]]
|
||||||
|> Enum.each(fn [user, other_user] ->
|
|> Enum.map(fn [user, other_user] ->
|
||||||
if user.local do
|
if user.local do
|
||||||
{:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
|
{:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
|
||||||
{:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
|
{:ok, cm_ref} = MessageReference.create(chat, object, user.ap_id != actor.ap_id)
|
||||||
|
|
||||||
# We add a cache of the unread value here so that it
|
{
|
||||||
# doesn't change when being streamed out
|
|
||||||
chat =
|
|
||||||
chat
|
|
||||||
|> Map.put(:unread, MessageReference.unread_count_for_chat(chat))
|
|
||||||
|
|
||||||
Streamer.stream(
|
|
||||||
["user", "user:pleroma_chat"],
|
["user", "user:pleroma_chat"],
|
||||||
{user, %{cm_ref | chat: chat, object: object}}
|
{user, %{cm_ref | chat: chat, object: object}}
|
||||||
)
|
}
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
|
|> Enum.filter(& &1)
|
||||||
|
|
||||||
|
meta =
|
||||||
|
meta
|
||||||
|
|> add_streamables(streamables)
|
||||||
|
|
||||||
{:ok, object, meta}
|
{:ok, object, meta}
|
||||||
end
|
end
|
||||||
|
@ -208,7 +208,7 @@ def handle_undoing(
|
||||||
def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
|
def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
|
||||||
|
|
||||||
defp send_notifications(meta) do
|
defp send_notifications(meta) do
|
||||||
Keyword.get(meta, :created_notifications, [])
|
Keyword.get(meta, :notifications, [])
|
||||||
|> Enum.each(fn notification ->
|
|> Enum.each(fn notification ->
|
||||||
Streamer.stream(["user", "user:notification"], notification)
|
Streamer.stream(["user", "user:notification"], notification)
|
||||||
Push.send(notification)
|
Push.send(notification)
|
||||||
|
@ -217,15 +217,32 @@ defp send_notifications(meta) do
|
||||||
meta
|
meta
|
||||||
end
|
end
|
||||||
|
|
||||||
defp add_notifications(meta, notifications) do
|
defp send_streamables(meta) do
|
||||||
existing = Keyword.get(meta, :created_notifications, [])
|
Keyword.get(meta, :streamables, [])
|
||||||
|
|> Enum.each(fn {topics, items} ->
|
||||||
|
Streamer.stream(topics, items)
|
||||||
|
end)
|
||||||
|
|
||||||
meta
|
meta
|
||||||
|> Keyword.put(:created_notifications, notifications ++ existing)
|
end
|
||||||
|
|
||||||
|
defp add_streamables(meta, streamables) do
|
||||||
|
existing = Keyword.get(meta, :streamables, [])
|
||||||
|
|
||||||
|
meta
|
||||||
|
|> Keyword.put(:streamables, streamables ++ existing)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp add_notifications(meta, notifications) do
|
||||||
|
existing = Keyword.get(meta, :notifications, [])
|
||||||
|
|
||||||
|
meta
|
||||||
|
|> Keyword.put(:notifications, notifications ++ existing)
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_after_transaction(meta) do
|
def handle_after_transaction(meta) do
|
||||||
meta
|
meta
|
||||||
|> send_notifications()
|
|> send_notifications()
|
||||||
|
|> send_streamables()
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -14,13 +14,12 @@ defmodule Pleroma.Web.PleromaAPI.ChatView do
|
||||||
|
|
||||||
def render("show.json", %{chat: %Chat{} = chat} = opts) do
|
def render("show.json", %{chat: %Chat{} = chat} = opts) do
|
||||||
recipient = User.get_cached_by_ap_id(chat.recipient)
|
recipient = User.get_cached_by_ap_id(chat.recipient)
|
||||||
|
|
||||||
last_message = opts[:last_message] || MessageReference.last_message_for_chat(chat)
|
last_message = opts[:last_message] || MessageReference.last_message_for_chat(chat)
|
||||||
|
|
||||||
%{
|
%{
|
||||||
id: chat.id |> to_string(),
|
id: chat.id |> to_string(),
|
||||||
account: AccountView.render("show.json", Map.put(opts, :user, recipient)),
|
account: AccountView.render("show.json", Map.put(opts, :user, recipient)),
|
||||||
unread: Map.get(chat, :unread) || MessageReference.unread_count_for_chat(chat),
|
unread: MessageReference.unread_count_for_chat(chat),
|
||||||
last_message:
|
last_message:
|
||||||
last_message &&
|
last_message &&
|
||||||
MessageReferenceView.render("show.json", chat_message_reference: last_message),
|
MessageReferenceView.render("show.json", chat_message_reference: last_message),
|
||||||
|
|
|
@ -23,7 +23,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
|
||||||
import Mock
|
import Mock
|
||||||
|
|
||||||
describe "handle_after_transaction" do
|
describe "handle_after_transaction" do
|
||||||
test "it streams out notifications" do
|
test "it streams out notifications and streams" do
|
||||||
author = insert(:user, local: true)
|
author = insert(:user, local: true)
|
||||||
recipient = insert(:user, local: true)
|
recipient = insert(:user, local: true)
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ test "it streams out notifications" do
|
||||||
{:ok, _create_activity, meta} =
|
{:ok, _create_activity, meta} =
|
||||||
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
|
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
|
||||||
|
|
||||||
assert [notification] = meta[:created_notifications]
|
assert [notification] = meta[:notifications]
|
||||||
|
|
||||||
with_mocks([
|
with_mocks([
|
||||||
{
|
{
|
||||||
|
@ -58,6 +58,7 @@ test "it streams out notifications" do
|
||||||
SideEffects.handle_after_transaction(meta)
|
SideEffects.handle_after_transaction(meta)
|
||||||
|
|
||||||
assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
|
assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
|
||||||
|
assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], :_))
|
||||||
assert called(Pleroma.Web.Push.send(notification))
|
assert called(Pleroma.Web.Push.send(notification))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -362,33 +363,10 @@ test "it streams the created ChatMessage" do
|
||||||
|
|
||||||
{:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
|
{:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
|
||||||
|
|
||||||
with_mock Pleroma.Web.Streamer, [],
|
{:ok, _create_activity, meta} =
|
||||||
stream: fn _, payload ->
|
|
||||||
case payload do
|
|
||||||
{^author, cm_ref} ->
|
|
||||||
assert cm_ref.unread == false
|
|
||||||
|
|
||||||
{^recipient, cm_ref} ->
|
|
||||||
assert cm_ref.unread == true
|
|
||||||
|
|
||||||
view =
|
|
||||||
Pleroma.Web.PleromaAPI.ChatView.render("show.json",
|
|
||||||
last_message: cm_ref,
|
|
||||||
chat: cm_ref.chat
|
|
||||||
)
|
|
||||||
|
|
||||||
assert view.unread == 1
|
|
||||||
|
|
||||||
_ ->
|
|
||||||
nil
|
|
||||||
end
|
|
||||||
end do
|
|
||||||
{:ok, _create_activity, _meta} =
|
|
||||||
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
|
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
|
||||||
|
|
||||||
assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {author, :_}))
|
assert [_, _] = meta[:streamables]
|
||||||
assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {recipient, :_}))
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
test "it creates a Chat and MessageReferences for the local users and bumps the unread count, except for the author" do
|
test "it creates a Chat and MessageReferences for the local users and bumps the unread count, except for the author" do
|
||||||
|
@ -422,13 +400,18 @@ test "it creates a Chat and MessageReferences for the local users and bumps the
|
||||||
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
|
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
|
||||||
|
|
||||||
# The notification gets created
|
# The notification gets created
|
||||||
assert [notification] = meta[:created_notifications]
|
assert [notification] = meta[:notifications]
|
||||||
assert notification.activity_id == create_activity.id
|
assert notification.activity_id == create_activity.id
|
||||||
|
|
||||||
# But it is not sent out
|
# But it is not sent out
|
||||||
refute called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
|
refute called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
|
||||||
refute called(Pleroma.Web.Push.send(notification))
|
refute called(Pleroma.Web.Push.send(notification))
|
||||||
|
|
||||||
|
# Same for the user chat stream
|
||||||
|
assert [{topics, _}, _] = meta[:streamables]
|
||||||
|
assert topics == ["user", "user:pleroma_chat"]
|
||||||
|
refute called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], :_))
|
||||||
|
|
||||||
chat = Chat.get(author.id, recipient.ap_id)
|
chat = Chat.get(author.id, recipient.ap_id)
|
||||||
|
|
||||||
[cm_ref] = MessageReference.for_chat_query(chat) |> Repo.all()
|
[cm_ref] = MessageReference.for_chat_query(chat) |> Repo.all()
|
||||||
|
|
|
@ -72,6 +72,7 @@ test "it posts a chat message without content but with an attachment" do
|
||||||
|
|
||||||
assert called(Pleroma.Web.Push.send(notification))
|
assert called(Pleroma.Web.Push.send(notification))
|
||||||
assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
|
assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
|
||||||
|
assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], :_))
|
||||||
|
|
||||||
assert activity
|
assert activity
|
||||||
end
|
end
|
||||||
|
|
|
@ -16,21 +16,6 @@ defmodule Pleroma.Web.PleromaAPI.ChatViewTest do
|
||||||
|
|
||||||
import Pleroma.Factory
|
import Pleroma.Factory
|
||||||
|
|
||||||
test "giving a chat with an 'unread' field, it uses that" do
|
|
||||||
user = insert(:user)
|
|
||||||
recipient = insert(:user)
|
|
||||||
|
|
||||||
{:ok, chat} = Chat.get_or_create(user.id, recipient.ap_id)
|
|
||||||
|
|
||||||
chat =
|
|
||||||
chat
|
|
||||||
|> Map.put(:unread, 5)
|
|
||||||
|
|
||||||
represented_chat = ChatView.render("show.json", chat: chat)
|
|
||||||
|
|
||||||
assert represented_chat[:unread] == 5
|
|
||||||
end
|
|
||||||
|
|
||||||
test "it represents a chat" do
|
test "it represents a chat" do
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
recipient = insert(:user)
|
recipient = insert(:user)
|
||||||
|
|
Loading…
Reference in New Issue