Merge branch 'configurable-federator' into 'develop'

Make Federator options configurable.

See merge request pleroma/pleroma!615
This commit is contained in:
rinpatch 2019-01-01 18:55:59 +00:00
commit 34fa709015
4 changed files with 27 additions and 14 deletions

View File

@ -252,6 +252,14 @@
"internal" "internal"
] ]
config :pleroma, Pleroma.Web.Federator, max_jobs: 50
config :pleroma, Pleroma.Web.Federator.RetryQueue,
enabled: false,
max_jobs: 20,
initial_timeout: 30,
max_retries: 5
# Import environment specific config. This must remain at the bottom # Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above. # of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs" import_config "#{Mix.env()}.exs"

View File

@ -193,3 +193,14 @@ You can then do
``` ```
curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken" curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken"
``` ```
## Pleroma.Web.Federator
* `max_jobs`: The maximum amount of parallel federation jobs running at the same time.
## Pleroma.Web.Federator.RetryQueue
* `enabled`: If set to `true`, failed federation jobs will be retried
* `max_jobs`: The maximum amount of parallel federation jbos running at the same time.
* `initial_timeout`: The initial timeout in seconds
* `max_retries`: The maximum number of times a federation job is retried

View File

@ -17,7 +17,6 @@ defmodule Pleroma.Web.Federator do
@websub Application.get_env(:pleroma, :websub) @websub Application.get_env(:pleroma, :websub)
@ostatus Application.get_env(:pleroma, :ostatus) @ostatus Application.get_env(:pleroma, :ostatus)
@max_jobs 20
def init(args) do def init(args) do
{:ok, args} {:ok, args}
@ -168,7 +167,7 @@ def enqueue(type, payload, priority \\ 1) do
end end
def maybe_start_job(running_jobs, queue) do def maybe_start_job(running_jobs, queue) do
if :sets.size(running_jobs) < @max_jobs && queue != [] do if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, :max_jobs]) && queue != [] do
{{type, payload}, queue} = queue_pop(queue) {{type, payload}, queue} = queue_pop(queue)
{:ok, pid} = Task.start(fn -> handle(type, payload) end) {:ok, pid} = Task.start(fn -> handle(type, payload) end)
mref = Process.monitor(pid) mref = Process.monitor(pid)

View File

@ -7,12 +7,6 @@ defmodule Pleroma.Web.Federator.RetryQueue do
require Logger require Logger
# seconds
@initial_timeout 30
@max_retries 5
@max_jobs 20
def init(args) do def init(args) do
queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected]) queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
@ -21,7 +15,7 @@ def init(args) do
def start_link() do def start_link() do
enabled = enabled =
if Mix.env() == :test, do: true, else: Pleroma.Config.get([:retry_queue, :enabled], false) if Mix.env() == :test, do: true, else: Pleroma.Config.get([__MODULE__, :enabled], false)
if enabled do if enabled do
Logger.info("Starting retry queue") Logger.info("Starting retry queue")
@ -54,7 +48,7 @@ def reset_stats() do
end end
def get_retry_params(retries) do def get_retry_params(retries) do
if retries > @max_retries do if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
{:drop, "Max retries reached"} {:drop, "Max retries reached"}
else else
{:retry, growth_function(retries)} {:retry, growth_function(retries)}
@ -108,12 +102,12 @@ def maybe_start_job(running_jobs, queue_table) do
current_time = DateTime.to_unix(DateTime.utc_now()) current_time = DateTime.to_unix(DateTime.utc_now())
n_running_jobs = :sets.size(running_jobs) n_running_jobs = :sets.size(running_jobs)
if n_running_jobs < @max_jobs do if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
n_ready_jobs = ets_count_expires(queue_table, current_time) n_ready_jobs = ets_count_expires(queue_table, current_time)
if n_ready_jobs > 0 do if n_ready_jobs > 0 do
# figure out how many we could start # figure out how many we could start
available_job_slots = @max_jobs - n_running_jobs available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
start_n_jobs(running_jobs, queue_table, current_time, available_job_slots) start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
else else
running_jobs running_jobs
@ -228,12 +222,13 @@ def handle_info(unknown, state) do
if Mix.env() == :test do if Mix.env() == :test do
defp growth_function(_retries) do defp growth_function(_retries) do
_shutit = @initial_timeout _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
DateTime.to_unix(DateTime.utc_now()) - 1 DateTime.to_unix(DateTime.utc_now()) - 1
end end
else else
defp growth_function(retries) do defp growth_function(retries) do
round(@initial_timeout * :math.pow(retries, 3)) + DateTime.to_unix(DateTime.utc_now()) round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
DateTime.to_unix(DateTime.utc_now())
end end
end end