spc-pleroma/lib/pleroma/web/activity_pub/activity_pub.ex

1861 lines
55 KiB
Elixir
Raw Normal View History

# Pleroma: A lightweight social networking server
2022-02-26 06:11:42 +00:00
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
2017-03-21 08:21:52 +00:00
defmodule Pleroma.Web.ActivityPub.ActivityPub do
2019-02-09 15:16:26 +00:00
alias Pleroma.Activity
alias Pleroma.Activity.Ir.Topics
2019-06-03 13:04:39 +00:00
alias Pleroma.Config
alias Pleroma.Constants
alias Pleroma.Conversation
alias Pleroma.Conversation.Participation
alias Pleroma.Filter
alias Pleroma.Hashtag
alias Pleroma.Maps
alias Pleroma.Notification
2019-02-09 15:16:26 +00:00
alias Pleroma.Object
alias Pleroma.Object.Containment
alias Pleroma.Object.Fetcher
alias Pleroma.Pagination
alias Pleroma.Repo
2019-02-09 15:16:26 +00:00
alias Pleroma.Upload
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.Streamer
alias Pleroma.Web.WebFinger
alias Pleroma.Workers.BackgroundWorker
2021-07-18 01:35:35 +00:00
alias Pleroma.Workers.PollWorker
2019-02-06 19:20:02 +00:00
2017-03-21 16:53:20 +00:00
import Ecto.Query
2017-05-16 13:31:11 +00:00
import Pleroma.Web.ActivityPub.Utils
2019-02-22 12:29:52 +00:00
import Pleroma.Web.ActivityPub.Visibility
2019-02-06 19:20:02 +00:00
2017-05-07 18:16:07 +00:00
require Logger
require Pleroma.Constants
2017-03-21 08:21:52 +00:00
@behaviour Pleroma.Web.ActivityPub.ActivityPub.Persisting
@behaviour Pleroma.Web.ActivityPub.ActivityPub.Streaming
2019-01-18 19:40:52 +00:00
# Computes the addressing of an activity.
#
# Returns `{recipients, to, cc}`. `to` and `cc` are read from `data` and default
# to `[]`. `recipients` is the concatenation of `to`, `cc` and `bcc`; for
# "Create" activities the actor is appended as well and duplicates are removed.
defp get_recipients(%{"type" => "Create"} = data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  # `List.wrap/1` maps a missing actor to `[]`. Wrapping a `[]` default in
  # another list leaked an empty list into the recipients as an element.
  actor = data |> Map.get("actor") |> List.wrap()

  recipients = [to, cc, bcc, actor] |> Enum.concat() |> Enum.uniq()

  {recipients, to, cc}
end

defp get_recipients(data) do
  to = Map.get(data, "to", [])
  cc = Map.get(data, "cc", [])
  bcc = Map.get(data, "bcc", [])

  {Enum.concat([to, cc, bcc]), to, cc}
end
# Decides whether the activity's actor is allowed to insert it.
#
# "Delete" and "Undo" activities always pass. For any other activity with a
# binary "actor", the cached user must exist and have `is_active: true`.
# Activities without a binary actor pass.
defp check_actor_can_insert(%{"type" => type}) when type in ["Delete", "Undo"], do: true

defp check_actor_can_insert(%{"actor" => actor}) when is_binary(actor) do
  match?(%User{is_active: true}, User.get_cached_by_ap_id(actor))
end

defp check_actor_can_insert(_), do: true
# Checks the embedded object's "content" against the configured
# `[:instance, :remote_limit]`. Activities without non-nil content always pass.
defp check_remote_limit(%{"object" => %{"content" => content}}) when content != nil do
  remote_limit = Config.get([:instance, :remote_limit])
  content_length = String.length(content)

  content_length <= remote_limit
end

defp check_remote_limit(_), do: true
@doc """
Increases the note count of `actor` when `public?/1` holds for `object`.

Returns the result of `User.increase_note_count/1`, or `{:ok, actor}` when the
object is not public.
"""
def increase_note_count_if_public(actor, object) do
  if public?(object) do
    User.increase_note_count(actor)
  else
    {:ok, actor}
  end
end
@doc """
Decreases the note count of `actor` when `public?/1` holds for `object`.

Returns the result of `User.decrease_note_count/1`, or `{:ok, actor}` when the
object is not public.
"""
def decrease_note_count_if_public(actor, object) do
  if public?(object) do
    User.decrease_note_count(actor)
  else
    {:ok, actor}
  end
end
2021-12-26 02:35:17 +00:00
@doc """
Calls `User.update_last_status_at/1` for `actor` when `public?/1` holds for
`object`; otherwise returns `{:ok, actor}`.
"""
def update_last_status_at_if_public(actor, object) do
  if public?(object) do
    User.update_last_status_at(actor)
  else
    {:ok, actor}
  end
end
# Bumps the replies counter of the object referenced by "inReplyTo" when a
# "Create" activity carries a public object. Returns `nil` for non-public
# objects and `:noop` for data that is not a Create with an "inReplyTo" key.
defp increase_replies_count_if_reply(%{
       "type" => "Create",
       "object" => %{"inReplyTo" => parent_ap_id} = reply
     }) do
  if public?(reply), do: Object.increase_replies_count(parent_ap_id)
end

defp increase_replies_count_if_reply(_create_data), do: :noop
2023-11-12 13:38:08 +00:00
# Bumps the quotes counter of the object referenced by "quoteUrl" when a
# "Create" activity carries a public object. Returns `nil` for non-public
# objects and `:noop` for data that is not a Create with a "quoteUrl" key.
defp increase_quotes_count_if_quote(%{
       "type" => "Create",
       "object" => %{"quoteUrl" => quoted_ap_id} = quoting
     }) do
  if public?(quoting), do: Object.increase_quotes_count(quoted_ap_id)
end

defp increase_quotes_count_if_quote(_create_data), do: :noop
2020-11-16 16:13:42 +00:00
@object_types ~w[ChatMessage Question Answer Audio Video Image Event Article Note Page]

# Persists `object` and returns `{:ok, stored, meta}`.
#
# Data whose "type" is one of `@object_types` is stored through
# `Object.create/1`. Everything else is inserted as an `%Activity{}`, using the
# `:local` entry of `meta` (which must be present) and the recipients computed
# by `get_recipients/1`. Failures of the underlying calls are returned as-is.
@impl true
def persist(%{"type" => type} = object, meta) when type in @object_types do
  case Object.create(object) do
    {:ok, stored} -> {:ok, stored, meta}
    failure -> failure
  end
end

@impl true
def persist(object, meta) do
  local = Keyword.fetch!(meta, :local)
  {recipients, _to, _cc} = get_recipients(object)

  new_activity = %Activity{
    data: object,
    local: local,
    recipients: recipients,
    actor: object["actor"]
  }

  # TODO: add tests for expired activities, when Note type will be supported in new pipeline
  with {:ok, activity} <- Repo.insert(new_activity),
       {:ok, _} <- maybe_create_activity_expiration(activity) do
    {:ok, activity, meta}
  end
end
@doc """
Inserts an activity (and its embedded object, if any).

If `Activity.normalize/1` already yields an activity for `map`, that activity
is returned. Otherwise the data passes through defaults, the actor check
(skipped when `bypass_actor_check` is truthy), the remote limit check, MRF
filtering and the containment check before being inserted.

With `fake` set to `true` the insert steps are skipped and an in-memory
activity with the id `"pleroma:fakeid"` is returned instead.
"""
@spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do
  with nil <- Activity.normalize(map),
       prepared <- lazy_put_activity_defaults(map, fake),
       {_, true} <- {:actor_check, bypass_actor_check || check_actor_can_insert(prepared)},
       {_, true} <- {:remote_limit_pass, check_remote_limit(prepared)},
       {:ok, filtered} <- MRF.filter(prepared),
       {recipients, _to, _cc} = get_recipients(filtered),
       {:fake, false, _, _} <- {:fake, fake, filtered, recipients},
       {:containment, :ok} <- {:containment, Containment.contain_child(filtered)},
       {:ok, stored_data, object} <- insert_full_object(filtered),
       {:ok, activity} <- insert_activity_with_expiration(stored_data, local, recipients) do
    # Splice in the child object if we have one.
    activity = Maps.put_if_present(activity, :object, object)

    # Called for its side effect only; the result is discarded.
    Pleroma.Web.RichMedia.Card.get_by_activity(activity)

    # Add local posts to search index
    if local, do: Pleroma.Search.add_to_index(activity)

    {:ok, activity}
  else
    # An activity for this data already exists.
    %Activity{} = existing ->
      {:ok, existing}

    {:fake, true, fake_data, fake_recipients} ->
      fake_activity = %Activity{
        data: fake_data,
        local: local,
        actor: fake_data["actor"],
        recipients: fake_recipients,
        id: "pleroma:fakeid"
      }

      Pleroma.Web.RichMedia.Card.get_by_activity(fake_activity)

      {:ok, fake_activity}

    {:actor_check, _} ->
      {:error, false}

    {:remote_limit_pass, _} ->
      {:error, :remote_limit}

    {:containment, _} = containment_error ->
      containment_error

    {:error, _} = error ->
      error

    {:reject, _} = rejection ->
      {:error, rejection}
  end
end
2017-03-21 16:53:20 +00:00
# Inserts an `%Activity{}` built from `data` and, on success, hands it to
# `maybe_create_activity_expiration/1`. A failed insert is returned as-is.
defp insert_activity_with_expiration(data, local, recipients) do
  %Activity{
    data: data,
    local: local,
    actor: data["actor"],
    recipients: recipients
  }
  |> Repo.insert()
  |> case do
    {:ok, activity} -> maybe_create_activity_expiration(activity)
    failure -> failure
  end
end
2017-03-21 16:53:20 +00:00
2020-05-07 09:13:32 +00:00
@doc """
Creates and sends the notifications for `activity`, bumps the related
conversation and streams the activity and the conversation participations.

For "Update" activities the conversation is looked up through the Create
activity of the updated object rather than through the Update itself.
"""
def notify_and_stream(activity) do
  {:ok, notifications} = Notification.create_notifications(activity)
  Notification.send(notifications)

  original_activity =
    case activity do
      %{data: %{"type" => "Update"}, object: %{data: %{"id" => id}}} ->
        Activity.get_create_by_object_ap_id_with_object(id)

      _ ->
        activity
    end

  participations =
    case original_activity do
      %{actor: actor} ->
        original_activity
        |> create_or_bump_conversation(actor)
        |> get_participations()

      # NOTE(review): presumably the Create lookup yields nil when no Create is
      # stored for the updated object — confirm. Reading `.actor` from that
      # result would crash, so no conversation is bumped in this case.
      _ ->
        []
    end

  stream_out(activity)
  stream_out_participations(participations)
end
2020-09-03 14:56:20 +00:00
# Enqueues a `PurgeExpiredActivity` job when the activity data carries a
# `%DateTime{}` under "expires_at". Returns `{:ok, activity}` on success (or when
# there is nothing to schedule) and the enqueue failure otherwise.
defp maybe_create_activity_expiration(
       %{data: %{"expires_at" => %DateTime{} = expires_at}} = activity
     ) do
  job_args = %{activity_id: activity.id, expires_at: expires_at}

  case Pleroma.Workers.PurgeExpiredActivity.enqueue(job_args) do
    {:ok, _job} -> {:ok, activity}
    failure -> failure
  end
end

defp maybe_create_activity_expiration(activity), do: {:ok, activity}
# Creates or bumps the conversation for `activity` and marks it as read for the
# user cached under `actor`. Returns `{:ok, conversation}`; when either lookup
# does not produce the expected value, that value is returned unchanged.
defp create_or_bump_conversation(activity, actor) do
  case Conversation.create_or_bump_for(activity) do
    {:ok, conversation} ->
      case User.get_cached_by_ap_id(actor) do
        %User{} = user ->
          Participation.mark_as_read(user, conversation)
          {:ok, conversation}

        not_a_user ->
          not_a_user
      end

    other ->
      other
  end
end
# Returns the freshly preloaded participations of a conversation, or `[]` when
# the argument is not an `{:ok, conversation}` tuple.
defp get_participations({:ok, conversation}) do
  reloaded = Repo.preload(conversation, :participations, force: true)
  Map.get(reloaded, :participations)
end

defp get_participations(_), do: []
@doc """
Preloads the `:user` association of the given participations and streams them
on the "participation" topic.
"""
def stream_out_participations(participations) do
  with_users = Repo.preload(participations, :user)
  Streamer.stream("participation", with_users)
end
# Streams the participations of the conversation identified by the object's
# "context", but only when a latest direct activity id can be fetched for that
# context with `user` as both `:user` and `:blocking_user`. Returns `:noop` for
# arguments that are not an `%Object{}` with a "context".
@impl true
def stream_out_participations(%Object{data: %{"context" => context}}, user) do
  case Conversation.get_for_ap_id(context) do
    %Conversation{} = found ->
      conversation = Repo.preload(found, :participations)
      filter_opts = %{user: user, blocking_user: user}

      last_activity_id =
        fetch_latest_direct_activity_id_for_context(conversation.ap_id, filter_opts)

      if last_activity_id do
        stream_out_participations(conversation.participations)
      end

    no_conversation ->
      no_conversation
  end
end

@impl true
def stream_out_participations(_, _), do: :noop
# Streams Create, Announce, Delete and Update activities to the topics computed
# by `Topics.get_activity_topics/1`; anything else is a `:noop`.
@impl true
def stream_out(%Activity{data: %{"type" => type}} = activity)
    when type in ~w[Create Announce Delete Update] do
  topics = Topics.get_activity_topics(activity)
  Streamer.stream(topics, activity)
end

@impl true
def stream_out(_activity), do: :noop
@doc """
Runs `do_create/2` inside a database transaction.

Returns the value produced inside the transaction on commit, and the
transaction's own result otherwise.
"""
@spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def create(params, fake \\ false) do
  case Repo.transaction(fn -> do_create(params, fake) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
# Builds the Create data from `params`, inserts it and runs the follow-up steps
# (counters, notifications/streaming, poll scheduling, group handling and
# federation). Fake activities are returned right after the insert, and in the
# `:benchmark` env the steps after the reply/quote counters are skipped. An
# `{:error, reason}` from any step rolls the surrounding transaction back.
defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false
  published = params[:published]
  quick_insert? = Config.get([:env]) == :benchmark

  create_data =
    make_create_data(
      %{to: to, actor: actor, published: published, context: context, object: object},
      additional
    )

  with {:ok, activity} <- insert(create_data, local, fake),
       {:fake, false, _} <- {:fake, fake, activity},
       _ <- increase_replies_count_if_reply(create_data),
       _ <- increase_quotes_count_if_quote(create_data),
       {:quick_insert, false, _} <- {:quick_insert, quick_insert?, activity},
       {:ok, _actor} <- increase_note_count_if_public(actor, activity),
       {:ok, _actor} <- update_last_status_at_if_public(actor, activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_schedule_poll_notifications(activity),
       :ok <- maybe_handle_group_posts(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:fake, true, fake_activity} ->
      {:ok, fake_activity}

    {:quick_insert, true, inserted_activity} ->
      {:ok, inserted_activity}

    {:error, message} ->
      Repo.rollback(message)
  end
end
2017-03-21 16:53:20 +00:00
2021-07-18 01:35:35 +00:00
# Hands the activity to `PollWorker.schedule_poll_end/1`; the worker's result is
# ignored and `:ok` is always returned.
defp maybe_schedule_poll_notifications(activity) do
  _ = PollWorker.schedule_poll_end(activity)
  :ok
end
@doc """
Builds Listen data from `params`, inserts it, notifies/streams it and federates
it. `params[:local]` counts as local unless it is exactly `false`.
"""
@spec listen(map()) :: {:ok, Activity.t()} | {:error, any()}
def listen(%{to: to, actor: actor, context: context, object: object} = params) do
  additional = params[:additional] || %{}
  # only accept false as false value
  local = params[:local] != false

  base = %{
    to: to,
    actor: actor,
    published: params[:published],
    context: context,
    object: object
  }

  listen_data = make_listen_data(base, additional)

  with {:ok, activity} <- insert(listen_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  end
end
@doc """
Runs `do_unfollow/4` inside a database transaction.

Returns the value produced inside the transaction on commit, and the
transaction's own result otherwise.
"""
@spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) ::
        {:ok, Activity.t()} | nil | {:error, any()}
def unfollow(follower, followed, activity_id \\ nil, local \\ true) do
  transaction = fn -> do_unfollow(follower, followed, activity_id, local) end

  case Repo.transaction(transaction) do
    {:ok, result} -> result
    failure -> failure
  end
end
# Cancels the latest follow activity between the two users and inserts,
# streams and federates the matching unfollow activity. Returns `nil` when a
# step yields `nil`; an `{:error, reason}` rolls the transaction back.
defp do_unfollow(follower, followed, activity_id, local) do
  with %Activity{} = latest_follow <- fetch_latest_follow(follower, followed),
       {:ok, cancelled_follow} <- update_follow_state(latest_follow, "cancelled"),
       unfollow_data <- make_unfollow_data(follower, followed, cancelled_follow, activity_id),
       {:ok, activity} <- insert(unfollow_data, local),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(activity) do
    {:ok, activity}
  else
    {:error, reason} -> Repo.rollback(reason)
    nil -> nil
  end
end
@doc """
Runs `do_flag/1` inside a database transaction.

Returns the value produced inside the transaction on commit, and the
transaction's own result otherwise.
"""
@spec flag(map()) :: {:ok, Activity.t()} | {:error, any()}
def flag(params) do
  case Repo.transaction(fn -> do_flag(params) end) do
    {:ok, result} -> result
    failure -> failure
  end
end
# Inserts a report (flag) activity, notifies/streams it, federates a stripped
# copy and e-mails every user holding the `:reports_manage_reports` privilege.
# The report is addressed ("cc") to the reported account only when forwarding is
# enabled, i.e. unless `params[:forward]` is exactly `false`.
defp do_flag(
       %{
         actor: actor,
         context: _context,
         account: account,
         statuses: statuses,
         content: content
       } = params
     ) do
  # only accept false as false value
  local = params[:local] != false
  forward = params[:forward] != false

  base_additional = params[:additional] || %{}
  cc = if forward, do: [account.ap_id], else: []
  additional = Map.merge(base_additional, %{"to" => [], "cc" => cc})

  flag_data = make_flag_data(params, additional)

  with {:ok, activity} <- insert(flag_data, local),
       {:ok, stripped_activity} <- strip_report_status_data(activity),
       _ <- notify_and_stream(activity),
       :ok <- maybe_federate(stripped_activity) do
    # NOTE(review): `actor` is compared against an ap_id here but passed to the
    # mailer below as-is — confirm it is an ap_id string and not a user struct,
    # otherwise the reporter is never excluded.
    :reports_manage_reports
    |> User.all_users_with_privilege()
    |> Enum.reject(fn user -> user.ap_id == actor or is_nil(user.email) end)
    |> Enum.each(fn privileged_user ->
      privileged_user
      |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content)
      |> Pleroma.Emails.Mailer.deliver_async()
    end)

    {:ok, activity}
  else
    {:error, error} -> Repo.rollback(error)
  end
end
@doc """
Inserts a "Move" activity from `origin` to `target`, addressed to the origin's
follower address, federates it and enqueues the "move_following" background job.

Fails with an error tuple unless the origin's ap_id is listed in the target's
`also_known_as`.
"""
@spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()}
def move(%User{} = origin, %User{} = target, local \\ true) do
  params = %{
    "type" => "Move",
    "actor" => origin.ap_id,
    "object" => origin.ap_id,
    "target" => target.ap_id,
    "to" => [origin.follower_address]
  }

  if origin.ap_id in target.also_known_as do
    case insert(params, local) do
      {:ok, activity} ->
        notify_and_stream(activity)
        maybe_federate(activity)

        BackgroundWorker.enqueue("move_following", %{
          "origin_id" => origin.id,
          "target_id" => target.id
        })

        {:ok, activity}

      err ->
        err
    end
  else
    {:error, "Target account must have the origin in `alsoKnownAs`"}
  end
end
2020-03-24 16:18:27 +00:00
@doc """
Builds the query for the "Create" activities of `context`, newest first.

Recipients are the public address plus, when `opts[:user]` is set, that user's
ap_id and followings. The usual preload, block, filter and exclusion
restrictions are applied according to `opts`.
"""
def fetch_activities_for_context_query(context, opts) do
  public = [Constants.as_public()]

  recipients =
    case opts[:user] do
      absent when absent in [nil, false] -> public
      user -> [user.ap_id | User.following(user)] ++ public
    end

  base_query =
    from(activity in Activity)
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_set_thread_muted_field(opts)

  restricted_query =
    base_query
    |> restrict_unauthenticated(opts[:user])
    |> restrict_blocked(opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_filtered(opts)

  restricted_query
  |> where(
    [activity],
    fragment(
      "?->>'type' = ? and ?->>'context' = ?",
      activity.data,
      "Create",
      activity.data,
      ^context
    )
  )
  |> exclude_poll_votes(opts)
  |> exclude_id(opts)
  |> order_by([activity], desc: activity.id)
end
@doc """
Runs the query built by `fetch_activities_for_context_query/2` and returns all
matching activities.
"""
@spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()]
def fetch_activities_for_context(context, opts \\ %{}) do
  query = fetch_activities_for_context_query(context, opts)
  Repo.all(query)
end
2018-03-30 13:01:53 +00:00
@doc """
Returns the id of the newest direct-visibility "Create" activity in `context`,
or `nil` when there is none.

`opts` may be a map or a keyword list. `skip_preload: true` is applied unless
`opts` sets `:skip_preload` itself.
"""
@spec fetch_latest_direct_activity_id_for_context(String.t(), keyword() | map()) ::
        Ecto.UUID.t() | nil
def fetch_latest_direct_activity_id_for_context(context, opts \\ %{}) do
  # `Map.merge/2` raises on keyword lists, which the spec allows, so normalise
  # the options to a map first. Caller options still win over the default.
  opts = Map.merge(%{skip_preload: true}, Map.new(opts))

  context
  |> fetch_activities_for_context_query(opts)
  |> restrict_visibility(%{visibility: "direct"})
  |> limit(1)
  |> select([a], a.id)
  |> Repo.one()
end
# Paginates `query` with `:skip_extra_order` forced on.
defp fetch_paginated_optimized(query, opts, pagination) do
  # Note: tag-filtering funcs may apply "ORDER BY objects.id DESC",
  # and extra sorting on "activities.id DESC NULLS LAST" would worsen the query plan
  Pagination.fetch_paginated(query, Map.put(opts, :skip_extra_order, true), pagination)
end
@doc """
Fetches a page of activities addressed to `recipients` or to the list
memberships of `opts[:user]`.

The paginated result is reversed and passed through `maybe_update_cc/3`.
"""
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
  user = opts[:user]
  list_memberships = Pleroma.List.memberships(user)
  all_recipients = recipients ++ list_memberships

  page =
    all_recipients
    |> fetch_activities_query(opts)
    |> fetch_paginated_optimized(opts, pagination)

  page
  |> Enum.reverse()
  |> maybe_update_cc(list_memberships, opts[:user])
end
@doc """
Fetches a page of activities addressed to the public collection and, when
`opts[:includes_local_public]` is truthy, to the local-public collection.

`:user` is dropped from `opts` before querying.
"""
@spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do
  local_public =
    if Map.get(opts, :includes_local_public, false), do: [as_local_public()], else: []

  intended_recipients = [Constants.as_public() | local_public]
  opts = Map.delete(opts, :user)

  query =
    intended_recipients
    |> fetch_activities_query(opts)
    |> restrict_unlisted(opts)

  fetch_paginated_optimized(query, opts, pagination)
end
@doc """
Same as `fetch_public_or_unlisted_activities/2`, with `:restrict_unlisted`
forced to `true` in `opts`.
"""
@spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()]
def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do
  restricted_opts = Map.put(opts, :restrict_unlisted, true)
  fetch_public_or_unlisted_activities(restricted_opts, pagination)
end
2018-05-11 02:17:33 +00:00
@valid_visibilities ~w[direct unlisted public private]

# Restricts `query` to activities whose `activity_visibility(...)` value equals
# the requested visibility (or is any of the requested visibilities when a list
# is given). Options without a `:visibility` key leave the query untouched.
#
# Invalid visibilities are logged and the query is returned unrestricted, which
# matches `exclude_visibility/2`. Returning the result of `Logger.error/1`
# instead would hand a non-query to the caller's pipeline.
defp restrict_visibility(query, %{visibility: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    from(
      a in query,
      where:
        fragment(
          "activity_visibility(?, ?, ?) = ANY (?)",
          a.actor,
          a.recipients,
          a.data,
          ^visibility
        )
    )
  else
    Logger.error("Could not restrict visibility to #{visibility}")
    query
  end
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility in @valid_visibilities do
  from(
    a in query,
    where:
      fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility)
  )
end

defp restrict_visibility(query, %{visibility: visibility})
     when visibility not in @valid_visibilities do
  Logger.error("Could not restrict visibility to #{visibility}")
  query
end

defp restrict_visibility(query, _visibility), do: query
# Removes activities whose `activity_visibility(...)` value equals the given
# visibility (or is any of the given visibilities when a list is passed) from
# `query`. Invalid values are logged and leave the query untouched, as do
# options without `:exclude_visibilities`.
defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when is_list(visibility) do
  if Enum.all?(visibility, &(&1 in @valid_visibilities)) do
    where(
      query,
      [a],
      not fragment(
        "activity_visibility(?, ?, ?) = ANY (?)",
        a.actor,
        a.recipients,
        a.data,
        ^visibility
      )
    )
  else
    Logger.error("Could not exclude visibility to #{visibility}")
    query
  end
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility in @valid_visibilities do
  where(
    query,
    [a],
    not fragment(
      "activity_visibility(?, ?, ?) = ?",
      a.actor,
      a.recipients,
      a.data,
      ^visibility
    )
  )
end

defp exclude_visibility(query, %{exclude_visibilities: visibility})
     when visibility not in [nil | @valid_visibilities] do
  Logger.error("Could not exclude visibility to #{visibility}")
  query
end

defp exclude_visibility(query, _visibility), do: query
# Restricts `query` with the `thread_visibility` SQL function for the requesting
# user. The restriction is skipped when the third argument or the user has
# `skip_thread_containment: true`, and when no `%User{}` is given.
defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{skip_thread_containment: true}}, _),
  do: query

defp restrict_thread_visibility(query, %{user: %User{ap_id: ap_id}}, _) do
  local_public = as_local_public()

  where(
    query,
    [a],
    fragment("thread_visibility(?, (?)->>'id', ?) = true", ^ap_id, a.data, ^local_public)
  )
end

defp restrict_thread_visibility(query, _, _), do: query
@doc """
Fetches the activities of `user` as seen by `reading_user`.

`params` is extended with `:user` (the reading user) and `:actor_id` (the
user's ap_id). The result of `fetch_activities/2` is reversed before returning.
"""
def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do
  params = Map.merge(params, %{user: reading_user, actor_id: user.ap_id})

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  recipients
  |> fetch_activities(params)
  |> Enum.reverse()
end
@doc """
Fetches the Create/Announce activities of `user` as seen by `reading_user`, in
reversed order.

With `total: true` in `params` the fetch result is treated as a keyword list
and only its `:items` entry is reversed.
"""
def fetch_user_activities(user, reading_user, params \\ %{})

def fetch_user_activities(user, reading_user, %{total: true} = params) do
  result = fetch_activities_for_user(user, reading_user, params)
  reversed_items = Enum.reverse(result[:items])

  Keyword.put(result, :items, reversed_items)
end

def fetch_user_activities(user, reading_user, params) do
  activities = fetch_activities_for_user(user, reading_user, params)
  Enum.reverse(activities)
end
# Fetches the Create/Announce activities of `user` for `reading_user`.
#
# Block and mute filtering on behalf of the reading user is only enabled when
# the reading user does not block `user`. Pagination defaults to `:keyset`
# unless `params[:pagination_type]` is set.
defp fetch_activities_for_user(user, reading_user, params) do
  params =
    Map.merge(params, %{
      type: ["Create", "Announce"],
      user: reading_user,
      actor_id: user.ap_id,
      pinned_object_ids: Map.keys(user.pinned_objects)
    })

  params =
    if User.blocks?(reading_user, user) do
      params
    else
      Map.merge(params, %{blocking_user: reading_user, muting_user: reading_user})
    end

  pagination_type = Map.get(params, :pagination_type) || :keyset

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, pagination_type)
end
@doc """
Fetches Create/Announce activities visible to `reading_user`, in reversed order.

With `total: true` in `params` the fetch result is treated as a keyword list
and only its `:items` entry is reversed.
"""
def fetch_statuses(reading_user, %{total: true} = params) do
  result = fetch_activities_for_reading_user(reading_user, params)
  reversed_items = Enum.reverse(result[:items])

  Keyword.put(result, :items, reversed_items)
end

def fetch_statuses(reading_user, params) do
  activities = fetch_activities_for_reading_user(reading_user, params)
  Enum.reverse(activities)
end
# Shared implementation behind fetch_statuses/2: forces the activity types to
# "Create"/"Announce" and fetches with `:offset` pagination.
defp fetch_activities_for_reading_user(reading_user, params) do
  params = Map.put(params, :type, ["Create", "Announce"])

  recipients =
    user_activities_recipients(%{godmode: params[:godmode], reading_user: reading_user})

  fetch_activities(recipients, params, :offset)
end
# Computes the recipient list used to scope a fetch:
#   * godmode -> no recipient restriction (empty list)
#   * local reading user -> public, local-public, the user and who they follow
#   * anyone else (including no user) -> public only
defp user_activities_recipients(%{godmode: true}), do: []

defp user_activities_recipients(%{reading_user: reading_user}) do
  if is_nil(reading_user) or !reading_user.local do
    [Constants.as_public()]
  else
    [
      Constants.as_public(),
      as_local_public(),
      reading_user.ap_id | User.following(reading_user)
    ]
  end
end
# Drops Announce activities whose announced object was authored by the
# `:announce_filtering_user`. Needs the joined object, so it raises when the
# preload is skipped.
defp restrict_announce_object_actor(_query, %{announce_filtering_user: _, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_announce_object_actor(query, %{announce_filtering_user: %{ap_id: actor}}) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'type' != ? or ?->>'actor' != ?",
      activity.data,
      "Announce",
      object.data,
      ^actor
    )
  )
end

defp restrict_announce_object_actor(query, _opts), do: query
# Keeps only activities with an id greater than `:since_id`; an empty string or
# a missing key leaves the query unchanged.
defp restrict_since(query, %{since_id: ""}), do: query

defp restrict_since(query, %{since_id: since_id}),
  do: where(query, [activity], activity.id > ^since_id)

defp restrict_since(query, _opts), do: query
2017-03-21 16:53:20 +00:00
# Embedded-tag variant of the "all of these tags" filter: uses the jsonb `?&`
# operator on the object's `tag` field. A single binary tag is delegated to the
# "any" filter. Raises when the object preload is skipped.
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
  )
end

defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag),
  do: restrict_embedded_tag_any(query, %{tag: tag})

defp restrict_embedded_tag_all(query, _opts), do: query
2019-01-10 15:44:28 +00:00
# Embedded-tag variant of the "any of these tags" filter: uses the jsonb `?|`
# operator on the object's `tag` field. A single binary tag is wrapped in a
# list. Raises when the object preload is skipped.
defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
  )
end

defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag),
  do: restrict_embedded_tag_any(query, %{tag: [tag]})

defp restrict_embedded_tag_any(query, _opts), do: query
# Embedded-tag variant of the "none of these tags" filter: negates the jsonb
# `?|` operator on the object's `tag` field. A single binary tag is wrapped in
# a list. Raises when the object preload is skipped.
defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
  where(
    query,
    [_activity, object],
    fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
  )
end

defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
     when is_binary(tag_reject),
     do: restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})

defp restrict_embedded_tag_reject_any(query, _opts), do: query
2019-01-10 15:44:28 +00:00
# Query selecting the distinct object ids linked (through `hashtags_objects`)
# to any hashtag whose name is in `tags`.
defp object_ids_query_for_tags(tags) do
  from(hto in "hashtags_objects",
    join: ht in Pleroma.Hashtag,
    on: hto.hashtag_id == ht.id,
    where: ht.name in ^tags,
    select: hto.object_id,
    distinct: true
  )
end
# Hashtag-table variant of the "all of these tags" filter. A one-element list
# is delegated to the cheaper "any" filter; longer lists are checked with an
# `array_agg(...) @> tags` subselect. Raises when the object preload is skipped.
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_all(query, %{tag_all: [single_tag]}),
  do: restrict_hashtag_any(query, %{tag: single_tag})

defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
  where(
    query,
    [_activity, object],
    fragment(
      """
      (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
      AND hashtags_objects.object_id = ?) @> ?
      """,
      ^tags,
      object.id,
      ^tags
    )
  )
end

defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag),
  do: restrict_hashtag_all(query, %{tag_all: [tag]})

defp restrict_hashtag_all(query, _opts), do: query
# Hashtag-table variant of the "any of these tags" filter. The hashtag ids are
# resolved with a separate query first, then objects are joined through
# `hashtags_objects`. Raises when the object preload is skipped.
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
  hashtag_ids = Repo.all(from(ht in Hashtag, where: ht.name in ^tags, select: ht.id))

  # Note: NO extra ordering should be done on "activities.id desc nulls last" for optimal plan
  from(
    [_activity, object] in query,
    join: hto in "hashtags_objects",
    on: hto.object_id == object.id,
    where: hto.hashtag_id in ^hashtag_ids,
    distinct: [desc: object.id],
    order_by: [desc: object.id]
  )
end

defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag),
  do: restrict_hashtag_any(query, %{tag: [tag]})

defp restrict_hashtag_any(query, _opts), do: query
# Hashtag-table variant of the "none of these tags" filter: excludes objects
# whose id appears in the subquery of objects carrying a rejected tag. Raises
# when the object preload is skipped.
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
  raise_on_missing_preload()
end

defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
  where(
    query,
    [_activity, object],
    object.id not in subquery(object_ids_query_for_tags(tags_reject))
  )
end

defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject),
  do: restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})

defp restrict_hashtag_reject_any(query, _opts), do: query
# Shared failure for filters that need the joined object but were invoked with
# `skip_preload: true`.
defp raise_on_missing_preload,
  do: raise("Can't use the child object without preloading!")
2017-09-14 11:22:09 +00:00
2018-05-04 21:16:02 +00:00
# Keeps activities whose recipients overlap `recipients`. When a user is given,
# activities authored by that user are also kept (OR-ed in). An empty recipient
# list applies no restriction.
defp restrict_recipients(query, [], _user), do: query

defp restrict_recipients(query, recipients, nil) do
  where(query, [activity], fragment("? && ?", ^recipients, activity.recipients))
end

defp restrict_recipients(query, recipients, user) do
  query
  |> where([activity], fragment("? && ?", ^recipients, activity.recipients))
  |> or_where([activity], activity.actor == ^user.ap_id)
end
2017-03-23 23:09:08 +00:00
# With `local_only: true`, keeps only activities flagged as local.
defp restrict_local(query, %{local_only: true}),
  do: where(query, [activity], activity.local == true)

defp restrict_local(query, _opts), do: query
2017-04-15 10:11:20 +00:00
2021-02-01 11:09:23 +00:00
# With `remote: true`, keeps only activities that are not flagged as local.
defp restrict_remote(query, %{remote: true}),
  do: where(query, [activity], activity.local == false)

defp restrict_remote(query, _opts), do: query
# Keeps only activities whose actor equals `:actor_id`, when that option is set.
defp restrict_actor(query, %{actor_id: actor_id}),
  do: where(query, [activity], activity.actor == ^actor_id)

defp restrict_actor(query, _opts), do: query
# Filters on the activity's `type`: a binary is compared for equality, any
# other value is passed to `= ANY(?)` (callers in this file pass a list).
defp restrict_type(query, %{type: type}) when is_binary(type) do
  where(query, [activity], fragment("?->>'type' = ?", activity.data, ^type))
end

defp restrict_type(query, %{type: type}) do
  where(query, [activity], fragment("?->>'type' = ANY(?)", activity.data, ^type))
end

defp restrict_type(query, _opts), do: query
# Keeps only activities whose `state` field equals `:state`, when given.
defp restrict_state(query, %{state: state}) do
  where(query, [activity], fragment("?->>'state' = ?", activity.data, ^state))
end

defp restrict_state(query, _opts), do: query
# Keeps only activities whose object lists `:favorited_by` in its `likes`.
defp restrict_favorited_by(query, %{favorited_by: ap_id}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'likes' \\? (?)", object.data, ^ap_id)
  )
end

defp restrict_favorited_by(query, _opts), do: query
# With `only_media: true`, keeps only Create activities whose object has a
# non-empty `attachment`. Raises when the object preload is skipped.
defp restrict_media(_query, %{only_media: _val, skip_preload: true}) do
  raise "Can't use the child object without preloading!"
end

defp restrict_media(query, %{only_media: true}) do
  query
  |> where([activity, _object], fragment("(?)->>'type' = ?", activity.data, "Create"))
  |> where([_activity, object], fragment("not (?)->'attachment' = (?)", object.data, ^[]))
end

defp restrict_media(query, _opts), do: query
# Reply filtering, selected by the options:
#   * `exclude_replies: true` -> only objects without `inReplyTo`
#   * `reply_visibility: "self"` -> non-replies, or activities addressed to the
#     filtering user
#   * `reply_visibility: "following"` -> see the inline SQL comments below
defp restrict_replies(query, %{exclude_replies: true}) do
  where(query, [_activity, object], fragment("?->>'inReplyTo' is null", object.data))
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "self"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      "?->>'inReplyTo' is null OR ? = ANY(?)",
      object.data,
      ^user.ap_id,
      activity.recipients
    )
  )
end

defp restrict_replies(query, %{
       reply_filtering_user: %User{} = user,
       reply_visibility: "following"
     }) do
  where(
    query,
    [activity, object],
    fragment(
      """
      ?->>'type' != 'Create' -- This isn't a Create
      OR ?->>'inReplyTo' is null -- this isn't a reply
      OR ? && array_remove(?, ?) -- The recipient is us or one of our friends,
      -- unless they are the author (because authors
      -- are also part of the recipients). This leads
      -- to a bug that self-replies by friends won't
      -- show up.
      OR ? = ? -- The actor is us
      """,
      activity.data,
      object.data,
      ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)],
      activity.recipients,
      activity.actor,
      activity.actor,
      ^user.ap_id
    )
  )
end

defp restrict_replies(query, _opts), do: query
# With `exclude_reblogs: true`, drops Announce activities.
defp restrict_reblogs(query, %{exclude_reblogs: true}) do
  where(query, [activity], fragment("?->>'type' != 'Announce'", activity.data))
end

defp restrict_reblogs(query, _opts), do: query
# Hides activities from muted actors and activities addressed (`to`) to muted
# actors unless the muting user authored them. Unless the preload is skipped it
# also hides activities that have a matching `:thread_mute` row. `with_muted:
# true` disables the whole filter.
defp restrict_muted(query, %{with_muted: true}), do: query

defp restrict_muted(query, %{muting_user: %User{} = user} = opts) do
  # Prefer the pre-fetched list handed in by the caller, if any.
  mutes = opts[:muted_users_ap_ids] || User.muted_users_ap_ids(user)

  muted_filtered =
    query
    |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^mutes))
    |> where(
      [activity],
      fragment(
        "not (?->'to' \\?| ?) or ? = ?",
        activity.data,
        ^mutes,
        activity.actor,
        ^user.ap_id
      )
    )

  if opts[:skip_preload] do
    muted_filtered
  else
    from([thread_mute: tm] in muted_filtered, where: is_nil(tm.user_id))
  end
end

defp restrict_muted(query, _opts), do: query
# Applies the blocking user's user- and domain-blocks to the query. The object
# join is added when the query does not already carry an `:object` binding.
defp restrict_blocked(query, %{blocking_user: %User{} = user} = opts) do
  # Prefer the pre-fetched list handed in by the caller, if any.
  blocked_ap_ids = opts[:blocked_users_ap_ids] || User.blocked_users_ap_ids(user)
  domain_blocks = user.domain_blocks || []
  following_ap_ids = User.get_friends_ap_ids(user)

  with_object =
    if has_named_binding?(query, :object) do
      query
    else
      Activity.with_joined_object(query)
    end

  with_object
  # You don't block the author
  |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids))
  # You don't block any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "((not (? && ?)) or ? = ?)",
      activity.recipients,
      ^blocked_ap_ids,
      activity.actor,
      ^user.ap_id
    )
  )
  # You don't block the domain of any recipients, and didn't author the post
  |> where(
    [activity],
    fragment(
      "(recipients_contain_blocked_domains(?, ?) = false) or ? = ?",
      activity.recipients,
      ^domain_blocks,
      activity.actor,
      ^user.ap_id
    )
  )
  # It's not a boost of a user you block
  |> where(
    [activity],
    fragment(
      "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
      activity.data,
      activity.data,
      ^blocked_ap_ids
    )
  )
  # You don't block the author's domain, and also don't follow the author
  |> where(
    [activity],
    fragment(
      "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)",
      activity.actor,
      ^domain_blocks,
      activity.actor,
      ^following_ap_ids
    )
  )
  # Same as above, but checks the Object
  |> where(
    [object: o],
    fragment(
      "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)",
      o.data,
      ^domain_blocks,
      o.data,
      ^following_ap_ids
    )
  )
end

defp restrict_blocked(query, _opts), do: query
# Unless `[:activitypub, :blockers_visible]` is exactly `true`, hides activities
# authored by, or boosting, users that block the given user.
defp restrict_blockers_visibility(query, %{blocking_user: %User{} = user}) do
  case Config.get([:activitypub, :blockers_visible]) do
    true ->
      query

    _ ->
      blocker_ap_ids = User.incoming_relationships_ungrouped_ap_ids(user, [:block])

      query
      # The author doesn't block you
      |> where([activity], fragment("not (? = ANY(?))", activity.actor, ^blocker_ap_ids))
      # It's not a boost of a user that blocks you
      |> where(
        [activity],
        fragment(
          "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)",
          activity.data,
          activity.data,
          ^blocker_ap_ids
        )
      )
  end
end

defp restrict_blockers_visibility(query, _opts), do: query
# With `restrict_unlisted: true`, drops activities that carry the public
# address in `cc`.
defp restrict_unlisted(query, %{restrict_unlisted: true}) do
  where(
    query,
    [activity],
    fragment(
      "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)",
      activity.data,
      ^[Constants.as_public()]
    )
  )
end

defp restrict_unlisted(query, _opts), do: query
2020-04-15 13:16:32 +00:00
# With `pinned: true`, keeps only Create activities whose associated object id
# is one of `:pinned_object_ids`. Requires the `:object` named binding.
defp restrict_pinned(query, %{pinned: true, pinned_object_ids: ids}) do
  where(
    query,
    [activity, object: o],
    fragment(
      "(?)->>'type' = 'Create' and associated_object_id((?)) = any (?)",
      activity.data,
      activity.data,
      ^ids
    )
  )
end

defp restrict_pinned(query, _opts), do: query
# Hides Announce activities made by actors whose reblogs the user has muted.
defp restrict_muted_reblogs(query, %{muting_user: %User{} = user} = opts) do
  # Prefer the pre-fetched list handed in by the caller, if any.
  muted_reblogs = opts[:reblog_muted_users_ap_ids] || User.reblog_muted_users_ap_ids(user)

  where(
    query,
    [activity],
    fragment(
      "not ( ?->>'type' = 'Announce' and ? = ANY(?))",
      activity.data,
      activity.actor,
      ^muted_reblogs
    )
  )
end

defp restrict_muted_reblogs(query, _opts), do: query
# Keeps only activities whose actor URL has `:instance` as its host part
# (third `/`-separated segment of `actor`).
defp restrict_instance(query, %{instance: instance}) when is_binary(instance) do
  where(
    query,
    [_activity],
    fragment("split_part(actor::text, '/'::text, 3) = ?", ^instance)
  )
end

defp restrict_instance(query, _opts), do: query
# Applies the user's composed filter regex (case-insensitive match on the
# object's `content`); the user's own activities are never filtered. When the
# options carry no `:user`, `:blocking_user` is used instead.
defp restrict_filtered(query, %{user: %User{} = user}) do
  case Filter.compose_regex(user) do
    nil ->
      query

    regex ->
      where(
        query,
        [activity, object],
        fragment("not(?->>'content' ~* ?)", object.data, ^regex) or
          activity.actor == ^user.ap_id
      )
  end
end

defp restrict_filtered(query, %{blocking_user: %User{} = user}),
  do: restrict_filtered(query, %{user: user})

defp restrict_filtered(query, _opts), do: query
# For anonymous readers (user is nil), hides local and/or remote activities
# according to the `restrict_unauthenticated` settings for `:activities`.
defp restrict_unauthenticated(query, nil) do
  hide_local = Config.restrict_unauthenticated_access?(:activities, :local)
  hide_remote = Config.restrict_unauthenticated_access?(:activities, :remote)

  cond do
    hide_local and hide_remote ->
      from(activity in query, where: false)

    hide_local ->
      where(query, [activity], activity.local == false)

    hide_remote ->
      where(query, [activity], activity.local == true)

    true ->
      query
  end
end

defp restrict_unauthenticated(query, _user), do: query
2023-11-12 13:38:08 +00:00
# Keeps only activities whose object has the given `quoteUrl`.
defp restrict_quote_url(query, %{quote_url: quote_url}) do
  where(
    query,
    [_activity, object],
    fragment("(?)->'quoteUrl' = ?", object.data, ^quote_url)
  )
end

defp restrict_quote_url(query, _opts), do: query
# Keeps only activities whose `rules` field contains `:rule_id`.
defp restrict_rule(query, %{rule_id: rule_id}) do
  where(query, [activity], fragment("(?)->'rules' \\? (?)", activity.data, ^rule_id))
end

defp restrict_rule(query, _opts), do: query
# Drops activities whose object is an "Answer" (poll vote), unless
# `include_poll_votes: true` is set or the query has no `:object` binding.
defp exclude_poll_votes(query, %{include_poll_votes: true}), do: query

defp exclude_poll_votes(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "Answer"))
    false -> query
  end
end
# Drops activities whose object is a "ChatMessage", unless
# `include_chat_messages: true` is set or the query has no `:object` binding.
defp exclude_chat_messages(query, %{include_chat_messages: true}), do: query

defp exclude_chat_messages(query, _opts) do
  case has_named_binding?(query, :object) do
    true -> where(query, [object: o], fragment("not(?->>'type' = ?)", o.data, "ChatMessage"))
    false -> query
  end
end
# Inner-joins the actor's user row (named binding `:u`) requiring
# `invisible == false`, which drops activities from invisible actors. Skipped
# for "Flag" queries and when `invisible_actors: true` is set.
defp exclude_invisible_actors(query, %{type: "Flag"}), do: query
defp exclude_invisible_actors(query, %{invisible_actors: true}), do: query

defp exclude_invisible_actors(query, _opts) do
  from(activity in query,
    join: u in User,
    as: :u,
    on: activity.actor == u.ap_id and u.invisible == false
  )
end
# Drops the activity with the given id when `:exclude_id` is a binary.
defp exclude_id(query, %{exclude_id: id}) when is_binary(id),
  do: where(query, [activity], activity.id != ^id)

defp exclude_id(query, _opts), do: query
# Adds the object preload unless `skip_preload: true` is set.
defp maybe_preload_objects(query, %{skip_preload: true}), do: query
defp maybe_preload_objects(query, _opts), do: Activity.with_preloaded_object(query)
# Adds the bookmark preload for `opts[:user]` unless `skip_preload: true` is set.
defp maybe_preload_bookmarks(query, %{skip_preload: true}), do: query

defp maybe_preload_bookmarks(query, opts),
  do: Activity.with_preloaded_bookmark(query, opts[:user])
# Adds the report-notes preload only when `preload_report_notes: true` is set.
defp maybe_preload_report_notes(query, %{preload_report_notes: true}),
  do: Activity.with_preloaded_report_notes(query)

defp maybe_preload_report_notes(query, _opts), do: query
# Sets the thread-muted field for the muting user (falling back to
# `opts[:user]`) unless `skip_preload: true` is set.
defp maybe_set_thread_muted_field(query, %{skip_preload: true}), do: query

defp maybe_set_thread_muted_field(query, opts) do
  muting_user = opts[:muting_user] || opts[:user]

  Activity.with_set_thread_muted_field(query, muting_user)
end
# Orders by id in the direction given by `:order` (`:desc` or `:asc`); any
# other value leaves the query unordered.
defp maybe_order(query, %{order: :desc}), do: order_by(query, desc: :id)
defp maybe_order(query, %{order: :asc}), do: order_by(query, asc: :id)
defp maybe_order(query, _opts), do: query
# Normalizes the hashtag names under `:tag`, `:tag_all` and `:tag_reject`:
# a single value is normalized in place, a list is normalized and de-duplicated.
# Keys holding anything else (or absent) are left alone.
defp normalize_fetch_activities_query_opts(opts) do
  for key <- [:tag, :tag_all, :tag_reject], reduce: opts do
    acc ->
      case acc[key] do
        value when is_bitstring(value) ->
          Map.put(acc, key, Hashtag.normalize_name(value))

        value when is_list(value) ->
          names = value |> Enum.map(&Hashtag.normalize_name/1) |> Enum.uniq()
          Map.put(acc, key, names)

        _ ->
          acc
      end
  end
end
# Pre-fetches the muting user's outgoing relationship ap_ids in a single call
# and returns three option maps (for restrict_blocked, restrict_muted and
# restrict_muted_reblogs) carrying them. Blocks are only included when the
# blocking user is the same as the muting user. Values already present in
# `opts` win over the pre-fetched ones.
defp fetch_activities_query_ap_ids_ops(opts) do
  source_user = opts[:muting_user]

  mute_relationships = if source_user, do: [:mute, :reblog_mute], else: []

  relationships =
    if opts[:blocking_user] && opts[:blocking_user] == source_user do
      [:block | mute_relationships]
    else
      mute_relationships
    end

  preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, relationships)

  {
    Map.put_new(opts, :blocked_users_ap_ids, preloaded_ap_ids[:block]),
    Map.put_new(opts, :muted_users_ap_ids, preloaded_ap_ids[:mute]),
    Map.put_new(opts, :reblog_muted_users_ap_ids, preloaded_ap_ids[:reblog_mute])
  }
end
2018-02-18 14:32:11 +00:00
@doc """
Builds the Ecto query for listing activities addressed to `recipients`.

`opts` selects the preloads, ordering and filters to apply; each `restrict_*`,
`exclude_*` and `maybe_*` step is a no-op when its option is absent. Hashtag
filtering uses the hashtag tables when the `:improved_hashtag_timeline` feature
is enabled and the embedded `tag` field otherwise.

Returns the query; nothing is fetched here.
"""
def fetch_activities_query(recipients, opts \\ %{}) do
  opts = normalize_fetch_activities_query_opts(opts)

  {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
    fetch_activities_query_ap_ids_ops(opts)

  config = %{
    skip_thread_containment: Config.get([:instance, :skip_thread_containment])
  }

  # restrict_filtered/2 is applied exactly once: piping it a second time only
  # repeats the same WHERE clause and recomputes the user's filter regex.
  query =
    Activity
    |> maybe_preload_objects(opts)
    |> maybe_preload_bookmarks(opts)
    |> maybe_preload_report_notes(opts)
    |> maybe_set_thread_muted_field(opts)
    |> maybe_order(opts)
    |> restrict_recipients(recipients, opts[:user])
    |> restrict_replies(opts)
    |> restrict_since(opts)
    |> restrict_local(opts)
    |> restrict_remote(opts)
    |> restrict_actor(opts)
    |> restrict_type(opts)
    |> restrict_state(opts)
    |> restrict_favorited_by(opts)
    |> restrict_blocked(restrict_blocked_opts)
    |> restrict_blockers_visibility(opts)
    |> restrict_muted(restrict_muted_opts)
    |> restrict_filtered(opts)
    |> restrict_media(opts)
    |> restrict_visibility(opts)
    |> restrict_thread_visibility(opts, config)
    |> restrict_reblogs(opts)
    |> restrict_pinned(opts)
    |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
    |> restrict_instance(opts)
    |> restrict_announce_object_actor(opts)
    |> restrict_rule(opts)
    |> restrict_quote_url(opts)
    |> maybe_restrict_deactivated_users(opts)
    |> exclude_poll_votes(opts)
    |> exclude_chat_messages(opts)
    |> exclude_invisible_actors(opts)
    |> exclude_visibility(opts)

  if Config.feature_enabled?(:improved_hashtag_timeline) do
    query
    |> restrict_hashtag_any(opts)
    |> restrict_hashtag_all(opts)
    |> restrict_hashtag_reject_any(opts)
  else
    query
    |> restrict_embedded_tag_any(opts)
    |> restrict_embedded_tag_all(opts)
    |> restrict_embedded_tag_reject_any(opts)
  end
end
@doc """
Fetches the activities favourited by `user`.

The query starts from the user's "Like" activities, joins the liked object and
its activity, and orders by the Like's id descending, so the result follows the
order in which favourites were added. The Like's id is exposed as
`pagination_id` on each returned activity.
"""
@spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t())
def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do
  favourites_query =
    user.ap_id
    |> Activity.Queries.by_actor()
    |> Activity.Queries.by_type("Like")
    |> Activity.with_joined_object()
    |> Object.with_joined_activity()
    |> select([like, object, activity], %{activity | object: object, pagination_id: like.id})
    |> order_by([like, _, _], desc_nulls_last: like.id)

  # The ordering is already set above, so the paginator is told to skip its own.
  Pagination.fetch_paginated(favourites_query, Map.put(params, :skip_order, true), pagination)
end
# For every activity whose `bcc` contains one of the user's list memberships,
# prepends the user's ap_id to the activity's `cc`. Activities without a
# non-empty `bcc`, and calls without list memberships or without a user, are
# returned unchanged.
defp maybe_update_cc(activities, [_ | _] = list_memberships, %User{ap_id: user_ap_id}) do
  Enum.map(activities, fn
    %{data: %{"bcc" => [_ | _] = bcc}} = activity ->
      if Enum.any?(bcc, &(&1 in list_memberships)) do
        # "cc" may be absent (nil) or a bare value; List.wrap/1 keeps the
        # result a proper list instead of building `[user_ap_id | nil]`.
        update_in(activity.data["cc"], &[user_ap_id | List.wrap(&1)])
      else
        activity
      end

    activity ->
      activity
  end)
end

defp maybe_update_cc(activities, _list_memberships, _user), do: activities
# Keeps activities addressed to any of `recipients`, or addressed to any of
# `recipients_with_public` AND to the public address.
defp fetch_activities_bounded_query(query, recipients, recipients_with_public) do
  where(
    query,
    [activity],
    fragment("? && ?", activity.recipients, ^recipients) or
      (fragment("? && ?", activity.recipients, ^recipients_with_public) and
         ^Constants.as_public() in activity.recipients)
  )
end
@doc """
Fetches activities addressed to `recipients`, plus public activities addressed
to `recipients_with_public`.

The base query is built without a recipient restriction, bounded by the two
recipient lists, paginated, and the resulting list is reversed.
"""
def fetch_activities_bounded(
      recipients,
      recipients_with_public,
      opts \\ %{},
      pagination \\ :keyset
    ) do
  bounded_query =
    []
    |> fetch_activities_query(opts)
    |> fetch_activities_bounded_query(recipients, recipients_with_public)

  bounded_query
  |> Pagination.fetch_paginated(opts, pagination)
  |> Enum.reverse()
end
@doc """
Stores an uploaded file and inserts an object describing it.

The file name is sanitized first; `opts[:actor]`, when present, is recorded
under `"actor"` in the object data. Any non-`{:ok, _}` result of the store
step is returned as is.
"""
@spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()}
def upload(file, opts \\ []) do
  case Upload.store(sanitize_upload_file(file), opts) do
    {:ok, data} ->
      obj_data = Maps.put_if_present(data, "actor", opts[:actor])
      Repo.insert(%Object{data: obj_data})

    other ->
      other
  end
end
2022-12-29 19:33:46 +00:00
# Reduces the client-supplied file name of a `Plug.Upload` to its base name, so
# any directory components are dropped. Other upload sources pass through.
defp sanitize_upload_file(%Plug.Upload{filename: filename} = upload) when is_binary(filename),
  do: %{upload | filename: Path.basename(filename)}

defp sanitize_upload_file(upload), do: upload
# Extracts a URL string from an actor's `url` field, which may be a string, a
# map with `"href"`, or a list of either (only the first element is used).
# Anything else yields nil.
@spec get_actor_url(any()) :: binary() | nil
defp get_actor_url(url) when is_binary(url), do: url
defp get_actor_url(%{"href" => href}) when is_binary(href), do: href
defp get_actor_url(urls) when is_list(urls), do: get_actor_url(List.first(urls))
defp get_actor_url(_url), do: nil
# Converts an actor image (a map with `"url"`, or a list whose first element is
# used) into the internal `%{"type" => "Image", "url" => [%{"href" => ...}]}`
# shape. Anything else yields nil.
defp normalize_image(%{"url" => url}),
  do: %{"type" => "Image", "url" => [%{"href" => url}]}

defp normalize_image(urls) when is_list(urls) do
  first = List.first(urls)
  normalize_image(first)
end

defp normalize_image(_other), do: nil
2018-02-25 15:14:25 +00:00
2021-03-04 10:06:12 +00:00
# Converts a (remote) ActivityPub actor object into the attribute map used to
# build or update a user.
#
# `data` is the decoded actor JSON; `additional` may carry
# `:nickname_from_acct`, which is used as the nickname when present.
#
# The actor document comes from a remote server, so malformed entries in
# `attachment` / `tag` are skipped rather than allowed to raise and abort the
# whole user fetch.
defp object_to_user_data(data, additional) do
  # `attachment` may be missing, null or a single object; List.wrap/1 turns all
  # of these into a list. Only well-formed PropertyValue maps become fields.
  fields =
    for %{"type" => "PropertyValue"} = field <- List.wrap(Map.get(data, "attachment", [])) do
      Map.take(field, ["name", "value"])
    end

  # Only Emoji tags that carry both a name and an icon URL can be used.
  emojis =
    for %{"type" => "Emoji", "icon" => %{"url" => url}, "name" => name}
        when is_binary(name) <- List.wrap(Map.get(data, "tag", [])),
        into: %{} do
      {String.trim(name, ":"), url}
    end

  is_locked = data["manuallyApprovesFollowers"] || false
  capabilities = data["capabilities"] || %{}
  accepts_chat_messages = capabilities["acceptsChatMessages"]

  # Everything below reads from the fixed-up actor object.
  data = Transmogrifier.maybe_fix_user_object(data)

  is_discoverable = data["discoverable"] || false
  invisible = data["invisible"] || false
  actor_type = data["type"] || "Person"

  featured_address = data["featured"]
  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)

  public_key =
    if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
      data["publicKey"]["publicKeyPem"]
    end

  shared_inbox =
    if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do
      data["endpoints"]["sharedInbox"]
    end

  # An unparsable birthday is treated as absent.
  birthday =
    if is_binary(data["vcard:bday"]) do
      case Date.from_iso8601(data["vcard:bday"]) do
        {:ok, date} -> date
        {:error, _} -> nil
      end
    end

  show_birthday = !!birthday

  # if WebFinger request was already done, we probably have acct, otherwise
  # we request WebFinger here
  nickname = additional[:nickname_from_acct] || generate_nickname(data)

  %{
    ap_id: data["id"],
    uri: get_actor_url(data["url"]),
    banner: normalize_image(data["image"]),
    fields: fields,
    emoji: emojis,
    is_locked: is_locked,
    is_discoverable: is_discoverable,
    invisible: invisible,
    avatar: normalize_image(data["icon"]),
    name: data["name"],
    follower_address: data["followers"],
    following_address: data["following"],
    featured_address: featured_address,
    bio: data["summary"] || "",
    actor_type: actor_type,
    also_known_as: Map.get(data, "alsoKnownAs", []),
    public_key: public_key,
    inbox: data["inbox"],
    shared_inbox: shared_inbox,
    accepts_chat_messages: accepts_chat_messages,
    birthday: birthday,
    show_birthday: show_birthday,
    pinned_objects: pinned_objects,
    nickname: nickname
  }
end
2018-02-25 15:14:25 +00:00
2021-03-04 10:06:12 +00:00
# Builds `preferredUsername@host` from the actor data. When
# `[WebFinger, :update_nickname_on_user_fetch]` is enabled, the acct reported by
# WebFinger for that address is used instead, falling back to the generated
# value if the lookup does not yield one.
defp generate_nickname(%{"preferredUsername" => username} = data) when is_binary(username) do
  host = URI.parse(data["id"]).host
  generated = "#{username}@#{host}"

  with enabled when enabled not in [nil, false] <-
         Config.get([WebFinger, :update_nickname_on_user_fetch]),
       {:ok, %{"subject" => "acct:" <> acct}} <- WebFinger.finger(generated) do
    acct
  else
    _ -> generated
  end
end

# nickname can be nil because of virtual actors
defp generate_nickname(_data), do: nil
@doc """
Fetches the remote following/follower collections of `user`.

On success returns `{:ok, map}` with `:follower_count`, `:following_count`
(non-integer `totalItems` count as 0) and the `:hide_follows` /
`:hide_followers` flags derived from whether each collection looks private.
Failures are returned as `{:error, reason}`.
"""
def fetch_follow_information_for_user(user) do
  with {:ok, following} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
       {:ok, hide_follows} <- collection_private(following),
       {:ok, followers} <-
         Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
       {:ok, hide_followers} <- collection_private(followers) do
    info = %{
      hide_follows: hide_follows,
      follower_count: normalize_counter(followers["totalItems"]),
      following_count: normalize_counter(following["totalItems"]),
      hide_followers: hide_followers
    }

    {:ok, info}
  else
    {:error, _reason} = error -> error
    other -> {:error, other}
  end
end
# Integer counters pass through; anything else counts as 0.
defp normalize_counter(counter) do
  if is_integer(counter), do: counter, else: 0
end
@doc """
Merges freshly fetched follow information into `user_data` under `:info`.

The fetch only happens when `[:instance, :external_user_synchronization]` is
`true`, the `:type` of `user_data` is "Person" or "Service", and both collection
addresses are set. In every other case, including a failed fetch (which is
logged), `user_data` is returned unchanged.
"""
# NOTE(review): object_to_user_data/2 in this module emits `:actor_type`, not
# `:type` — confirm the `:type` check below can actually pass for its output.
def maybe_update_follow_information(user_data) do
  with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])},
       {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]},
       {_, true} <-
         {:collections_available,
          !!(user_data[:following_address] && user_data[:follower_address])},
       {:ok, fetched} <- fetch_follow_information_for_user(user_data) do
    merged_info = Map.merge(user_data[:info] || %{}, fetched)

    Map.put(user_data, :info, merged_info)
  else
    # Expected "nothing to do" outcomes.
    {step, false} when step in [:user_type_check, :collections_available, :enabled] ->
      user_data

    e ->
      Logger.error(
        "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e)
      )

      user_data
  end
end
# Decides whether a follow collection should be treated as private.
#
# Returns `{:ok, false}` when the collection exposes a first page, either
# embedded or fetchable by reference, and `{:ok, true}` otherwise.
defp collection_private(%{"first" => %{"type" => type}})
     when type in ["CollectionPage", "OrderedCollectionPage"],
     do: {:ok, false}

defp collection_private(%{"first" => first}) do
  case Fetcher.fetch_and_contain_remote_object_from_id(first) do
    {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] ->
      {:ok, false}

    # Fetch errors AND successfully fetched objects that are not collection
    # pages both mean the page cannot be read; previously the latter raised
    # WithClauseError instead of being reported as private.
    _ ->
      {:ok, true}
  end
end

defp collection_private(_data), do: {:ok, true}
2021-03-04 10:06:12 +00:00
@doc """
Runs the actor object through the MRF filter and converts it to user data.

Returns `{:ok, user_data}`; any non-`{:ok, _}` filter result `e` is returned
as `{:error, e}`.
"""
def user_data_from_user_object(data, additional \\ []) do
  case MRF.filter(data) do
    {:ok, filtered} -> {:ok, object_to_user_data(filtered, additional)}
    e -> {:error, e}
  end
end
# Fetches the remote actor at `ap_id` and turns it into user data (with follow
# information merged in when applicable). Errors are logged at a level that
# depends on the reason and returned as `{:error, reason}`.
defp fetch_and_prepare_user_from_ap_id(ap_id, additional) do
  with {:ok, actor} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
       {:ok, user_data} <- user_data_from_user_object(actor, additional) do
    {:ok, maybe_update_follow_information(user_data)}
  else
    # If this has been deleted, only log a debug and not an error
    {:error, "Object has been deleted" = e} ->
      Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}

    # Rejections are expected; keep them out of the error log.
    {:error, {:reject, reason} = e} ->
      Logger.info("Rejected user #{ap_id}: #{inspect(reason)}")
      {:error, e}

    {:error, e} ->
      Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
      {:error, e}
  end
end
@doc """
Frees up the nickname in `data` if another user already holds it.

When a user with the same nickname but a different ap_id exists, that old user
is renamed to `"<id>.<nickname>"`. When the ap_ids match, nothing is changed and
the situation is only logged. Returns `nil` when there is no clash.
"""
def maybe_handle_clashing_nickname(data) do
  with nickname when is_binary(nickname) <- data[:nickname],
       %User{} = old_user <- User.get_by_nickname(nickname),
       {_, false} <- {:ap_id_comparison, data[:ap_id] == old_user.ap_id} do
    Logger.info(
      "Found an old user for #{nickname}, the old ap id is #{old_user.ap_id}, new one is #{data[:ap_id]}, renaming."
    )

    renamed = "#{old_user.id}.#{old_user.nickname}"

    old_user
    |> User.remote_user_changeset(%{nickname: renamed})
    |> User.update_and_set_cache()
  else
    {:ap_id_comparison, true} ->
      Logger.info(
        "Found an old user for #{data[:nickname]}, but the ap id #{data[:ap_id]} is the same as the new user. Race condition? Not changing anything."
      )

    _ ->
      nil
  end
end
@doc """
Builds the pinned-objects map from a featured collection.

Accepts an "OrderedCollection" or "Collection" whose `"orderedItems"` is a list
of object ids (strings) or objects carrying a string `"id"`. Returns a map of
object ap_id to the time it was processed. Items of any other shape are
skipped. Any other input is logged and yields an empty map.
"""
def pin_data_from_featured_collection(%{"type" => type, "orderedItems" => objects})
    when type in ["OrderedCollection", "Collection"] and is_list(objects) do
  objects
  |> Enum.flat_map(fn
    %{"id" => object_ap_id} when is_binary(object_ap_id) -> [object_ap_id]
    object_ap_id when is_binary(object_ap_id) -> [object_ap_id]
    # The collection is remote data: a malformed item must not crash the fetch.
    _malformed -> []
  end)
  |> Map.new(fn object_ap_id -> {object_ap_id, NaiveDateTime.utc_now()} end)
end

def pin_data_from_featured_collection(obj) do
  Logger.error("Could not parse featured collection #{inspect(obj)}")
  %{}
end
@doc """
Fetches the featured collection at `ap_id` and converts it to pin data.

Always returns `{:ok, map}`: a `nil` address or a failed fetch (which is
logged) yields an empty map.
"""
def fetch_and_prepare_featured_from_ap_id(nil), do: {:ok, %{}}

def fetch_and_prepare_featured_from_ap_id(ap_id) do
  case Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, data} ->
      {:ok, pin_data_from_featured_collection(data)}

    e ->
      Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
      {:ok, %{}}
  end
end
@doc """
Makes sure every pinned object of the given user data is available locally.

Each pin is looked up in the cache and fetched when missing. Returns `:ok` when
all pins are available, `:error` otherwise, and `nil` for a `nil` argument.
"""
def pinned_fetch_task(nil), do: nil

def pinned_fetch_task(%{pinned_objects: pins}) do
  all_available? =
    Enum.all?(pins, fn {ap_id, _pinned_at} ->
      Object.get_cached_by_ap_id(ap_id) ||
        match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
    end)

  if all_available?, do: :ok, else: :error
end
2021-03-04 10:06:12 +00:00
@doc """
Creates or refreshes the user for the remote actor at `ap_id`.

The actor is fetched and converted to user data; its pinned objects are fetched
in a separately started task. An already cached user is updated, otherwise a
clashing nickname is handled first and a new user is inserted. A failed fetch
is returned as is.
"""
def make_user_from_ap_id(ap_id, additional \\ []) do
  # Looked up before the fetch, exactly once.
  existing_user = User.get_cached_by_ap_id(ap_id)

  with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
    {:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)

    if existing_user do
      changeset = User.remote_user_changeset(existing_user, data)
      User.update_and_set_cache(changeset)
    else
      maybe_handle_clashing_nickname(data)

      data
      |> User.remote_user_changeset()
      |> Repo.insert()
      |> User.set_cache()
    end
  end
end
2018-02-18 11:27:05 +00:00
@doc """
Resolves `nickname` through WebFinger and creates or refreshes the user.

Returns `{:error, "No AP id in WebFinger"}` when the lookup does not yield both
an ap_id and an `acct:` subject.
"""
def make_user_from_nickname(nickname) do
  case WebFinger.finger(nickname) do
    {:ok, %{"ap_id" => ap_id, "subject" => "acct:" <> acct}} when not is_nil(ap_id) ->
      make_user_from_ap_id(ap_id, nickname_from_acct: acct)

    _e ->
      {:error, "No AP id in WebFinger"}
  end
end
# Filters out broken threads: true only when the entire thread of the activity
# is visible for the user.
defp contain_broken_threads(%Activity{} = activity, %User{} = user),
  do: entire_thread_visible_for_user?(activity, user)
@doc """
Post-processing check for a single activity: tells whether `activity` may be
shown to `user`, i.e. whether its entire thread is visible for them.
"""
def contain_activity(%Activity{} = activity, %User{} = user),
  do: contain_broken_threads(activity, user)
2019-05-08 16:19:20 +00:00
@doc """
Returns a query for all "Create" activities with direct visibility, ordered by
id ascending.
"""
def fetch_direct_messages_query do
  direct_creates =
    Activity
    |> restrict_type(%{type: "Create"})
    |> restrict_visibility(%{visibility: "direct"})

  order_by(direct_creates, [activity], asc: activity.id)
end
# Applies Activity.restrict_deactivated_users/1 to the query, except for "Flag"
# queries, which are left untouched.
defp maybe_restrict_deactivated_users(query, %{type: "Flag"}), do: query

defp maybe_restrict_deactivated_users(query, _opts) do
  Activity.restrict_deactivated_users(query)
end
2017-03-21 08:21:52 +00:00
end