From e4ac2a7cd69aa5e87d9dc277c0271e15466e3215 Mon Sep 17 00:00:00 2001
From: tusooa
Date: Fri, 16 Dec 2022 02:56:32 -0500
Subject: [PATCH 1/7] Detail backup states

---
 lib/pleroma/ecto_enums.ex                    |   8 ++
 lib/pleroma/user/backup.ex                   | 116 ++++++++++++++----
 lib/pleroma/workers/backup_worker.ex         |   2 +-
 .../20221216052127_add_state_to_backups.exs  |  21 ++++
 test/pleroma/user/backup_test.exs            |  48 +++++++-
 5 files changed, 166 insertions(+), 29 deletions(-)
 create mode 100644 priv/repo/migrations/20221216052127_add_state_to_backups.exs

diff --git a/lib/pleroma/ecto_enums.ex b/lib/pleroma/ecto_enums.ex
index a4890b489..b346b39d6 100644
--- a/lib/pleroma/ecto_enums.ex
+++ b/lib/pleroma/ecto_enums.ex
@@ -27,3 +27,11 @@
   failed: 4,
   manual: 5
 )
+
+defenum(Pleroma.User.Backup.State,
+  pending: 1,
+  running: 2,
+  complete: 3,
+  failed: 4,
+  invalid: 5
+)
diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex
index 9df010605..74001f9c3 100644
--- a/lib/pleroma/user/backup.ex
+++ b/lib/pleroma/user/backup.ex
@@ -15,6 +15,7 @@ defmodule Pleroma.User.Backup do
   alias Pleroma.Bookmark
   alias Pleroma.Repo
   alias Pleroma.User
+  alias Pleroma.User.Backup.State
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Transmogrifier
   alias Pleroma.Web.ActivityPub.UserView
@@ -25,12 +26,16 @@ defmodule Pleroma.User.Backup do
     field(:file_name, :string)
     field(:file_size, :integer, default: 0)
     field(:processed, :boolean, default: false)
+    field(:state, State, default: :invalid)
+    field(:processed_number, :integer, default: 0)

     belongs_to(:user, User, type: FlakeId.Ecto.CompatType)

     timestamps()
   end

+  @report_every 100
+
   def create(user, admin_id \\ nil) do
     with :ok <- validate_limit(user, admin_id),
          {:ok, backup} <- user |> new() |> Repo.insert() do
@@ -46,7 +51,8 @@ def new(user) do

     %__MODULE__{
       user_id: user.id,
       content_type: "application/zip",
-      file_name: name
+      file_name: name,
+      state: :pending
     }
   end

@@ -109,27 +115,75 @@ def remove_outdated(%__MODULE__{id: latest_id, user_id: user_id}) do

   def get(id), do: Repo.get(__MODULE__, id)

+  defp set_state(backup, state, processed_number \\ nil) do
+    struct =
+      %{state: state}
+      |> Pleroma.Maps.put_if_present(:processed_number, processed_number)
+
+    backup
+    |> cast(struct, [:state, :processed_number])
+    |> Repo.update()
+  end
+
   def process(%__MODULE__{} = backup) do
-    with {:ok, zip_file} <- export(backup),
-         {:ok, %{size: size}} <- File.stat(zip_file),
-         {:ok, _upload} <- upload(backup, zip_file) do
-      backup
-      |> cast(%{file_size: size, processed: true}, [:file_size, :processed])
-      |> Repo.update()
+    set_state(backup, :running, 0)
+
+    current_pid = self()
+
+    Task.Supervisor.async_nolink(
+      Pleroma.TaskSupervisor,
+      fn ->
+        with {:ok, zip_file} <- export(backup, current_pid),
+             {:ok, %{size: size}} <- File.stat(zip_file),
+             {:ok, _upload} <- upload(backup, zip_file) do
+          backup
+          |> cast(
+            %{
+              file_size: size,
+              processed: true,
+              state: :complete
+            },
+            [:file_size, :processed, :state]
+          )
+          |> Repo.update()
+
+          send(current_pid, :completed)
+        end
+      end
+    )
+
+    wait_backup(backup, backup.processed_number)
+  end
+
+  defp wait_backup(backup, current_processed) do
+    receive do
+      {:progress, new_processed} ->
+        total_processed = current_processed + new_processed
+
+        with {:ok, updated_backup} <- set_state(backup, :running, total_processed) do
+          wait_backup(updated_backup, total_processed)
+        else
+          _ -> wait_backup(backup, total_processed)
+        end
+
+      :completed ->
+        {:ok, get(backup.id)}
+    after
+      30_000 -> set_state(backup, :failed)
     end
   end

   @files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
-  def export(%__MODULE__{} = backup) do
+  def export(%__MODULE__{} = backup, caller_pid \\ nil) do
     backup = Repo.preload(backup, :user)
     name = String.trim_trailing(backup.file_name, ".zip")
     dir = dir(name)

     with :ok <- File.mkdir(dir),
-         :ok <- actor(dir, backup.user),
-         :ok <- statuses(dir, backup.user),
-         :ok <- likes(dir, backup.user),
-         :ok <- bookmarks(dir, backup.user),
+         :ok <- actor(dir, backup.user, caller_pid),
+         :ok <- statuses(dir, backup.user, caller_pid),
+         :ok <- likes(dir, backup.user, caller_pid),
+         :ok <- bookmarks(dir, backup.user, caller_pid),
          {:ok, zip_path} <- :zip.create(String.to_charlist(dir <> ".zip"), @files, cwd: dir),
          {:ok, _} <- File.rm_rf(dir) do
       {:ok, to_string(zip_path)}
@@ -157,11 +211,12 @@ def upload(%__MODULE__{} = backup, zip_path) do
     end
   end

-  defp actor(dir, user) do
+  defp actor(dir, user, caller_pid) do
     with {:ok, json} <-
            UserView.render("user.json", %{user: user})
            |> Map.merge(%{"likes" => "likes.json", "bookmarks" => "bookmarks.json"})
            |> Jason.encode() do
+      send(caller_pid, {:progress, 1})
       File.write(Path.join(dir, "actor.json"), json)
     end
   end
@@ -180,7 +235,9 @@ defp write_header(file, name) do
     )
   end

-  defp write(query, dir, name, fun) do
+  defp should_report?(num), do: rem(num, @report_every) == 0
+
+  defp write(query, dir, name, fun, caller_pid) do
     path = Path.join(dir, "#{name}.json")

     with {:ok, file} <- File.open(path, [:write, :utf8]),
@@ -192,35 +249,41 @@ defp write(query, dir, name, fun) do
           with {:ok, data} <- fun.(i),
                {:ok, str} <- Jason.encode(data),
                :ok <- IO.write(file, str <> ",\n") do
+            if should_report?(acc + 1) do
+              send(caller_pid, {:progress, @report_every})
+            end
+
             acc + 1
           else
             _ -> acc
           end
         end)

+      send(caller_pid, {:progress, rem(total, @report_every)})
+
       with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n  \"totalItems\": #{total}}") do
         File.close(file)
       end
     end
   end

-  defp bookmarks(dir, %{id: user_id} = _user) do
+  defp bookmarks(dir, %{id: user_id} = _user, caller_pid) do
     Bookmark
     |> where(user_id: ^user_id)
     |> join(:inner, [b], activity in assoc(b, :activity))
     |> select([b, a], %{id: b.id, object: fragment("(?)->>'object'", a.data)})
-    |> write(dir, "bookmarks", fn a -> {:ok, a.object} end)
+    |> write(dir, "bookmarks", fn a -> {:ok, a.object} end, caller_pid)
   end

-  defp likes(dir, user) do
+  defp likes(dir, user, caller_pid) do
     user.ap_id
     |> Activity.Queries.by_actor()
     |> Activity.Queries.by_type("Like")
     |> select([like], %{id: like.id, object: fragment("(?)->>'object'", like.data)})
-    |> write(dir, "likes", fn a -> {:ok, a.object} end)
+    |> write(dir, "likes", fn a -> {:ok, a.object} end, caller_pid)
   end

-  defp statuses(dir, user) do
+  defp statuses(dir, user, caller_pid) do
     opts =
       %{}
       |> Map.put(:type, ["Create", "Announce"])
@@ -233,10 +296,15 @@ defp statuses(dir, user) do
     ]
     |> Enum.concat()
     |> ActivityPub.fetch_activities_query(opts)
-    |> write(dir, "outbox", fn a ->
-      with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
-        {:ok, Map.delete(activity, "@context")}
-      end
-    end)
+    |> write(
+      dir,
+      "outbox",
+      fn a ->
+        with {:ok, activity} <- Transmogrifier.prepare_outgoing(a.data) do
+          {:ok, Map.delete(activity, "@context")}
+        end
+      end,
+      caller_pid
+    )
   end
 end
diff --git a/lib/pleroma/workers/backup_worker.ex b/lib/pleroma/workers/backup_worker.ex
index 12ee70f00..a485ddb4b 100644
--- a/lib/pleroma/workers/backup_worker.ex
+++ b/lib/pleroma/workers/backup_worker.ex
@@ -51,7 +51,7 @@ def perform(%Job{args: %{"op" => "delete", "backup_id" => backup_id}}) do
   end

   @impl Oban.Worker
-  def timeout(_job), do: :timer.seconds(900)
+  def timeout(_job), do: :infinity

   defp has_email?(user) do
     not is_nil(user.email) and user.email != ""
diff --git a/priv/repo/migrations/20221216052127_add_state_to_backups.exs b/priv/repo/migrations/20221216052127_add_state_to_backups.exs
new file mode 100644
index 000000000..73b30fc35
--- /dev/null
+++ b/priv/repo/migrations/20221216052127_add_state_to_backups.exs
@@ -0,0 +1,21 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2022 Pleroma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Repo.Migrations.AddStateToBackups do
+  use Ecto.Migration
+
+  def up do
+    alter table(:backups) do
+      add(:state, :integer, default: 5)
+      add(:processed_number, :integer, default: 0)
+    end
+  end
+
+  def down do
+    alter table(:backups) do
+      remove(:state)
+      remove(:processed_number)
+    end
+  end
+end
diff --git a/test/pleroma/user/backup_test.exs b/test/pleroma/user/backup_test.exs
index 5c9b94000..a536b4a4a 100644
--- a/test/pleroma/user/backup_test.exs
+++ b/test/pleroma/user/backup_test.exs
@@ -39,7 +39,7 @@ test "it creates a backup record and an Oban job" do
     assert_enqueued(worker: BackupWorker, args: args)

     backup = Backup.get(args["backup_id"])
-    assert %Backup{user_id: ^user_id, processed: false, file_size: 0} = backup
+    assert %Backup{user_id: ^user_id, processed: false, file_size: 0, state: :pending} = backup
   end

   test "it return an error if the export limit is over" do
@@ -59,7 +59,30 @@ test "it process a backup record" do
     assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user)
     assert {:ok, backup} = perform_job(BackupWorker, args)
     assert backup.file_size > 0
-    assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id} = backup
+    assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup
+
+    delete_job_args = %{"op" => "delete", "backup_id" => backup_id}
+
+    assert_enqueued(worker: BackupWorker, args: delete_job_args)
+    assert {:ok, backup} = perform_job(BackupWorker, delete_job_args)
+    refute Backup.get(backup_id)
+
+    email = Pleroma.Emails.UserEmail.backup_is_ready_email(backup)
+
+    assert_email_sent(
+      to: {user.name, user.email},
+      html_body: email.html_body
+    )
+  end
+
+  test "it updates states of the backup" do
+    clear_config([Pleroma.Upload, :uploader], Pleroma.Uploaders.Local)
+    %{id: user_id} = user = insert(:user)
+
+    assert {:ok, %Oban.Job{args: %{"backup_id" => backup_id} = args}} = Backup.create(user)
+    assert {:ok, backup} = perform_job(BackupWorker, args)
+    assert backup.file_size > 0
+    assert %Backup{id: ^backup_id, processed: true, user_id: ^user_id, state: :complete} = backup

     delete_job_args = %{"op" => "delete", "backup_id" => backup_id}

     assert_enqueued(worker: BackupWorker, args: delete_job_args)
     assert {:ok, backup} = perform_job(BackupWorker, delete_job_args)
     refute Backup.get(backup_id)

     email = Pleroma.Emails.UserEmail.backup_is_ready_email(backup)

     assert_email_sent(
       to: {user.name, user.email},
       html_body: email.html_body
     )
   end
@@ -148,7 +171,7 @@ test "it creates a zip archive with user data" do
     Bookmark.create(user.id, status3.id)

     assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
-    assert {:ok, path} = Backup.export(backup)
+    assert {:ok, path} = Backup.export(backup, self())

     assert {:ok, zipfile} = :zip.zip_open(String.to_charlist(path), [:memory])
     assert {:ok, {'actor.json', json}} = :zip.zip_get('actor.json', zipfile)
@@ -230,6 +253,23 @@ test "it creates a zip archive with user data" do
     File.rm!(path)
   end

+  test "it counts the correct number processed" do
+    user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
+
+    Enum.map(1..120, fn i ->
+      {:ok, status} = CommonAPI.post(user, %{status: "status #{i}"})
+      CommonAPI.favorite(user, status.id)
+      Bookmark.create(user.id, status.id)
+    end)
+
+    assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
+    {:ok, backup} = Backup.process(backup)
+
+    assert backup.processed_number == 1 + 120 + 120 + 120
+
+    Backup.delete(backup)
+  end
+
   describe "it uploads and deletes a backup archive" do
     setup do
       clear_config([Pleroma.Upload, :base_url], "https://s3.amazonaws.com")
@@ -246,7 +286,7 @@ test "it creates a zip archive with user data" do
       Bookmark.create(user.id, status3.id)

       assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
-      assert {:ok, path} = Backup.export(backup)
+      assert {:ok, path} = Backup.export(backup, self())

       [path: path, backup: backup]
     end

From bdd63d2a3a5fe76f8a8ac1f71cb8698be85bfef4 Mon Sep 17 00:00:00 2001
From: tusooa
Date: Fri, 16 Dec 2022 14:10:48 -0500
Subject: [PATCH 2/7] Expose backup status via Pleroma API

---
 .../operations/pleroma_backup_operation.ex  |  8 +++-
 .../web/pleroma_api/views/backup_view.ex    | 10 +++++
 .../pleroma_api/views/backup_view_test.exs  | 39 +++++++++++++++++++
 3 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
index 45fa2b058..5655527e0 100644
--- a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
+++ b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
@@ -64,7 +64,9 @@ defp backup do
         content_type: %Schema{type: :string},
         file_name: %Schema{type: :string},
         file_size: %Schema{type: :integer},
-        processed: %Schema{type: :boolean}
+        processed: %Schema{type: :boolean, description: "whether this backup has succeeded"},
+        state: %Schema{type: :string, description: "the state of the backup", enum: ["pending", "running", "complete", "failed"]},
+        processed_number: %Schema{type: :integer, description: "the number of records processed"}
       },
       example: %{
         "content_type" => "application/zip",
@@ -72,7 +74,9 @@ defp backup do
           "https://cofe.fe:4000/media/backups/archive-foobar-20200908T164207-Yr7vuT5Wycv-sN3kSN2iJ0k-9pMo60j9qmvRCdDqIew.zip",
         "file_size" => 4105,
         "inserted_at" => "2020-09-08T16:42:07.000Z",
-        "processed" => true
+        "processed" => true,
+        "state" => "complete",
+        "processed_number": 20
       }
     }
   end
diff --git a/lib/pleroma/web/pleroma_api/views/backup_view.ex b/lib/pleroma/web/pleroma_api/views/backup_view.ex
index d778590f0..02d891791 100644
--- a/lib/pleroma/web/pleroma_api/views/backup_view.ex
+++ b/lib/pleroma/web/pleroma_api/views/backup_view.ex
@@ -9,12 +9,22 @@ defmodule Pleroma.Web.PleromaAPI.BackupView do
   alias Pleroma.Web.CommonAPI.Utils

   def render("show.json", %{backup: %Backup{} = backup}) do
+    # To deal with records before the migration
+    state =
+    if backup.state == :invalid do
+      if backup.processed, do: :complete, else: :failed
+    else
+      backup.state
+    end
+
     %{
       id: backup.id,
       content_type: backup.content_type,
       url: download_url(backup),
       file_size: backup.file_size,
       processed: backup.processed,
+      state: to_string(state),
+      processed_number: backup.processed_number,
       inserted_at: Utils.to_masto_date(backup.inserted_at)
     }
   end
diff --git a/test/pleroma/web/pleroma_api/views/backup_view_test.exs b/test/pleroma/web/pleroma_api/views/backup_view_test.exs
index a86688bc4..6908463d6 100644
--- a/test/pleroma/web/pleroma_api/views/backup_view_test.exs
+++ b/test/pleroma/web/pleroma_api/views/backup_view_test.exs
@@ -15,4 +15,43 @@ test "it renders the ID" do
     result = BackupView.render("show.json", backup: backup)
     assert result.id == backup.id
   end
+
+  test "it renders the state and processed_number" do
+    user = insert(:user)
+    backup = Backup.new(user)
+
+    result = BackupView.render("show.json", backup: backup)
+    assert result.state == to_string(backup.state)
+    assert result.processed_number == backup.processed_number
+  end
+
+  test "it renders failed state with legacy records" do
+    backup = %Backup{
+      id: 0,
+      content_type: "application/zip",
+      file_name: "dummy",
+      file_size: 1,
+      state: :invalid,
+      processed: true,
+      processed_number: 1,
+      inserted_at: NaiveDateTime.utc_now()
+    }
+
+    result = BackupView.render("show.json", backup: backup)
+    assert result.state == "complete"
+
+    backup = %Backup{
+      id: 0,
+      content_type: "application/zip",
+      file_name: "dummy",
+      file_size: 1,
+      state: :invalid,
+      processed: false,
+      processed_number: 1,
+      inserted_at: NaiveDateTime.utc_now()
+    }
+
+    result = BackupView.render("show.json", backup: backup)
+    assert result.state == "failed"
+  end
 end

From 46ab97d72145a122cc0e0ada3cd9f0af5449bdf3 Mon Sep 17 00:00:00 2001
From: tusooa
Date: Fri, 16 Dec 2022 14:13:46 -0500
Subject: [PATCH 3/7] Simplify backup update clause

---
 lib/pleroma/user/backup.ex | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex
index 74001f9c3..97b8718c1 100644
--- a/lib/pleroma/user/backup.ex
+++ b/lib/pleroma/user/backup.ex
@@ -160,11 +160,8 @@ defp wait_backup(backup, current_processed) do
       {:progress, new_processed} ->
         total_processed = current_processed + new_processed

-        with {:ok, updated_backup} <- set_state(backup, :running, total_processed) do
-          wait_backup(updated_backup, total_processed)
-        else
-          _ -> wait_backup(backup, total_processed)
-        end
+        set_state(backup, :running, total_processed)
+        wait_backup(backup, total_processed)

       :completed ->
         {:ok, get(backup.id)}

From a1b95922c5c4a4351b52c0f7a484a96efed08be9 Mon Sep 17 00:00:00 2001
From: tusooa
Date: Fri, 16 Dec 2022 14:16:07 -0500
Subject: [PATCH 4/7] Fix compile error

---
 lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
index 5655527e0..ea3fc230a 100644
--- a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
+++ b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
@@ -76,7 +76,7 @@ defp backup do
         "inserted_at" => "2020-09-08T16:42:07.000Z",
         "processed" => true,
         "state" => "complete",
-        "processed_number": 20
+        "processed_number" => 20
       }
     }
   end

From 070fbb89e1bc51669b03555b72c6852a769f1e15 Mon Sep 17 00:00:00 2001
From: tusooa
Date: Fri, 16 Dec 2022 14:29:21 -0500
Subject: [PATCH 5/7] Lint

---
 .../api_spec/operations/pleroma_backup_operation.ex |  6 +++++-
 lib/pleroma/web/pleroma_api/views/backup_view.ex    | 10 +++++-----
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
index ea3fc230a..400f3825d 100644
--- a/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
+++ b/lib/pleroma/web/api_spec/operations/pleroma_backup_operation.ex
@@ -65,7 +65,11 @@ defp backup do
         file_name: %Schema{type: :string},
         file_size: %Schema{type: :integer},
         processed: %Schema{type: :boolean, description: "whether this backup has succeeded"},
-        state: %Schema{type: :string, description: "the state of the backup", enum: ["pending", "running", "complete", "failed"]},
+        state: %Schema{
+          type: :string,
+          description: "the state of the backup",
+          enum: ["pending", "running", "complete", "failed"]
+        },
         processed_number: %Schema{type: :integer, description: "the number of records processed"}
       },
       example: %{
diff --git a/lib/pleroma/web/pleroma_api/views/backup_view.ex b/lib/pleroma/web/pleroma_api/views/backup_view.ex
index 02d891791..20403aeee 100644
--- a/lib/pleroma/web/pleroma_api/views/backup_view.ex
+++ b/lib/pleroma/web/pleroma_api/views/backup_view.ex
@@ -11,11 +11,11 @@ defmodule Pleroma.Web.PleromaAPI.BackupView do
   def render("show.json", %{backup: %Backup{} = backup}) do
     # To deal with records before the migration
     state =
-    if backup.state == :invalid do
-      if backup.processed, do: :complete, else: :failed
-    else
-      backup.state
-    end
+      if backup.state == :invalid do
+        if backup.processed, do: :complete, else: :failed
+      else
+        backup.state
+      end

     %{
       id: backup.id,

From 7d3e4eaeb94a5381d56a3281e5813b7e0c63c8dd Mon Sep 17 00:00:00 2001
From: tusooa
Date: Sun, 18 Dec 2022 15:55:52 -0500
Subject: [PATCH 6/7] Log errors more extensively

---
 lib/pleroma/user/backup.ex        | 105 +++++++++++++++++++++---------
 test/pleroma/user/backup_test.exs |  50 ++++++++++++++
 2 files changed, 125 insertions(+), 30 deletions(-)

diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex
index 97b8718c1..cb9a40ba1 100644
--- a/lib/pleroma/user/backup.ex
+++ b/lib/pleroma/user/backup.ex
@@ -9,6 +9,7 @@ defmodule Pleroma.User.Backup do
   import Ecto.Query
   import Pleroma.Web.Gettext

+  require Logger
   require Pleroma.Constants

   alias Pleroma.Activity
@@ -130,48 +131,79 @@ def process(%__MODULE__{} = backup) do

     current_pid = self()

-    Task.Supervisor.async_nolink(
-      Pleroma.TaskSupervisor,
-      fn ->
-        with {:ok, zip_file} <- export(backup, current_pid),
-             {:ok, %{size: size}} <- File.stat(zip_file),
-             {:ok, _upload} <- upload(backup, zip_file) do
-          backup
-          |> cast(
-            %{
-              file_size: size,
-              processed: true,
-              state: :complete
-            },
-            [:file_size, :processed, :state]
-          )
-          |> Repo.update()
+    task =
+      Task.Supervisor.async_nolink(
+        Pleroma.TaskSupervisor,
+        __MODULE__,
+        :do_process,
+        [backup, current_pid]
+      )

-          send(current_pid, :completed)
-        end
-      end
-    )
-
-    wait_backup(backup, backup.processed_number)
+    wait_backup(backup, backup.processed_number, task)
   end

-  defp wait_backup(backup, current_processed) do
+  def do_process(backup, current_pid) do
+    with {:ok, zip_file} <- export(backup, current_pid),
+         {:ok, %{size: size}} <- File.stat(zip_file),
+         {:ok, _upload} <- upload(backup, zip_file) do
+      backup
+      |> cast(
+        %{
+          file_size: size,
+          processed: true,
+          state: :complete
+        },
+        [:file_size, :processed, :state]
+      )
+      |> Repo.update()
+    end
+  end
+
+  defp wait_backup(backup, current_processed, task) do
     receive do
       {:progress, new_processed} ->
         total_processed = current_processed + new_processed

         set_state(backup, :running, total_processed)
-        wait_backup(backup, total_processed)
+        wait_backup(backup, total_processed, task)

-      :completed ->
-        {:ok, get(backup.id)}
+      {:DOWN, _ref, _proc, _pid, reason} ->
+        backup = get(backup.id)
+
+        if reason != :normal do
+          Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}")
+
+          {:ok, backup} = set_state(backup, :failed)
+
+          {:error,
+           %{
+             backup: backup,
+             reason: :exit,
+             details: reason
+           }}
+        else
+          {:ok, backup}
+        end
     after
-      30_000 -> set_state(backup, :failed)
+      30_000 ->
+        Logger.error(
+          "Backup #{backup.id} timed out after no response for 30 seconds, terminating"
+        )
+
+        Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)
+
+        {:ok, backup} = set_state(backup, :failed)
+
+        {:error,
+         %{
+           backup: backup,
+           reason: :timeout
+         }}
     end
   end

   @files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
-  def export(%__MODULE__{} = backup, caller_pid \\ nil) do
+  def export(%__MODULE__{} = backup, caller_pid) do
     backup = Repo.preload(backup, :user)
     name = String.trim_trailing(backup.file_name, ".zip")
     dir = dir(name)
@@ -243,7 +275,12 @@ defp write(query, dir, name, fun, caller_pid) do
         query
         |> Pleroma.Repo.chunk_stream(100)
         |> Enum.reduce(0, fn i, acc ->
-          with {:ok, data} <- fun.(i),
+          with {:ok, data} <-
+                 (try do
+                    fun.(i)
+                  rescue
+                    e -> {:error, e}
+                  end),
                {:ok, str} <- Jason.encode(data),
                :ok <- IO.write(file, str <> ",\n") do
             if should_report?(acc + 1) do
@@ -252,7 +289,15 @@ defp write(query, dir, name, fun, caller_pid) do

             acc + 1
           else
-            _ -> acc
+            {:error, e} ->
+              Logger.warn(
+                "Error processing backup item: #{inspect(e)}\n The item is: #{inspect(i)}"
+              )
+
+              acc
+
+            _ ->
+              acc
           end
         end)

diff --git a/test/pleroma/user/backup_test.exs b/test/pleroma/user/backup_test.exs
index a536b4a4a..066bf6ba8 100644
--- a/test/pleroma/user/backup_test.exs
+++ b/test/pleroma/user/backup_test.exs
@@ -270,6 +270,56 @@ test "it counts the correct number processed" do
     Backup.delete(backup)
   end

+  test "it handles errors" do
+    user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
+
+    Enum.map(1..120, fn i ->
+      {:ok, _status} = CommonAPI.post(user, %{status: "status #{i}"})
+    end)
+
+    assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
+
+    with_mock Pleroma.Web.ActivityPub.Transmogrifier,
+      [:passthrough],
+      prepare_outgoing: fn data ->
+        object =
+          data["object"]
+          |> Pleroma.Object.normalize(fetch: false)
+          |> Map.get(:data)
+
+        data = data |> Map.put("object", object)
+
+        if String.contains?(data["object"]["content"], "119"),
+          do: raise(%Postgrex.Error{}),
+          else: {:ok, data}
+      end do
+      {:ok, backup} = Backup.process(backup)
+      assert backup.processed
+      assert backup.state == :complete
+      assert backup.processed_number == 1 + 119
+
+      Backup.delete(backup)
+    end
+  end
+
+  test "it handles unrecoverable exceptions" do
+    user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
+
+    assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
+
+    with_mock Backup, [:passthrough], do_process: fn _, _ -> raise "mock exception" end do
+      {:error, %{backup: backup, reason: :exit}} = Backup.process(backup)
+
+      assert backup.state == :failed
+    end
+
+    with_mock Backup, [:passthrough], do_process: fn _, _ -> Process.sleep(:timer.seconds(32)) end do
+      {:error, %{backup: backup, reason: :timeout}} = Backup.process(backup)
+
+      assert backup.state == :failed
+    end
+  end
+
   describe "it uploads and deletes a backup archive" do
     setup do
       clear_config([Pleroma.Upload, :base_url], "https://s3.amazonaws.com")

From 179efd94677d1d30bdbbbbaafc899c8c908181d2 Mon Sep 17 00:00:00 2001
From: tusooa
Date: Sat, 24 Dec 2022 00:17:17 -0500
Subject: [PATCH 7/7] Make backup parameters configurable

---
 config/config.exs          |  4 +++-
 config/description.exs     | 15 +++++++++++++++
 lib/pleroma/user/backup.ex | 37 ++++++++++++++++++++++++++-----------
 3 files changed, 44 insertions(+), 12 deletions(-)

diff --git a/config/config.exs b/config/config.exs
index e41ec2f91..ecb592b9c 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -874,7 +874,9 @@
 config :pleroma, Pleroma.User.Backup,
   purge_after_days: 30,
   limit_days: 7,
-  dir: nil
+  dir: nil,
+  process_wait_time: 30_000,
+  process_chunk_size: 100

 config :pleroma, ConcurrentLimiter, [
   {Pleroma.Web.RichMedia.Helpers, [max_running: 5, max_waiting: 5]},
diff --git a/config/description.exs b/config/description.exs
index bf4734426..996267558 100644
--- a/config/description.exs
+++ b/config/description.exs
@@ -3394,6 +3394,21 @@
         type: :integer,
         description: "Limit user to export not more often than once per N days",
         suggestions: [7]
+      },
+      %{
+        key: :process_wait_time,
+        type: :integer,
+        label: "Process Wait Time",
+        description:
+          "The amount of time to wait for backup to report progress, in milliseconds. If no progress is received from the backup job for that much time, terminate it and deem it failed.",
+        suggestions: [30_000]
+      },
+      %{
+        key: :process_chunk_size,
+        type: :integer,
+        label: "Process Chunk Size",
+        description: "The number of activities to fetch in the backup job for each chunk.",
+        suggestions: [100]
       }
     ]
   },
diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex
index cb9a40ba1..447fca2a1 100644
--- a/lib/pleroma/user/backup.ex
+++ b/lib/pleroma/user/backup.ex
@@ -35,8 +35,6 @@ defmodule Pleroma.User.Backup do
     timestamps()
   end

-  @report_every 100
-
   def create(user, admin_id \\ nil) do
     with :ok <- validate_limit(user, admin_id),
          {:ok, backup} <- user |> new() |> Repo.insert() do
@@ -158,6 +158,8 @@ def do_process(backup, current_pid) do
   end

   defp wait_backup(backup, current_processed, task) do
+    wait_time = Pleroma.Config.get([__MODULE__, :process_wait_time])
+
     receive do
       {:progress, new_processed} ->
         total_processed = current_processed + new_processed
@@ -175,6 +175,8 @@ defp wait_backup(backup, current_processed, task) do

           {:ok, backup} = set_state(backup, :failed)

+          cleanup(backup)
+
           {:error,
            %{
              backup: backup,
@@ -185,15 +187,17 @@ defp wait_backup(backup, current_processed, task) do
           {:ok, backup}
         end
     after
-      30_000 ->
+      wait_time ->
         Logger.error(
-          "Backup #{backup.id} timed out after no response for 30 seconds, terminating"
+          "Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating"
        )

         Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)

         {:ok, backup} = set_state(backup, :failed)

+        cleanup(backup)
+
         {:error,
          %{
            backup: backup,
            reason: :timeout
@@ -205,8 +209,7 @@ defp wait_backup(backup, current_processed, task) do
   @files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
   def export(%__MODULE__{} = backup, caller_pid) do
     backup = Repo.preload(backup, :user)
-    name = String.trim_trailing(backup.file_name, ".zip")
-    dir = dir(name)
+    dir = backup_tempdir(backup)

     with :ok <- File.mkdir(dir),
          :ok <- actor(dir, backup.user, caller_pid),
@@ -264,16 +267,28 @@ defp write_header(file, name) do
     )
   end

-  defp should_report?(num), do: rem(num, @report_every) == 0
+  defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0
+
+  defp backup_tempdir(backup) do
+    name = String.trim_trailing(backup.file_name, ".zip")
+    dir(name)
+  end
+
+  defp cleanup(backup) do
+    dir = backup_tempdir(backup)
+    File.rm_rf(dir)
+  end

   defp write(query, dir, name, fun, caller_pid) do
     path = Path.join(dir, "#{name}.json")

+    chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size])
+
     with {:ok, file} <- File.open(path, [:write, :utf8]),
          :ok <- write_header(file, name) do
       total =
         query
-        |> Pleroma.Repo.chunk_stream(100)
+        |> Pleroma.Repo.chunk_stream(chunk_size, _returns_as = :one, timeout: :infinity)
         |> Enum.reduce(0, fn i, acc ->
           with {:ok, data} <-
                  (try do
@@ -283,8 +298,8 @@ defp write(query, dir, name, fun, caller_pid) do
                  end),
               {:ok, str} <- Jason.encode(data),
               :ok <- IO.write(file, str <> ",\n") do
-            if should_report?(acc + 1) do
-              send(caller_pid, {:progress, @report_every})
+            if should_report?(acc + 1, chunk_size) do
+              send(caller_pid, {:progress, chunk_size})
             end

             acc + 1
@@ -301,7 +316,7 @@ defp write(query, dir, name, fun, caller_pid) do
           end
         end)

-      send(caller_pid, {:progress, rem(total, @report_every)})
+      send(caller_pid, {:progress, rem(total, chunk_size)})

       with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n  \"totalItems\": #{total}}") do
         File.close(file)