[#3213] HashtagsTableMigrator: fault rate allowance to enable the feature (defaults to 1%), counting of affected objects, misc. tweaks.

This commit is contained in:
Ivan Tashkinov 2021-02-18 20:40:10 +03:00
parent 854ea1aefb
commit b981edad8a
5 changed files with 84 additions and 31 deletions

View File

@ -657,6 +657,8 @@
config :pleroma, :database, rum_enabled: false config :pleroma, :database, rum_enabled: false
config :pleroma, :populate_hashtags_table, fault_rate_allowance: 0.01
config :pleroma, :env, Mix.env() config :pleroma, :env, Mix.env()
config :http_signatures, config :http_signatures,

View File

@ -479,6 +479,13 @@
type: :group, type: :group,
description: "`populate_hashtags_table` background migration settings", description: "`populate_hashtags_table` background migration settings",
children: [ children: [
%{
key: :fault_rate_allowance,
type: :float,
description:
"Max rate of failed objects to actually processed objects in order to enable the feature (any value from 0.0 which tolerates no errors to 1.0 which will enable the feature even if hashtags transfer failed for all records).",
suggestions: [0.01]
},
%{ %{
key: :sleep_interval_ms, key: :sleep_interval_ms,
type: :integer, type: :integer,

View File

@ -70,6 +70,7 @@ To add configuration to your config file, you can copy it from the base config.
## Background migrations ## Background migrations
* `populate_hashtags_table/sleep_interval_ms`: Sleep interval between each chunk of processed records in order to decrease the load on the system (defaults to 0 and should be kept at the default on most instances). * `populate_hashtags_table/sleep_interval_ms`: Sleep interval between each chunk of processed records in order to decrease the load on the system (defaults to 0 and should be kept at the default on most instances).
* `populate_hashtags_table/fault_rate_allowance`: Maximum allowed ratio of failed objects to actually processed objects in order to enable the feature (any value from 0.0, which tolerates no errors, to 1.0, which will enable the feature even if the hashtags transfer failed for all records).
## Welcome ## Welcome
* `direct_message`: - welcome message sent as a direct message. * `direct_message`: - welcome message sent as a direct message.

View File

@ -15,7 +15,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Repo alias Pleroma.Repo
defdelegate data_migration(), to: State defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table
defdelegate data_migration_id(), to: State
defdelegate state(), to: State defdelegate state(), to: State
defdelegate persist_state(), to: State, as: :persist_to_db defdelegate persist_state(), to: State, as: :persist_to_db
@ -23,10 +24,13 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
defdelegate put_stat(key, value), to: State, as: :put_data_key defdelegate put_stat(key, value), to: State, as: :put_data_key
defdelegate increment_stat(key, increment), to: State, as: :increment_data_key defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
@feature_config_path [:database, :improved_hashtag_timeline]
@reg_name {:global, __MODULE__} @reg_name {:global, __MODULE__}
def whereis, do: GenServer.whereis(@reg_name) def whereis, do: GenServer.whereis(@reg_name)
def feature_state, do: Config.get(@feature_config_path)
def start_link(_) do def start_link(_) do
case whereis() do case whereis() do
nil -> nil ->
@ -46,8 +50,6 @@ def init(_) do
def handle_continue(:init_state, _state) do def handle_continue(:init_state, _state) do
{:ok, _} = State.start_link(nil) {:ok, _} = State.start_link(nil)
update_status(:pending)
data_migration = data_migration() data_migration = data_migration()
manual_migrations = Config.get([:instance, :manual_data_migrations], []) manual_migrations = Config.get([:instance, :manual_data_migrations], [])
@ -56,10 +58,14 @@ def handle_continue(:init_state, _state) do
update_status(:noop) update_status(:noop)
is_nil(data_migration) -> is_nil(data_migration) ->
update_status(:failed, "Data migration does not exist.") message = "Data migration does not exist."
update_status(:failed, message)
Logger.error("#{__MODULE__}: #{message}")
data_migration.state == :manual or data_migration.name in manual_migrations -> data_migration.state == :manual or data_migration.name in manual_migrations ->
update_status(:manual, "Data migration is in manual execution state.") message = "Data migration is in manual execution or manual fix mode."
update_status(:manual, message)
Logger.warn("#{__MODULE__}: #{message}")
data_migration.state == :complete -> data_migration.state == :complete ->
on_complete(data_migration) on_complete(data_migration)
@ -78,7 +84,7 @@ def handle_info(:migrate_hashtags, state) do
update_status(:running) update_status(:running)
put_stat(:started_at, NaiveDateTime.utc_now()) put_stat(:started_at, NaiveDateTime.utc_now())
%{id: data_migration_id} = data_migration() data_migration_id = data_migration_id()
max_processed_id = get_stat(:max_processed_id, 0) max_processed_id = get_stat(:max_processed_id, 0)
Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...") Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...")
@ -89,12 +95,19 @@ def handle_info(:migrate_hashtags, state) do
|> Stream.each(fn objects -> |> Stream.each(fn objects ->
object_ids = Enum.map(objects, & &1.id) object_ids = Enum.map(objects, & &1.id)
results = Enum.map(objects, &transfer_object_hashtags(&1))
failed_ids = failed_ids =
objects results
|> Enum.map(&transfer_object_hashtags(&1))
|> Enum.filter(&(elem(&1, 0) == :error)) |> Enum.filter(&(elem(&1, 0) == :error))
|> Enum.map(&elem(&1, 1)) |> Enum.map(&elem(&1, 1))
# Count of objects with hashtags (`{:noop, id}` is returned for objects having other AS2 tags)
chunk_affected_count =
results
|> Enum.filter(&(elem(&1, 0) == :ok))
|> length()
for failed_id <- failed_ids do for failed_id <- failed_ids do
_ = _ =
Repo.query( Repo.query(
@ -116,6 +129,7 @@ def handle_info(:migrate_hashtags, state) do
put_stat(:max_processed_id, max_object_id) put_stat(:max_processed_id, max_object_id)
increment_stat(:processed_count, length(object_ids)) increment_stat(:processed_count, length(object_ids))
increment_stat(:failed_count, length(failed_ids)) increment_stat(:failed_count, length(failed_ids))
increment_stat(:affected_count, chunk_affected_count)
put_stat(:records_per_second, records_per_second()) put_stat(:records_per_second, records_per_second())
persist_state() persist_state()
@ -125,17 +139,42 @@ def handle_info(:migrate_hashtags, state) do
end) end)
|> Stream.run() |> Stream.run()
with 0 <- failures_count(data_migration_id) do fault_rate = fault_rate()
_ = delete_non_create_activities_hashtags() put_stat(:fault_rate, fault_rate)
fault_rate_allowance = Config.get([:populate_hashtags_table, :fault_rate_allowance], 0)
cond do
fault_rate == 0 ->
set_complete() set_complete()
else
_ -> is_float(fault_rate) and fault_rate <= fault_rate_allowance ->
update_status(:failed, "Please check data_migration_failed_ids records.") message = """
Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}.
Putting data migration to manual fix mode. Check `retry_failed/0`.
"""
Logger.warn("#{__MODULE__}: #{message}")
update_status(:manual, message)
on_complete(data_migration())
true ->
message = "Too many failures. Check data_migration_failed_ids records / `retry_failed/0`."
Logger.error("#{__MODULE__}: #{message}")
update_status(:failed, message)
end end
persist_state()
{:noreply, state} {:noreply, state}
end end
def fault_rate do
with failures_count when is_integer(failures_count) <- failures_count() do
failures_count / Enum.max([get_stat(:affected_count, 0), 1])
else
_ -> :error
end
end
defp records_per_second do defp records_per_second do
get_stat(:processed_count, 0) / Enum.max([running_time(), 1]) get_stat(:processed_count, 0) / Enum.max([running_time(), 1])
end end
@ -194,6 +233,7 @@ defp query do
|> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id)) |> where([_o, hashtags_objects], is_nil(hashtags_objects.object_id))
end end
@spec transfer_object_hashtags(Map.t()) :: {:noop | :ok | :error, integer()}
defp transfer_object_hashtags(object) do defp transfer_object_hashtags(object) do
embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"] embedded_tags = if Map.has_key?(object, :tag), do: object.tag, else: object.data["tag"]
hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags}) hashtags = Object.object_data_hashtags(%{"tag" => embedded_tags})
@ -201,7 +241,7 @@ defp transfer_object_hashtags(object) do
if Enum.any?(hashtags) do if Enum.any?(hashtags) do
transfer_object_hashtags(object, hashtags) transfer_object_hashtags(object, hashtags)
else else
{:ok, object.id} {:noop, object.id}
end end
end end
@ -209,13 +249,11 @@ defp transfer_object_hashtags(object, hashtags) do
Repo.transaction(fn -> Repo.transaction(fn ->
with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id}) maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id})
expected_rows = length(hashtag_records) base_error = "ERROR when inserting hashtags_objects for object with id #{object.id}"
base_error =
"ERROR when inserting #{expected_rows} hashtags_objects for obj. #{object.id}"
try do try do
with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do with {rows_count, _} when is_integer(rows_count) <-
Repo.insert_all("hashtags_objects", maps, on_conflict: :nothing) do
object.id object.id
else else
e -> e ->
@ -260,11 +298,11 @@ defp on_complete(data_migration) do
data_migration.feature_lock -> data_migration.feature_lock ->
:noop :noop
not is_nil(Config.get([:database, :improved_hashtag_timeline])) -> not is_nil(feature_state()) ->
:noop :noop
true -> true ->
Config.put([:database, :improved_hashtag_timeline], true) Config.put(@feature_config_path, true)
:ok :ok
end end
end end
@ -274,38 +312,41 @@ def failed_objects_query do
|> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
on: dmf.record_id == o.id on: dmf.record_id == o.id
) )
|> where([_o, dmf], dmf.data_migration_id == ^data_migration().id) |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id())
|> order_by([o], asc: o.id) |> order_by([o], asc: o.id)
end end
def failures_count(data_migration_id \\ nil) do def failures_count do
data_migration_id = data_migration_id || data_migration().id
with {:ok, %{rows: [[count]]}} <- with {:ok, %{rows: [[count]]}} <-
Repo.query( Repo.query(
"SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;",
[data_migration_id] [data_migration_id()]
) do ) do
count count
end end
end end
def retry_failed do def retry_failed do
data_migration = data_migration() data_migration_id = data_migration_id()
failed_objects_query() failed_objects_query()
|> Repo.chunk_stream(100, :one) |> Repo.chunk_stream(100, :one)
|> Stream.each(fn object -> |> Stream.each(fn object ->
with {:ok, _} <- transfer_object_hashtags(object) do with {res, _} when res != :error <- transfer_object_hashtags(object) do
_ = _ =
Repo.query( Repo.query(
"DELETE FROM data_migration_failed_ids " <> "DELETE FROM data_migration_failed_ids " <>
"WHERE data_migration_id = $1 AND record_id = $2", "WHERE data_migration_id = $1 AND record_id = $2",
[data_migration.id, object.id] [data_migration_id, object.id]
) )
end end
end) end)
|> Stream.run() |> Stream.run()
put_stat(:failed_count, failures_count())
persist_state()
force_continue()
end end
def force_continue do def force_continue do

View File

@ -7,7 +7,7 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
alias Pleroma.DataMigration alias Pleroma.DataMigration
defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table defdelegate data_migration(), to: Pleroma.Migrators.HashtagsTableMigrator
@reg_name {:global, __MODULE__} @reg_name {:global, __MODULE__}
@ -99,4 +99,6 @@ defp persist_non_data_change(:state, value) do
defp persist_non_data_change(_, _) do defp persist_non_data_change(_, _) do
nil nil
end end
def data_migration_id, do: Map.get(state(), :data_migration_id)
end end