Merge branch 'from/upstream-develop/tusooa/backup-status' into 'develop'

Detail backup states

Closes #3024

See merge request pleroma/pleroma!3809
This commit is contained in:
Haelwenn 2023-06-27 12:08:11 +00:00
commit 41f2ee69a8
10 changed files with 351 additions and 33 deletions

View File

@ -871,7 +871,9 @@
config :pleroma, Pleroma.User.Backup,
purge_after_days: 30,
limit_days: 7,
dir: nil
dir: nil,
process_wait_time: 30_000,
process_chunk_size: 100
config :pleroma, ConcurrentLimiter, [
{Pleroma.Web.RichMedia.Helpers, [max_running: 5, max_waiting: 5]},

View File

@ -3364,6 +3364,21 @@
type: :integer,
description: "Limit user to export not more often than once per N days",
suggestions: [7]
},
%{
key: :process_wait_time,
type: :integer,
label: "Process Wait Time",
description:
"The amount of time to wait for backup to report progress, in milliseconds. If no progress is received from the backup job for that much time, terminate it and deem it failed.",
suggestions: [30_000]
},
%{
key: :process_chunk_size,
type: :integer,
label: "Process Chunk Size",
description: "The number of activities to fetch in the backup job for each chunk.",
suggestions: [100]
}
]
},

View File

@ -27,3 +27,11 @@
failed: 4,
manual: 5
)
defenum(Pleroma.User.Backup.State,
pending: 1,
running: 2,
complete: 3,
failed: 4,
invalid: 5
)

View File

@ -9,12 +9,14 @@ defmodule Pleroma.User.Backup do
import Ecto.Query
import Pleroma.Web.Gettext
require Logger
require Pleroma.Constants
alias Pleroma.Activity
alias Pleroma.Bookmark
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.User.Backup.State
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.UserView
@ -25,6 +27,8 @@ defmodule Pleroma.User.Backup do
field(:file_name, :string)
field(:file_size, :integer, default: 0)
field(:processed, :boolean, default: false)
field(:state, State, default: :invalid)
field(:processed_number, :integer, default: 0)
belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
@ -46,7 +50,8 @@ def new(user) do
%__MODULE__{
user_id: user.id,
content_type: "application/zip",
file_name: name
file_name: name,
state: :pending
}
end
@ -109,27 +114,108 @@ def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do
def get(id), do: Repo.get(__MODULE__, id)
defp set_state(backup, state, processed_number \\ nil) do
struct =
%{state: state}
|> Pleroma.Maps.put_if_present(:processed_number, processed_number)
backup
|> cast(struct, [:state, :processed_number])
|> Repo.update()
end
def process(%__MODULE__{} = backup) do
with {:ok, zip_file} <- export(backup),
set_state(backup, :running, 0)
current_pid = self()
task =
Task.Supervisor.async_nolink(
Pleroma.TaskSupervisor,
__MODULE__,
:do_process,
[backup, current_pid]
)
wait_backup(backup, backup.processed_number, task)
end
def do_process(backup, current_pid) do
with {:ok, zip_file} <- export(backup, current_pid),
{:ok, %{size: size}} <- File.stat(zip_file),
{:ok, _upload} <- upload(backup, zip_file) do
backup
|> cast(%{file_size: size, processed: true}, [:file_size, :processed])
|> cast(
%{
file_size: size,
processed: true,
state: :complete
},
[:file_size, :processed, :state]
)
|> Repo.update()
end
end
defp wait_backup(backup, current_processed, task) do
wait_time = Pleroma.Config.get([__MODULE__, :process_wait_time])
receive do
{:progress, new_processed} ->
total_processed = current_processed + new_processed
set_state(backup, :running, total_processed)
wait_backup(backup, total_processed, task)
{:DOWN, _ref, _proc, _pid, reason} ->
backup = get(backup.id)
if reason != :normal do
Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}")
{:ok, backup} = set_state(backup, :failed)
cleanup(backup)
{:error,
%{
backup: backup,
reason: :exit,
details: reason
}}
else
{:ok, backup}
end
after
wait_time ->
Logger.error(
"Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating"
)
Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)
{:ok, backup} = set_state(backup, :failed)
cleanup(backup)
{:error,
%{
backup: backup,
reason: :timeout
}}
end
end
@files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
def export(%__MODULE__{} = backup) do
def export(%__MODULE__{} = backup, caller_pid) do
backup = Repo.preload(backup, :user)
name = String.trim_trailing(backup.file_name, ".zip")
dir = dir(name)
dir = backup_tempdir(backup)
with :ok <- File.mkdir(dir),
:ok <- actor(dir, backup.user),
:ok <- statuses(dir, backup.user),
:ok <- likes(dir, backup.user),
:ok <- bookmarks(dir, backup.user),
:ok <- actor(dir, backup.user, caller_pid),
:ok <- statuses(dir, backup.user, caller_pid),
:ok <- likes(dir, backup.user, caller_pid),
:ok <- bookmarks(dir, backup.user, caller_pid),
{:ok, zip_path} <- :zip.create(String.to_charlist(dir <> ".zip"), @files, cwd: dir),
{:ok, _} <- File.rm_rf(dir) do
{:ok, to_string(zip_path)}
@ -157,11 +243,12 @@ def upload(%__MODULE__{} = backup, zip_path) do
end
end
defp actor(dir, user) do
defp actor(dir, user, caller_pid) do
with {:ok, json} <-
UserView.render("user.json", %{user: user})
|> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})
|> Jason.encode() do
send(caller_pid, {:progress, 1})
File.write(Path.join(dir, "actor.json"), json)
end
end
@ -180,47 +267,80 @@ defp write_header(file, name) do
)
end
defp write(query, dir, name, fun) do
defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0
defp backup_tempdir(backup) do
name = String.trim_trailing(backup.file_name, ".zip")
dir(name)
end
defp cleanup(backup) do
dir = backup_tempdir(backup)
File.rm_rf(dir)
end
defp write(query, dir, name, fun, caller_pid) do
path = Path.join(dir, "#{name}.json")
chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size])
with {:ok, file} <- File.open(path, [:write, :utf8]),
:ok <- write_header(file, name) do
total =
query
|> Pleroma.Repo.chunk_stream(100)
|> Pleroma.Repo.chunk_stream(chunk_size, _returns_as = :one, timeout: :infinity)
|> Enum.reduce(0, fn i, acc ->
with {:ok, data} <- fun.(i),
with {:ok, data} <-
(try do
fun.(i)
rescue
e -> {:error, e}
end),
{:ok, str} <- Jason.encode(data),
:ok <- IO.write(file, str <> ",\n") do
if should_report?(acc + 1, chunk_size) do
send(caller_pid, {:progress, chunk_size})
end
acc + 1
else
_ -> acc
{:error, e} ->
Logger.warn(
"Error processing backup item: #{inspect(e)}\n The item is: #{inspect(i)}"
)
acc
_ ->
acc
end
end)
send(caller_pid, {:progress, rem(total, chunk_size)})
with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do
File.close(file)
end
end
end
defp bookmarks(dir, %{id: user_id} = _user) do
defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do
Bookmark
|> where(user_id: ^user_id)
|> join(:inner, [b], activity in assoc(b, :activity))
|> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)})
|> write(dir, "bookmarks", fn a -> {:ok, a.object} end)
|> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid)
end
defp likes(dir, user) do
defp likes(dir, user, caller_pid) do
user.ap_id
|> Activity.Queries.by_actor()
|> Activity.Queries.by_type("Like")
|> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)})
|> write(dir, "likes", fn a -> {:ok, a.object} end)
|> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid)
end
defp statuses(dir, user) do
defp statuses(dir, user, caller_pid) do
opts =
%{}
|> Map.put(:type, ["Create", "Announce"])
@ -233,10 +353,15 @@ defp statuses(dir, user) do
]
|> Enum.concat()
|> ActivityPub.fetch_activities_query(opts)
|> write(dir, "outbox", fn a ->
|> write(
dir,
"outbox",
fn a ->
with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
{:ok, Map.delete(activity, "@context")}
end
end)
end,
caller_pid
)
end
end

View File

@ -64,7 +64,13 @@ defp backup do
content_type: %Schema{type: :string},
file_name: %Schema{type: :string},
file_size: %Schema{type: :integer},
processed: %Schema{type: :boolean}
processed: %Schema{type: :boolean, description: "whether this backup has succeeded"},
state: %Schema{
type: :string,
description: "the state of the backup",
enum: ["pending", "running", "complete", "failed"]
},
processed_number: %Schema{type: :integer, description: "the number of records processed"}
},
example: %{
"content_type" => "application/zip",
@ -72,7 +78,9 @@ defp backup do
"https://cofe.fe:4000/media/backups/archive-foobar-20200908T164207-Yr7vuT5Wycv-sN3kSN2iJ0k-9pMo60j9qmvRCdDqIew.zip",
"file_size" => 4105,
"inserted_at" => "2020-09-08T16:42:07.000Z",
"processed" => true
"processed" => true,
"state" => "complete",
"processed_number" => 20
}
}
end

View File

@ -9,12 +9,22 @@ defmodule Pleroma.Web.PleromaAPI.BackupView do
alias Pleroma.Web.CommonAPI.Utils
def render("show.json", %{backup: %Backup{} = backup}) do
# To deal with records before the migration
state =
if backup.state == :invalid do
if backup.processed, do: :complete, else: :failed
else
backup.state
end
%{
id: backup.id,
content_type: backup.content_type,
url: download_url(backup),
file_size: backup.file_size,
processed: backup.processed,
state: to_string(state),
processed_number: backup.processed_number,
inserted_at: Utils.to_masto_date(backup.inserted_at)
}
end

View File

@ -51,7 +51,7 @@ def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do
end
@impl Oban.Worker
def timeout(_job), do: :timer.seconds(900)
def timeout(_job), do: :infinity
defp has_email?(user) do
not is_nil(user.email) and user.email != ""

View File

@ -0,0 +1,21 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Repo.Migrations.AddStateToBackups do
use Ecto.Migration
def up do
alter table(:backups) do
add(:state, :integer, default: 5)
add(:processed_number, :integer, default: 0)
end
end
def down do
alter table(:backups) do
remove(:state)
remove(:processed_number)
end
end
end

View File

@ -39,7 +39,7 @@ test "it creates a backup record and an Oban job" do
assert_enqueued(worker: BackupWorker, args: args)
backup = Backup.get(args["backup_id"])
assert %Backup{user_id: ^user_id, processed: false, file_size: 0} = backup
assert %Backup{user_id: ^user_id, processed: false, file_size: 0, state: :pending} = backup
end
test "it return an error if the export limit is over" do
@ -59,7 +59,30 @@ test "it process a backup record" do
assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user)
assert {:ok, backup} = perform_job(BackupWorker, args)
assert backup.file_size > 0
assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id} = backup
assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup
delete_job_args = %{"op" => "delete", "backup_id" => backup_id}
assert_enqueued(worker: BackupWorker, args: delete_job_args)
assert {:ok, backup} = perform_job(BackupWorker, delete_job_args)
refute Backup.get(backup_id)
email = Pleroma.Emails.UserEmail.backup_is_ready_email(backup)
assert_email_sent(
to: {user.name, user.email},
html_body: email.html_body
)
end
test "it updates states of the backup" do
clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local)
%{id: user_id} = user = insert(:user)
assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user)
assert {:ok, backup} = perform_job(BackupWorker, args)
assert backup.file_size > 0
assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup
delete_job_args = %{"op" => "delete", "backup_id" => backup_id}
@ -148,7 +171,7 @@ test "it creates a zip archive with user data" do
Bookmark.create(user.id, status3.id)
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
assert {:ok, path} = Backup.export(backup)
assert {:ok, path} = Backup.export(backup, self())
assert {:ok, zipfile} = :zip.zip_open(String.to_charlist(path), [:memory])
assert {:ok, {'actor.json', json}} = :zip.zip_get('actor.json', zipfile)
@ -230,6 +253,73 @@ test "it creates a zip archive with user data" do
File.rm!(path)
end
test "it counts the correct number processed" do
user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
Enum.map(1..120, fn i ->
{:ok, status} = CommonAPI.post(user, %{status: "status #{i}"})
CommonAPI.favorite(user, status.id)
Bookmark.create(user.id, status.id)
end)
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
{:ok, backup} = Backup.process(backup)
assert backup.processed_number == 1 + 120 + 120 + 120
Backup.delete(backup)
end
test "it handles errors" do
user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
Enum.map(1..120, fn i ->
{:ok, _status} = CommonAPI.post(user, %{status: "status #{i}"})
end)
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
with_mock Pleroma.Web.ActivityPub.Transmogrifier,
[:passthrough],
prepare_outgoing: fn data ->
object =
data["object"]
|> Pleroma.Object.normalize(fetch: false)
|> Map.get(:data)
data = data |> Map.put("object", object)
if String.contains?(data["object"]["content"], "119"),
do: raise(%Postgrex.Error{}),
else: {:ok, data}
end do
{:ok, backup} = Backup.process(backup)
assert backup.processed
assert backup.state == :complete
assert backup.processed_number == 1 + 119
Backup.delete(backup)
end
end
test "it handles unrecoverable exceptions" do
user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
with_mock Backup, [:passthrough], do_process: fn _, _ -> raise "mock exception" end do
{:error, %{backup: backup, reason: :exit}} = Backup.process(backup)
assert backup.state == :failed
end
with_mock Backup, [:passthrough], do_process: fn _, _ -> Process.sleep(:timer.seconds(32)) end do
{:error, %{backup: backup, reason: :timeout}} = Backup.process(backup)
assert backup.state == :failed
end
end
describe "it uploads and deletes a backup archive" do
setup do
clear_config([Pleroma.Upload, :base_url], "https://s3.amazonaws.com")
@ -246,7 +336,7 @@ test "it creates a zip archive with user data" do
Bookmark.create(user.id, status3.id)
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
assert {:ok, path} = Backup.export(backup)
assert {:ok, path} = Backup.export(backup, self())
[path: path, backup: backup]
end

View File

@ -15,4 +15,43 @@ test "it renders the ID" do
result = BackupView.render("show.json", backup: backup)
assert result.id == backup.id
end
test "it renders the state and processed_number" do
user = insert(:user)
backup = Backup.new(user)
result = BackupView.render("show.json", backup: backup)
assert result.state == to_string(backup.state)
assert result.processed_number == backup.processed_number
end
test "it renders failed state with legacy records" do
backup = %Backup{
id: 0,
content_type: "application/zip",
file_name: "dummy",
file_size: 1,
state: :invalid,
processed: true,
processed_number: 1,
inserted_at: NaiveDateTime.utc_now()
}
result = BackupView.render("show.json", backup: backup)
assert result.state == "complete"
backup = %Backup{
id: 0,
content_type: "application/zip",
file_name: "dummy",
file_size: 1,
state: :invalid,
processed: false,
processed_number: 1,
inserted_at: NaiveDateTime.utc_now()
}
result = BackupView.render("show.json", backup: backup)
assert result.state == "failed"
end
end