Log errors more extensively
This commit is contained in:
parent
070fbb89e1
commit
7d3e4eaeb9
|
@ -9,6 +9,7 @@ defmodule Pleroma.User.Backup do
|
||||||
import Ecto.Query
|
import Ecto.Query
|
||||||
import Pleroma.Web.Gettext
|
import Pleroma.Web.Gettext
|
||||||
|
|
||||||
|
require Logger
|
||||||
require Pleroma.Constants
|
require Pleroma.Constants
|
||||||
|
|
||||||
alias Pleroma.Activity
|
alias Pleroma.Activity
|
||||||
|
@ -130,48 +131,79 @@ def process(%__MODULE__{} = backup) do
|
||||||
|
|
||||||
current_pid = self()
|
current_pid = self()
|
||||||
|
|
||||||
Task.Supervisor.async_nolink(
|
task =
|
||||||
Pleroma.TaskSupervisor,
|
Task.Supervisor.async_nolink(
|
||||||
fn ->
|
Pleroma.TaskSupervisor,
|
||||||
with {:ok, zip_file} <- export(backup, current_pid),
|
__MODULE__,
|
||||||
{:ok, %{size: size}} <- File.stat(zip_file),
|
:do_process,
|
||||||
{:ok, _upload} <- upload(backup, zip_file) do
|
[backup, current_pid]
|
||||||
backup
|
)
|
||||||
|> cast(
|
|
||||||
%{
|
|
||||||
file_size: size,
|
|
||||||
processed: true,
|
|
||||||
state: :complete
|
|
||||||
},
|
|
||||||
[:file_size, :processed, :state]
|
|
||||||
)
|
|
||||||
|> Repo.update()
|
|
||||||
|
|
||||||
send(current_pid, :completed)
|
wait_backup(backup, backup.processed_number, task)
|
||||||
end
|
|
||||||
end
|
|
||||||
)
|
|
||||||
|
|
||||||
wait_backup(backup, backup.processed_number)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
defp wait_backup(backup, current_processed) do
|
def do_process(backup, current_pid) do
|
||||||
|
with {:ok, zip_file} <- export(backup, current_pid),
|
||||||
|
{:ok, %{size: size}} <- File.stat(zip_file),
|
||||||
|
{:ok, _upload} <- upload(backup, zip_file) do
|
||||||
|
backup
|
||||||
|
|> cast(
|
||||||
|
%{
|
||||||
|
file_size: size,
|
||||||
|
processed: true,
|
||||||
|
state: :complete
|
||||||
|
},
|
||||||
|
[:file_size, :processed, :state]
|
||||||
|
)
|
||||||
|
|> Repo.update()
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp wait_backup(backup, current_processed, task) do
|
||||||
receive do
|
receive do
|
||||||
{:progress, new_processed} ->
|
{:progress, new_processed} ->
|
||||||
total_processed = current_processed + new_processed
|
total_processed = current_processed + new_processed
|
||||||
|
|
||||||
set_state(backup, :running, total_processed)
|
set_state(backup, :running, total_processed)
|
||||||
wait_backup(backup, total_processed)
|
wait_backup(backup, total_processed, task)
|
||||||
|
|
||||||
:completed ->
|
{:DOWN, _ref, _proc, _pid, reason} ->
|
||||||
{:ok, get(backup.id)}
|
backup = get(backup.id)
|
||||||
|
|
||||||
|
if reason != :normal do
|
||||||
|
Logger.error("Backup #{backup.id} process ended abnormally: #{inspect(reason)}")
|
||||||
|
|
||||||
|
{:ok, backup} = set_state(backup, :failed)
|
||||||
|
|
||||||
|
{:error,
|
||||||
|
%{
|
||||||
|
backup: backup,
|
||||||
|
reason: :exit,
|
||||||
|
details: reason
|
||||||
|
}}
|
||||||
|
else
|
||||||
|
{:ok, backup}
|
||||||
|
end
|
||||||
after
|
after
|
||||||
30_000 -> set_state(backup, :failed)
|
30_000 ->
|
||||||
|
Logger.error(
|
||||||
|
"Backup #{backup.id} timed out after no response for 30 seconds, terminating"
|
||||||
|
)
|
||||||
|
|
||||||
|
Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)
|
||||||
|
|
||||||
|
{:ok, backup} = set_state(backup, :failed)
|
||||||
|
|
||||||
|
{:error,
|
||||||
|
%{
|
||||||
|
backup: backup,
|
||||||
|
reason: :timeout
|
||||||
|
}}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
|
@files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
|
||||||
def export(%__MODULE__{} = backup, caller_pid \\ nil) do
|
def export(%__MODULE__{} = backup, caller_pid) do
|
||||||
backup = Repo.preload(backup, :user)
|
backup = Repo.preload(backup, :user)
|
||||||
name = String.trim_trailing(backup.file_name, ".zip")
|
name = String.trim_trailing(backup.file_name, ".zip")
|
||||||
dir = dir(name)
|
dir = dir(name)
|
||||||
|
@ -243,7 +275,12 @@ defp write(query, dir, name, fun, caller_pid) do
|
||||||
query
|
query
|
||||||
|> Pleroma.Repo.chunk_stream(100)
|
|> Pleroma.Repo.chunk_stream(100)
|
||||||
|> Enum.reduce(0, fn i, acc ->
|
|> Enum.reduce(0, fn i, acc ->
|
||||||
with {:ok, data} <- fun.(i),
|
with {:ok, data} <-
|
||||||
|
(try do
|
||||||
|
fun.(i)
|
||||||
|
rescue
|
||||||
|
e -> {:error, e}
|
||||||
|
end),
|
||||||
{:ok, str} <- Jason.encode(data),
|
{:ok, str} <- Jason.encode(data),
|
||||||
:ok <- IO.write(file, str <> ",\n") do
|
:ok <- IO.write(file, str <> ",\n") do
|
||||||
if should_report?(acc + 1) do
|
if should_report?(acc + 1) do
|
||||||
|
@ -252,7 +289,15 @@ defp write(query, dir, name, fun, caller_pid) do
|
||||||
|
|
||||||
acc + 1
|
acc + 1
|
||||||
else
|
else
|
||||||
_ -> acc
|
{:error, e} ->
|
||||||
|
Logger.warn(
|
||||||
|
"Error processing backup item: #{inspect(e)}\n The item is: #{inspect(i)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
acc
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
acc
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
|
|
@ -270,6 +270,56 @@ test "it counts the correct number processed" do
|
||||||
Backup.delete(backup)
|
Backup.delete(backup)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "it handles errors" do
|
||||||
|
user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
|
||||||
|
|
||||||
|
Enum.map(1..120, fn i ->
|
||||||
|
{:ok, _status} = CommonAPI.post(user, %{status: "status #{i}"})
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
|
||||||
|
|
||||||
|
with_mock Pleroma.Web.ActivityPub.Transmogrifier,
|
||||||
|
[:passthrough],
|
||||||
|
prepare_outgoing: fn data ->
|
||||||
|
object =
|
||||||
|
data["object"]
|
||||||
|
|> Pleroma.Object.normalize(fetch: false)
|
||||||
|
|> Map.get(:data)
|
||||||
|
|
||||||
|
data = data |> Map.put("object", object)
|
||||||
|
|
||||||
|
if String.contains?(data["object"]["content"], "119"),
|
||||||
|
do: raise(%Postgrex.Error{}),
|
||||||
|
else: {:ok, data}
|
||||||
|
end do
|
||||||
|
{:ok, backup} = Backup.process(backup)
|
||||||
|
assert backup.processed
|
||||||
|
assert backup.state == :complete
|
||||||
|
assert backup.processed_number == 1 + 119
|
||||||
|
|
||||||
|
Backup.delete(backup)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
test "it handles unrecoverable exceptions" do
|
||||||
|
user = insert(:user, %{nickname: "cofe", name: "Cofe", ap_id: "http://cofe.io/users/cofe"})
|
||||||
|
|
||||||
|
assert {:ok, backup} = user |> Backup.new() |> Repo.insert()
|
||||||
|
|
||||||
|
with_mock Backup, [:passthrough], do_process: fn _, _ -> raise "mock exception" end do
|
||||||
|
{:error, %{backup: backup, reason: :exit}} = Backup.process(backup)
|
||||||
|
|
||||||
|
assert backup.state == :failed
|
||||||
|
end
|
||||||
|
|
||||||
|
with_mock Backup, [:passthrough], do_process: fn _, _ -> Process.sleep(:timer.seconds(32)) end do
|
||||||
|
{:error, %{backup: backup, reason: :timeout}} = Backup.process(backup)
|
||||||
|
|
||||||
|
assert backup.state == :failed
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe "it uploads and deletes a backup archive" do
|
describe "it uploads and deletes a backup archive" do
|
||||||
setup do
|
setup do
|
||||||
clear_config([Pleroma.Upload, :base_url], "https://s3.amazonaws.com")
|
clear_config([Pleroma.Upload, :base_url], "https://s3.amazonaws.com")
|
||||||
|
|
Loading…
Reference in New Issue