diff --git a/lib/pleroma/user/backup.ex b/lib/pleroma/user/backup.ex index 97b8718c1..cb9a40ba1 100644 --- a/lib/pleroma/user/backup.ex +++ b/lib/pleroma/user/backup.ex @@ -9,6 +9,7 @@ defmodule Pleroma.User.Backup do import Ecto.Query import Pleroma.Web.Gettext + require Logger require Pleroma.Constants alias Pleroma.Activity @@ -130,48 +131,79 @@ def process(%__MODULE__{} = backup) do current_pid = self() - Task.Supervisor.async_nolink( - Pleroma.TaskSupervisor, - fn -> - with {:ok, zip_file} <- export(backup, current_pid), - {:ok, %{size: size}} <- File.stat(zip_file), - {:ok, _upload} <- upload(backup, zip_file) do - backup - |> cast( - %{ - file_size: size, - processed: true, - state: :complete - }, - [:file_size, :processed, :state] - ) - |> Repo.update() + task = + Task.Supervisor.async_nolink( + Pleroma.TaskSupervisor, + __MODULE__, + :do_process, + [backup, current_pid] + ) - send(current_pid, :completed) - end - end - ) - - wait_backup(backup, backup.processed_number) + wait_backup(backup, backup.processed_number, task) end - defp wait_backup(backup, current_processed) do + def do_process(backup, current_pid) do + with {:ok, zip_file} <- export(backup, current_pid), + {:ok, %{size: size}} <- File.stat(zip_file), + {:ok, _upload} <- upload(backup, zip_file) do + backup + |> cast( + %{ + file_size: size, + processed: true, + state: :complete + }, + [:file_size, :processed, :state] + ) + |> Repo.update() + end + end + + defp wait_backup(backup, current_processed, task) do receive do {:progress, new_processed} -> total_processed = current_processed + new_processed set_state(backup, :running, total_processed) - wait_backup(backup, total_processed) + wait_backup(backup, total_processed, task) - :completed -> - {:ok, get(backup.id)} + {:DOWN, _ref, _proc, _pid, reason} -> + backup = get(backup.id) + + if reason != :normal do + Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}") + + {:ok, backup} = set_state(backup, :failed) + + {:error, + %{ + backup: backup, + reason: :exit, + details: reason + }} + else + {:ok, backup} + end after - 30_000 -> set_state(backup, :failed) + 30_000 -> + Logger.error( + "Backup #{backup.id} timed out after no response for 30 seconds, terminating" + ) + + Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid) + + {:ok, backup} = set_state(backup, :failed) + + {:error, + %{ + backup: backup, + reason: :timeout + }} end end @files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json'] - def export(%__MODULE__{} = backup, caller_pid \\ nil) do + def export(%__MODULE__{} = backup, caller_pid) do backup = Repo.preload(backup, :user) name = String.trim_trailing(backup.file_name, ".zip") dir = dir(name) @@ -243,7 +275,12 @@ defp write(query, dir, name, fun, caller_pid) do query |> Pleroma.Repo.chunk_stream(100) |> Enum.reduce(0, fn i, acc -> - with {:ok, data} <- fun.(i), + with {:ok, data} <- + (try do + fun.(i) + rescue + e -> {:error, e} + end), {:ok, str} <- Jason.encode(data), :ok <- IO.write(file, str <> ",\n") do if should_report?(acc + 1) do @@ -252,7 +289,15 @@ defp write(query, dir, name, fun, caller_pid) do acc + 1 else - _ -> acc + {:error, e} -> + Logger.warn( + "Error processing backup item: #{inspect(e)}\n The item is: #{inspect(i)}" + ) + + acc + + _ -> + acc end end) diff --git a/test/pleroma/user/backup_test.exs b/test/pleroma/user/backup_test.exs index a536b4a4a..066bf6ba8 100644 --- a/test/pleroma/user/backup_test.exs +++ b/test/pleroma/user/backup_test.exs @@ -270,6 +270,56 @@ test "it counts the correct number processed" do Backup.delete(backup) end + test "it handles errors" do + user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"}) + + Enum.map(1..120, fn i -> + {:ok, _status} = CommonAPI.post(user, %{status: "status #{i}"}) + end) + + assert {:ok, backup} = user |> Backup.new() |> Repo.insert() + + with_mock Pleroma.Web.ActivityPub.Transmogrifier, + [:passthrough], + prepare_outgoing: fn data -> + object = + data["object"] + |> Pleroma.Object.normalize(fetch: false) + |> Map.get(:data) + + data = data |> Map.put("object", object) + + if String.contains?(data["object"]["content"], "119"), + do: raise(%Postgrex.Error{}), + else: {:ok, data} + end do + {:ok, backup} = Backup.process(backup) + assert backup.processed + assert backup.state == :complete + assert backup.processed_number == 1 + 119 + + Backup.delete(backup) + end + end + + test "it handles unrecoverable exceptions" do + user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"}) + + assert {:ok, backup} = user |> Backup.new() |> Repo.insert() + + with_mock Backup, [:passthrough], do_process: fn _, _ -> raise "mock exception" end do + {:error, %{backup: backup, reason: :exit}} = Backup.process(backup) + + assert backup.state == :failed + end + + with_mock Backup, [:passthrough], do_process: fn _, _ -> Process.sleep(:timer.seconds(32)) end do + {:error, %{backup: backup, reason: :timeout}} = Backup.process(backup) + + assert backup.state == :failed + end + end + describe "it uploads and deletes a backup archive" do setup do clear_config([Pleroma.Upload, :base_url], "https://s3.amazonaws.com")