Make backup parameters configurable
This commit is contained in:
parent 7d3e4eaeb9
commit 179efd9467
@@ -874,7 +874,9 @@
 config :pleroma, Pleroma.User.Backup,
   purge_after_days: 30,
   limit_days: 7,
-  dir: nil
+  dir: nil,
+  process_wait_time: 30_000,
+  process_chunk_size: 100

 config :pleroma, ConcurrentLimiter, [
   {Pleroma.Web.RichMedia.Helpers, [max_running: 5, max_waiting: 5]},
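Note: the two new keys are read at runtime rather than fixed at compile time, so they can be tuned per instance. A minimal sketch of the lookup, mirroring the Pleroma.Config.get/1 calls added further down in this commit (variable names here are illustrative):

# Illustrative lookup of the new backup settings; the defaults come from
# the config.exs hunk above (30_000 ms and 100 activities per chunk).
wait_time = Pleroma.Config.get([Pleroma.User.Backup, :process_wait_time])
chunk_size = Pleroma.Config.get([Pleroma.User.Backup, :process_chunk_size])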
@@ -3394,6 +3394,21 @@
         type: :integer,
         description: "Limit user to export not more often than once per N days",
         suggestions: [7]
+      },
+      %{
+        key: :process_wait_time,
+        type: :integer,
+        label: "Process Wait Time",
+        description:
+          "The amount of time to wait for backup to report progress, in milliseconds. If no progress is received from the backup job for that much time, terminate it and deem it failed.",
+        suggestions: [30_000]
+      },
+      %{
+        key: :process_chunk_size,
+        type: :integer,
+        label: "Process Chunk Size",
+        description: "The number of activities to fetch in the backup job for each chunk.",
+        suggestions: [100]
       }
     ]
   },
@@ -35,8 +35,6 @@ defmodule Pleroma.User.Backup do
     timestamps()
   end

-  @report_every 100
-
   def create(user, admin_id \\ nil) do
     with :ok <- validate_limit(user, admin_id),
          {:ok, backup} <- user |> new() |> Repo.insert() do
@@ -160,6 +158,8 @@ def do_process(backup, current_pid) do
   end

   defp wait_backup(backup, current_processed, task) do
+    wait_time = Pleroma.Config.get([__MODULE__, :process_wait_time])
+
     receive do
       {:progress, new_processed} ->
         total_processed = current_processed + new_processed
@@ -175,6 +175,8 @@ defp wait_backup(backup, current_processed, task) do

         {:ok, backup} = set_state(backup, :failed)

+        cleanup(backup)
+
         {:error,
          %{
            backup: backup,
@@ -185,15 +187,17 @@ defp wait_backup(backup, current_processed, task) do
          {:ok, backup}
        end
    after
-      30_000 ->
+      wait_time ->
        Logger.error(
-          "Backup #{backup.id} timed out after no response for 30 seconds, terminating"
+          "Backup #{backup.id} timed out after no response for #{wait_time}ms, terminating"
        )

        Task.Supervisor.terminate_child(Pleroma.TaskSupervisor, task.pid)

        {:ok, backup} = set_state(backup, :failed)

+        cleanup(backup)
+
        {:error,
         %{
           backup: backup,
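Note: this substitution works because Elixir's receive ... after accepts any expression evaluating to a non-negative integer (or :infinity) as the timeout, not just a literal. A self-contained sketch of the pattern, not the module's actual code:

# Sketch only: the `after` timeout can be driven by configuration.
wait_time = 30_000

result =
  receive do
    {:progress, n} -> {:ok, n}
  after
    wait_time -> {:error, :timeout}
  end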
@@ -205,8 +209,7 @@ defp wait_backup(backup, current_processed, task) do
   @files ['actor.json', 'outbox.json', 'likes.json', 'bookmarks.json']
   def export(%__MODULE__{} = backup, caller_pid) do
     backup = Repo.preload(backup, :user)
-    name = String.trim_trailing(backup.file_name, ".zip")
-    dir = dir(name)
+    dir = backup_tempdir(backup)

     with :ok <- File.mkdir(dir),
          :ok <- actor(dir, backup.user, caller_pid),
@@ -264,16 +267,28 @@ defp write_header(file, name) do
     )
   end

-  defp should_report?(num), do: rem(num, @report_every) == 0
+  defp should_report?(num, chunk_size), do: rem(num, chunk_size) == 0
+
+  defp backup_tempdir(backup) do
+    name = String.trim_trailing(backup.file_name, ".zip")
+    dir(name)
+  end
+
+  defp cleanup(backup) do
+    dir = backup_tempdir(backup)
+    File.rm_rf(dir)
+  end

   defp write(query, dir, name, fun, caller_pid) do
     path = Path.join(dir, "#{name}.json")

+    chunk_size = Pleroma.Config.get([__MODULE__, :process_chunk_size])
+
     with {:ok, file} <- File.open(path, [:write, :utf8]),
          :ok <- write_header(file, name) do
       total =
         query
-        |> Pleroma.Repo.chunk_stream(100)
+        |> Pleroma.Repo.chunk_stream(chunk_size, _returns_as = :one, timeout: :infinity)
         |> Enum.reduce(0, fn i, acc ->
           with {:ok, data} <-
                  (try do
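Note: with @report_every removed, the progress cadence follows the configured chunk size: every chunk_size-th record written triggers one {:progress, chunk_size} message to the waiting parent. A standalone illustration of that cadence, assuming a chunk size of 100:

# Only items 100 and 200 report when 250 records are written with
# chunk_size = 100; the remainder is reported after the stream ends.
chunk_size = 100
should_report? = fn num -> rem(num, chunk_size) == 0 end
Enum.filter(1..250, should_report?)
#=> [100, 200]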
@@ -283,8 +298,8 @@ defp write(query, dir, name, fun, caller_pid) do
                  end),
               {:ok, str} <- Jason.encode(data),
               :ok <- IO.write(file, str <> ",\n") do
-            if should_report?(acc + 1) do
-              send(caller_pid, {:progress, @report_every})
+            if should_report?(acc + 1, chunk_size) do
+              send(caller_pid, {:progress, chunk_size})
             end

             acc + 1
@@ -301,7 +316,7 @@ defp write(query, dir, name, fun, caller_pid) do
           end
         end)

-      send(caller_pid, {:progress, rem(total, @report_every)})
+      send(caller_pid, {:progress, rem(total, chunk_size)})

       with :ok <- :file.pwrite(file, {:eof, -2}, "\n],\n \"totalItems\": #{total}}") do
         File.close(file)
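Note: the final send covers the records that did not fill a whole chunk, so the progress messages add up to the total. With assumed numbers, exporting 250 activities at chunk_size = 100 yields two {:progress, 100} messages during the stream and a final {:progress, 50}, since rem(250, 100) == 50.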