add options to mix pleroma.database prune_objects to delete more activities

This commit is contained in:
faried nawaz 2023-09-18 02:13:01 +05:00
parent 6cfb0d7ddb
commit 1bf3ae07b6
No known key found for this signature in database
GPG Key ID: EDCFAF5BA3A2F622
2 changed files with 625 additions and 27 deletions

View File

@ -67,43 +67,168 @@ def run(["prune_objects" | args]) do
OptionParser.parse( OptionParser.parse(
args, args,
strict: [ strict: [
vacuum: :boolean vacuum: :boolean,
keep_threads: :boolean,
keep_non_public: :boolean,
prune_orphaned_activities: :boolean
] ]
) )
start_pleroma() start_pleroma()
deadline = Pleroma.Config.get([:instance, :remote_post_retention_days]) deadline = Pleroma.Config.get([:instance, :remote_post_retention_days])
time_deadline = NaiveDateTime.utc_now() |> NaiveDateTime.add(-(deadline * 86_400))
Logger.info("Pruning objects older than #{deadline} days") log_message = "Pruning objects older than #{deadline} days"
time_deadline = log_message =
NaiveDateTime.utc_now() if Keyword.get(options, :keep_non_public) do
|> NaiveDateTime.add(-(deadline * 86_400)) log_message <> ", keeping non public posts"
else
log_message
end
from(o in Object, log_message =
where: if Keyword.get(options, :keep_threads) do
log_message <> ", keeping threads intact"
else
log_message
end
log_message =
if Keyword.get(options, :prune_orphaned_activities) do
log_message <> ", pruning orphaned activities"
else
log_message
end
log_message =
if Keyword.get(options, :vacuum) do
log_message <>
", doing a full vacuum (you shouldn't do this as a recurring maintanance task)"
else
log_message
end
Logger.info(log_message)
if Keyword.get(options, :keep_threads) do
# We want to delete objects from threads where
# 1. the newest post is still old
# 2. none of the activities is local
# 3. none of the activities is bookmarked
# 4. optionally none of the posts is non-public
deletable_context =
if Keyword.get(options, :keep_non_public) do
Pleroma.Activity
|> join(:left, [a], b in Pleroma.Bookmark, on: a.id == b.activity_id)
|> group_by([a], fragment("? ->> 'context'::text", a.data))
|> having(
[a],
not fragment(
# Posts (checked on Create Activity) is non-public
"bool_or((not(?->'to' \\? ? OR ?->'cc' \\? ?)) and ? ->> 'type' = 'Create')",
a.data,
^Pleroma.Constants.as_public(),
a.data,
^Pleroma.Constants.as_public(),
a.data
)
)
else
Pleroma.Activity
|> join(:left, [a], b in Pleroma.Bookmark, on: a.id == b.activity_id)
|> group_by([a], fragment("? ->> 'context'::text", a.data))
end
|> having([a], max(a.updated_at) < ^time_deadline)
|> having([a], not fragment("bool_or(?)", a.local))
|> having([_, b], fragment("max(?::text) is null", b.id))
|> select([a], fragment("? ->> 'context'::text", a.data))
Pleroma.Object
|> where([o], fragment("? ->> 'context'::text", o.data) in subquery(deletable_context))
else
if Keyword.get(options, :keep_non_public) do
Pleroma.Object
|> where(
[o],
fragment( fragment(
"?->'to' \\? ? OR ?->'cc' \\? ?", "?->'to' \\? ? OR ?->'cc' \\? ?",
o.data, o.data,
^Pleroma.Constants.as_public(), ^Pleroma.Constants.as_public(),
o.data, o.data,
^Pleroma.Constants.as_public() ^Pleroma.Constants.as_public()
), )
where: o.inserted_at < ^time_deadline, )
where: else
Pleroma.Object
end
|> where([o], o.updated_at < ^time_deadline)
|> where(
[o],
fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^Pleroma.Web.Endpoint.host()) fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^Pleroma.Web.Endpoint.host())
) )
end
|> Repo.delete_all(timeout: :infinity) |> Repo.delete_all(timeout: :infinity)
prune_hashtags_query = """ if !Keyword.get(options, :keep_threads) do
# Without the --keep-threads option, it's possible that bookmarked
# objects have been deleted. We remove the corresponding bookmarks.
"""
delete from public.bookmarks
where id in (
select b.id from public.bookmarks b
left join public.activities a on b.activity_id = a.id
left join public.objects o on a."data" ->> 'object' = o.data ->> 'id'
where o.id is null
)
"""
|> Repo.query([], timeout: :infinity)
end
if Keyword.get(options, :prune_orphaned_activities) do
# Prune activities who link to a single object
"""
delete from public.activities
where id in (
select a.id from public.activities a
left join public.objects o on a.data ->> 'object' = o.data ->> 'id'
left join public.activities a2 on a.data ->> 'object' = a2.data ->> 'id'
left join public.users u on a.data ->> 'object' = u.ap_id
where not a.local
and jsonb_typeof(a."data" -> 'object') = 'string'
and o.id is null
and a2.id is null
and u.id is null
)
"""
|> Repo.query([], timeout: :infinity)
# Prune activities who link to an array of objects
"""
delete from public.activities
where id in (
select a.id from public.activities a
join json_array_elements_text((a."data" -> 'object')::json) as j on jsonb_typeof(a."data" -> 'object') = 'array'
left join public.objects o on j.value = o.data ->> 'id'
left join public.activities a2 on j.value = a2.data ->> 'id'
left join public.users u on j.value = u.ap_id
group by a.id
having max(o.data ->> 'id') is null
and max(a2.data ->> 'id') is null
and max(u.ap_id) is null
)
"""
|> Repo.query([], timeout: :infinity)
end
"""
DELETE FROM hashtags AS ht DELETE FROM hashtags AS ht
WHERE NOT EXISTS ( WHERE NOT EXISTS (
SELECT 1 FROM hashtags_objects hto SELECT 1 FROM hashtags_objects hto
WHERE ht.id = hto.hashtag_id) WHERE ht.id = hto.hashtag_id)
""" """
|> Repo.query()
Repo.query(prune_hashtags_query)
if Keyword.get(options, :vacuum) do if Keyword.get(options, :vacuum) do
Maintenance.vacuum("full") Maintenance.vacuum("full")

View File

@ -7,6 +7,7 @@ defmodule Mix.Tasks.Pleroma.DatabaseTest do
use Oban.Testing, repo: Pleroma.Repo use Oban.Testing, repo: Pleroma.Repo
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Bookmark
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
@ -45,28 +46,500 @@ test "it replaces objects with references" do
end end
describe "prune_objects" do describe "prune_objects" do
test "it prunes old objects from the database" do setup do
insert(:note)
deadline = Pleroma.Config.get([:instance, :remote_post_retention_days]) + 1 deadline = Pleroma.Config.get([:instance, :remote_post_retention_days]) + 1
date = old_insert_date =
Timex.now() Timex.now()
|> Timex.shift(days: -deadline) |> Timex.shift(days: -deadline)
|> Timex.to_naive_datetime() |> Timex.to_naive_datetime()
|> NaiveDateTime.truncate(:second) |> NaiveDateTime.truncate(:second)
%{id: id} = %{old_insert_date: old_insert_date}
end
test "it prunes old objects from the database", %{old_insert_date: old_insert_date} do
insert(:note)
%{id: note_remote_public_id} =
:note :note
|> insert() |> insert()
|> Ecto.Changeset.change(%{inserted_at: date}) |> Ecto.Changeset.change(%{updated_at: old_insert_date})
|> Repo.update!() |> Repo.update!()
assert length(Repo.all(Object)) == 2 note_remote_non_public =
%{id: note_remote_non_public_id, data: note_remote_non_public_data} =
:note
|> insert()
note_remote_non_public
|> Ecto.Changeset.change(%{
updated_at: old_insert_date,
data: note_remote_non_public_data |> update_in(["to"], fn _ -> [] end)
})
|> Repo.update!()
assert length(Repo.all(Object)) == 3
Mix.Tasks.Pleroma.Database.run(["prune_objects"]) Mix.Tasks.Pleroma.Database.run(["prune_objects"])
assert length(Repo.all(Object)) == 1 assert length(Repo.all(Object)) == 1
refute Object.get_by_id(id) refute Object.get_by_id(note_remote_public_id)
refute Object.get_by_id(note_remote_non_public_id)
end
test "it cleans up bookmarks", %{old_insert_date: old_insert_date} do
user = insert(:user)
{:ok, old_object_activity} = CommonAPI.post(user, %{status: "yadayada"})
Repo.one(Object)
|> Ecto.Changeset.change(%{updated_at: old_insert_date})
|> Repo.update!()
{:ok, new_object_activity} = CommonAPI.post(user, %{status: "yadayada"})
{:ok, _} = Bookmark.create(user.id, old_object_activity.id)
{:ok, _} = Bookmark.create(user.id, new_object_activity.id)
assert length(Repo.all(Object)) == 2
assert length(Repo.all(Bookmark)) == 2
Mix.Tasks.Pleroma.Database.run(["prune_objects"])
assert length(Repo.all(Object)) == 1
assert length(Repo.all(Bookmark)) == 1
refute Bookmark.get(user.id, old_object_activity.id)
end
test "with the --keep-non-public option it still keeps non-public posts even if they are not local",
%{old_insert_date: old_insert_date} do
insert(:note)
%{id: note_remote_id} =
:note
|> insert()
|> Ecto.Changeset.change(%{updated_at: old_insert_date})
|> Repo.update!()
note_remote_non_public =
%{data: note_remote_non_public_data} =
:note
|> insert()
note_remote_non_public
|> Ecto.Changeset.change(%{
updated_at: old_insert_date,
data: note_remote_non_public_data |> update_in(["to"], fn _ -> [] end)
})
|> Repo.update!()
assert length(Repo.all(Object)) == 3
Mix.Tasks.Pleroma.Database.run(["prune_objects", "--keep-non-public"])
assert length(Repo.all(Object)) == 2
refute Object.get_by_id(note_remote_id)
end
test "with the --keep-threads and --keep-non-public option it keeps old threads with non-public replies even if the interaction is not local",
%{old_insert_date: old_insert_date} do
# For non-public we only check Create Activities because only these are relevant for threads
# Flags are always non-public, Announces from relays can be non-public...
remote_user1 = insert(:user, local: false)
remote_user2 = insert(:user, local: false)
# Old remote non-public reply (should be kept)
{:ok, old_remote_post1_activity} =
CommonAPI.post(remote_user1, %{status: "some thing", local: false})
old_remote_post1_activity
|> Ecto.Changeset.change(%{local: false, updated_at: old_insert_date})
|> Repo.update!()
{:ok, old_remote_non_public_reply_activity} =
CommonAPI.post(remote_user2, %{
status: "some reply",
in_reply_to_status_id: old_remote_post1_activity.id
})
old_remote_non_public_reply_activity
|> Ecto.Changeset.change(%{
local: false,
updated_at: old_insert_date,
data: old_remote_non_public_reply_activity.data |> update_in(["to"], fn _ -> [] end)
})
|> Repo.update!()
# Old remote non-public Announce (should be removed)
{:ok, old_remote_post2_activity = %{data: %{"object" => old_remote_post2_id}}} =
CommonAPI.post(remote_user1, %{status: "some thing", local: false})
old_remote_post2_activity
|> Ecto.Changeset.change(%{local: false, updated_at: old_insert_date})
|> Repo.update!()
{:ok, old_remote_non_public_repeat_activity} =
CommonAPI.repeat(old_remote_post2_activity.id, remote_user2)
old_remote_non_public_repeat_activity
|> Ecto.Changeset.change(%{
local: false,
updated_at: old_insert_date,
data: old_remote_non_public_repeat_activity.data |> update_in(["to"], fn _ -> [] end)
})
|> Repo.update!()
assert length(Repo.all(Object)) == 3
Mix.Tasks.Pleroma.Database.run(["prune_objects", "--keep-threads", "--keep-non-public"])
Repo.all(Pleroma.Activity)
assert length(Repo.all(Object)) == 2
refute Object.get_by_ap_id(old_remote_post2_id)
end
test "with the --keep-threads option it still keeps non-old threads even with no local interactions" do
remote_user = insert(:user, local: false)
remote_user2 = insert(:user, local: false)
{:ok, remote_post_activity} =
CommonAPI.post(remote_user, %{status: "some thing", local: false})
{:ok, remote_post_reply_activity} =
CommonAPI.post(remote_user2, %{
status: "some reply",
in_reply_to_status_id: remote_post_activity.id
})
remote_post_activity
|> Ecto.Changeset.change(%{local: false})
|> Repo.update!()
remote_post_reply_activity
|> Ecto.Changeset.change(%{local: false})
|> Repo.update!()
assert length(Repo.all(Object)) == 2
Mix.Tasks.Pleroma.Database.run(["prune_objects", "--keep-threads"])
assert length(Repo.all(Object)) == 2
end
test "with the --keep-threads option it deletes old threads with no local interaction", %{
old_insert_date: old_insert_date
} do
remote_user = insert(:user, local: false)
remote_user2 = insert(:user, local: false)
{:ok, old_remote_post_activity} =
CommonAPI.post(remote_user, %{status: "some thing", local: false})
old_remote_post_activity
|> Ecto.Changeset.change(%{local: false, updated_at: old_insert_date})
|> Repo.update!()
{:ok, old_remote_post_reply_activity} =
CommonAPI.post(remote_user2, %{
status: "some reply",
in_reply_to_status_id: old_remote_post_activity.id
})
old_remote_post_reply_activity
|> Ecto.Changeset.change(%{local: false, updated_at: old_insert_date})
|> Repo.update!()
{:ok, old_favourite_activity} =
CommonAPI.favorite(remote_user2, old_remote_post_activity.id)
old_favourite_activity
|> Ecto.Changeset.change(%{local: false, updated_at: old_insert_date})
|> Repo.update!()
{:ok, old_repeat_activity} = CommonAPI.repeat(old_remote_post_activity.id, remote_user2)
old_repeat_activity
|> Ecto.Changeset.change(%{local: false, updated_at: old_insert_date})
|> Repo.update!()
assert length(Repo.all(Object)) == 2
Mix.Tasks.Pleroma.Database.run(["prune_objects", "--keep-threads"])
assert length(Repo.all(Object)) == 0
end
test "with the --keep-threads option it keeps old threads with local interaction", %{
old_insert_date: old_insert_date
} do
remote_user = insert(:user, local: false)
local_user = insert(:user, local: true)
# local reply
{:ok, old_remote_post1_activity} =
CommonAPI.post(remote_user, %{status: "some thing", local: false})
old_remote_post1_activity
|> Ecto.Changeset.change(%{local: false, updated_at: old_insert_date})
|> Repo.update!()
{:ok, old_local_post2_reply_activity} =
CommonAPI.post(local_user, %{
status: "some reply",
in_reply_to_status_id: old_remote_post1_activity.id
})
old_local_post2_reply_activity
|> Ecto.Changeset.change(%{local: true, updated_at: old_insert_date})
|> Repo.update!()
# local Like
{:ok, old_remote_post3_activity} =
CommonAPI.post(remote_user, %{status: "some thing", local: false})
old_remote_post3_activity
|> Ecto.Changeset.change(%{local: false, updated_at: old_insert_date})
|> Repo.update!()
{:ok, old_favourite_activity} = CommonAPI.favorite(local_user, old_remote_post3_activity.id)
old_favourite_activity
|> Ecto.Changeset.change(%{local: true, updated_at: old_insert_date})
|> Repo.update!()
# local Announce
{:ok, old_remote_post4_activity} =
CommonAPI.post(remote_user, %{status: "some thing", local: false})
old_remote_post4_activity
|> Ecto.Changeset.change(%{local: false, updated_at: old_insert_date})
|> Repo.update!()
{:ok, old_repeat_activity} = CommonAPI.repeat(old_remote_post4_activity.id, local_user)
old_repeat_activity
|> Ecto.Changeset.change(%{local: true, updated_at: old_insert_date})
|> Repo.update!()
assert length(Repo.all(Object)) == 4
Mix.Tasks.Pleroma.Database.run(["prune_objects", "--keep-threads"])
assert length(Repo.all(Object)) == 4
end
test "with the --keep-threads option it keeps old threads with bookmarked posts", %{
old_insert_date: old_insert_date
} do
remote_user = insert(:user, local: false)
local_user = insert(:user, local: true)
{:ok, old_remote_post_activity} =
CommonAPI.post(remote_user, %{status: "some thing", local: false})
old_remote_post_activity
|> Ecto.Changeset.change(%{local: false, updated_at: old_insert_date})
|> Repo.update!()
Pleroma.Bookmark.create(local_user.id, old_remote_post_activity.id)
assert length(Repo.all(Object)) == 1
Mix.Tasks.Pleroma.Database.run(["prune_objects", "--keep-threads"])
assert length(Repo.all(Object)) == 1
end
test "We don't have unexpected tables which may contain objects that are referenced by activities" do
# We can delete orphaned activities. For that we look for the objects they reference in the 'objects', 'activities', and 'users' table.
# If someone adds another table with objects (idk, maybe with separate relations, or collections or w/e), then we need to make sure we
# add logic for that in the 'prune_objects' task so that we don't wrongly delete their corresponding activities.
# So when someone adds (or removes) a table, this test will fail.
# Either the table contains objects which can be referenced from the activities table
# => in that case the prune_objects job should be adapted so we don't delete activities who still have the referenced object.
# Or it doesn't contain objects which can be referenced from the activities table
# => in that case you can add/remove the table to/from this (sorted) list.
assert Repo.query!(
"SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';"
).rows
|> Enum.sort() == [
["activities"],
["announcement_read_relationships"],
["announcements"],
["apps"],
["backups"],
["bookmarks"],
["chat_message_references"],
["chats"],
["config"],
["conversation_participation_recipient_ships"],
["conversation_participations"],
["conversations"],
["counter_cache"],
["data_migration_failed_ids"],
["data_migrations"],
["deliveries"],
["filters"],
["following_relationships"],
["hashtags"],
["hashtags_objects"],
["instances"],
["lists"],
["markers"],
["mfa_tokens"],
["moderation_log"],
["notifications"],
["oauth_authorizations"],
["oauth_tokens"],
["oban_jobs"],
["oban_peers"],
["objects"],
["password_reset_tokens"],
["push_subscriptions"],
["registrations"],
["report_notes"],
["scheduled_activities"],
["schema_migrations"],
["thread_mutes"],
# ["user_follows_hashtag"], # not in pleroma
# ["user_frontend_setting_profiles"], # not in pleroma
["user_invite_tokens"],
["user_notes"],
["user_relationships"],
["users"]
]
end
test "it prunes orphaned activities with the --prune-orphaned-activities" do
# Add a remote activity which references an Object
%Object{} |> Map.merge(%{data: %{"id" => "object_for_activity"}}) |> Repo.insert()
%Activity{}
|> Map.merge(%{
local: false,
data: %{"id" => "remote_activity_with_object", "object" => "object_for_activity"}
})
|> Repo.insert()
# Add a remote activity which references an activity
%Activity{}
|> Map.merge(%{
local: false,
data: %{
"id" => "remote_activity_with_activity",
"object" => "remote_activity_with_object"
}
})
|> Repo.insert()
# Add a remote activity which references an Actor
%User{} |> Map.merge(%{ap_id: "actor"}) |> Repo.insert()
%Activity{}
|> Map.merge(%{
local: false,
data: %{"id" => "remote_activity_with_actor", "object" => "actor"}
})
|> Repo.insert()
# Add a remote activity without existing referenced object, activity or actor
%Activity{}
|> Map.merge(%{
local: false,
data: %{
"id" => "remote_activity_without_existing_referenced_object",
"object" => "non_existing"
}
})
|> Repo.insert()
# Add a local activity without existing referenced object, activity or actor
%Activity{}
|> Map.merge(%{
local: true,
data: %{"id" => "local_activity_with_actor", "object" => "non_existing"}
})
|> Repo.insert()
# The remote activities without existing reference, and only the remote activities without existing reference, are deleted
# if, and only if, we provide the --prune-orphaned-activities option
assert length(Repo.all(Activity)) == 5
Mix.Tasks.Pleroma.Database.run(["prune_objects"])
assert length(Repo.all(Activity)) == 5
Mix.Tasks.Pleroma.Database.run(["prune_objects", "--prune-orphaned-activities"])
activities = Repo.all(Activity)
assert "remote_activity_without_existing_referenced_object" not in Enum.map(
activities,
fn a -> a.data["id"] end
)
assert length(activities) == 4
end
test "it prunes orphaned activities with the --prune-orphaned-activities when the objects are referenced from an array" do
%Object{} |> Map.merge(%{data: %{"id" => "existing_object"}}) |> Repo.insert()
%User{} |> Map.merge(%{ap_id: "existing_actor"}) |> Repo.insert()
# Multiple objects, one object exists (keep)
%Activity{}
|> Map.merge(%{
local: false,
data: %{
"id" => "remote_activity_existing_object",
"object" => ["non_ existing_object", "existing_object"]
}
})
|> Repo.insert()
# Multiple objects, one actor exists (keep)
%Activity{}
|> Map.merge(%{
local: false,
data: %{
"id" => "remote_activity_existing_actor",
"object" => ["non_ existing_object", "existing_actor"]
}
})
|> Repo.insert()
# Multiple objects, one activity exists (keep)
%Activity{}
|> Map.merge(%{
local: false,
data: %{
"id" => "remote_activity_existing_activity",
"object" => ["non_ existing_object", "remote_activity_existing_actor"]
}
})
|> Repo.insert()
# Multiple objects none exist (prune)
%Activity{}
|> Map.merge(%{
local: false,
data: %{
"id" => "remote_activity_without_existing_referenced_object",
"object" => ["owo", "whats_this"]
}
})
|> Repo.insert()
assert length(Repo.all(Activity)) == 4
Mix.Tasks.Pleroma.Database.run(["prune_objects"])
assert length(Repo.all(Activity)) == 4
Mix.Tasks.Pleroma.Database.run(["prune_objects", "--prune-orphaned-activities"])
activities = Repo.all(Activity)
assert length(activities) == 3
assert "remote_activity_without_existing_referenced_object" not in Enum.map(
activities,
fn a -> a.data["id"] end
)
assert length(activities) == 3
end end
end end