From 3edf45021eb6c3fba06bc083b346f7db54cd073f Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Fri, 12 Mar 2021 12:18:11 +0300 Subject: [PATCH] [#3213] Background migration infrastructure refactoring. Extracted BaseMigrator and BaseMigratorState. --- lib/pleroma/application.ex | 11 +- .../migrators/hashtags_table_migrator.ex | 265 ++++-------------- .../hashtags_table_migrator/state.ex | 104 ------- .../migrators/support/base_migrator.ex | 210 ++++++++++++++ .../migrators/support/base_migrator_state.ex | 116 ++++++++ 5 files changed, 385 insertions(+), 321 deletions(-) delete mode 100644 lib/pleroma/migrators/hashtags_table_migrator/state.ex create mode 100644 lib/pleroma/migrators/support/base_migrator.ex create mode 100644 lib/pleroma/migrators/support/base_migrator_state.ex diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 2ff7562e2..06d399b2e 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -103,10 +103,7 @@ def start(_type, _args) do task_children(@mix_env) ++ dont_run_in_test(@mix_env) ++ chat_child(chat_enabled?()) ++ - [ - Pleroma.Migrators.HashtagsTableMigrator, - Pleroma.Gopher.Server - ] + [Pleroma.Gopher.Server] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # for other strategies and supported options @@ -231,6 +228,12 @@ defp dont_run_in_test(_) do keys: :duplicate, partitions: System.schedulers_online() ]} + ] ++ background_migrators() + end + + defp background_migrators do + [ + Pleroma.Migrators.HashtagsTableMigrator ] end diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 6123c88e0..b84058e11 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -3,88 +3,27 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Migrators.HashtagsTableMigrator do - use GenServer + defmodule State do + use Pleroma.Migrators.Support.BaseMigratorState - require Logger + @impl Pleroma.Migrators.Support.BaseMigratorState + defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table + end - import Ecto.Query + use Pleroma.Migrators.Support.BaseMigrator - alias __MODULE__.State - alias Pleroma.Config alias Pleroma.Hashtag + alias Pleroma.Migrators.Support.BaseMigrator alias Pleroma.Object - alias Pleroma.Repo - defdelegate data_migration(), to: Pleroma.DataMigration, as: :populate_hashtags_table - defdelegate data_migration_id(), to: State + @impl BaseMigrator + def feature_config_path, do: [:features, :improved_hashtag_timeline] - defdelegate state(), to: State - defdelegate persist_state(), to: State, as: :persist_to_db - defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key - defdelegate put_stat(key, value), to: State, as: :put_data_key - defdelegate increment_stat(key, increment), to: State, as: :increment_data_key - - @feature_config_path [:features, :improved_hashtag_timeline] - @reg_name {:global, __MODULE__} - - def whereis, do: GenServer.whereis(@reg_name) - - def feature_state, do: Config.get(@feature_config_path) - - def start_link(_) do - case whereis() do - nil -> - GenServer.start_link(__MODULE__, nil, name: @reg_name) - - pid -> - {:ok, pid} - end - end - - @impl true - def init(_) do - {:ok, nil, {:continue, :init_state}} - end - - @impl true - def handle_continue(:init_state, _state) do - {:ok, _} = State.start_link(nil) - - data_migration = data_migration() - manual_migrations = Config.get([:instance, :manual_data_migrations], []) - - cond do - Config.get(:env) == :test -> - update_status(:noop) - - is_nil(data_migration) -> - message = "Data migration does not exist." - update_status(:failed, message) - Logger.error("#{__MODULE__}: #{message}") - - data_migration.state == :manual or data_migration.name in manual_migrations -> - message = "Data migration is in manual execution or manual fix mode." - update_status(:manual, message) - Logger.warn("#{__MODULE__}: #{message}") - - data_migration.state == :complete -> - on_complete(data_migration) - - true -> - send(self(), :migrate_hashtags) - end - - {:noreply, nil} - end - - @impl true - def handle_info(:migrate_hashtags, state) do - State.reinit() - - update_status(:running) - put_stat(:iteration_processed_count, 0) - put_stat(:started_at, NaiveDateTime.utc_now()) + @impl BaseMigrator + def fault_rate_allowance, do: Config.get([:populate_hashtags_table, :fault_rate_allowance], 0) + @impl BaseMigrator + def perform do data_migration_id = data_migration_id() max_processed_id = get_stat(:max_processed_id, 0) @@ -103,7 +42,7 @@ def handle_info(:migrate_hashtags, state) do |> Enum.filter(&(elem(&1, 0) == :error)) |> Enum.map(&elem(&1, 1)) - # Count of objects with hashtags (`{:noop, id}` is returned for objects having other AS2 tags) + # Count of objects with hashtags: `{:noop, id}` is returned for objects having other AS2 tags chunk_affected_count = results |> Enum.filter(&(elem(&1, 0) == :ok)) @@ -140,84 +79,10 @@ def handle_info(:migrate_hashtags, state) do Process.sleep(sleep_interval) end) |> Stream.run() - - fault_rate = fault_rate() - put_stat(:fault_rate, fault_rate) - fault_rate_allowance = Config.get([:populate_hashtags_table, :fault_rate_allowance], 0) - - cond do - fault_rate == 0 -> - set_complete() - - is_float(fault_rate) and fault_rate <= fault_rate_allowance -> - message = """ - Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}. - Putting data migration to manual fix mode. Check `retry_failed/0`. - """ - - Logger.warn("#{__MODULE__}: #{message}") - update_status(:manual, message) - on_complete(data_migration()) - - true -> - message = "Too many failures. Check data_migration_failed_ids records / `retry_failed/0`." - Logger.error("#{__MODULE__}: #{message}") - update_status(:failed, message) - end - - persist_state() - {:noreply, state} end - def fault_rate do - with failures_count when is_integer(failures_count) <- failures_count() do - failures_count / Enum.max([get_stat(:affected_count, 0), 1]) - else - _ -> :error - end - end - - defp records_per_second do - get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1]) - end - - defp running_time do - NaiveDateTime.diff(NaiveDateTime.utc_now(), get_stat(:started_at, NaiveDateTime.utc_now())) - end - - @hashtags_objects_cleanup_query """ - DELETE FROM hashtags_objects WHERE object_id IN - (SELECT DISTINCT objects.id FROM objects - JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities - ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = - (objects.data->>'id') - AND activities.data->>'type' = 'Create' - WHERE activities.id IS NULL); - """ - - @hashtags_cleanup_query """ - DELETE FROM hashtags WHERE id IN - (SELECT hashtags.id FROM hashtags - LEFT OUTER JOIN hashtags_objects - ON hashtags_objects.hashtag_id = hashtags.id - WHERE hashtags_objects.hashtag_id IS NULL); - """ - - @doc """ - Deletes `hashtags_objects` for legacy objects not asoociated with Create activity. - Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`). - """ - def delete_non_create_activities_hashtags do - {:ok, %{num_rows: hashtags_objects_count}} = - Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity) - - {:ok, %{num_rows: hashtags_count}} = - Repo.query(@hashtags_cleanup_query, [], timeout: :infinity) - - {:ok, hashtags_objects_count, hashtags_count} - end - - defp query do + @impl BaseMigrator + def query do # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) # Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up from( @@ -276,54 +141,7 @@ defp transfer_object_hashtags(object, hashtags) do end) end - @doc "Approximate count for current iteration (including processed records count)" - def count(force \\ false, timeout \\ :infinity) do - stored_count = get_stat(:count) - - if stored_count && !force do - stored_count - else - processed_count = get_stat(:processed_count, 0) - max_processed_id = get_stat(:max_processed_id, 0) - query = where(query(), [object], object.id > ^max_processed_id) - - count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count - put_stat(:count, count) - persist_state() - - count - end - end - - defp on_complete(data_migration) do - if data_migration.feature_lock || feature_state() == :disabled do - Logger.warn("#{__MODULE__}: migration complete but feature is locked; consider enabling.") - :noop - else - Config.put(@feature_config_path, :enabled) - :ok - end - end - - def failed_objects_query do - from(o in Object) - |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), - on: dmf.record_id == o.id - ) - |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) - |> order_by([o], asc: o.id) - end - - def failures_count do - with {:ok, %{rows: [[count]]}} <- - Repo.query( - "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", - [data_migration_id()] - ) do - count - end - end - + @impl BaseMigrator def retry_failed do data_migration_id = data_migration_id() @@ -347,23 +165,44 @@ def retry_failed do force_continue() end - def force_continue do - send(whereis(), :migrate_hashtags) + defp failed_objects_query do + from(o in Object) + |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"), + on: dmf.record_id == o.id + ) + |> where([_o, dmf], dmf.data_migration_id == ^data_migration_id()) + |> order_by([o], asc: o.id) end - def force_restart do - :ok = State.reset() - force_continue() - end + @doc """ + Service func to delete `hashtags_objects` for legacy objects not associated with Create activity. + Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`). + """ + def delete_non_create_activities_hashtags do + hashtags_objects_cleanup_query = """ + DELETE FROM hashtags_objects WHERE object_id IN + (SELECT DISTINCT objects.id FROM objects + JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities + ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') = + (objects.data->>'id') + AND activities.data->>'type' = 'Create' + WHERE activities.id IS NULL); + """ - def set_complete do - update_status(:complete) - persist_state() - on_complete(data_migration()) - end + hashtags_cleanup_query = """ + DELETE FROM hashtags WHERE id IN + (SELECT hashtags.id FROM hashtags + LEFT OUTER JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id + WHERE hashtags_objects.hashtag_id IS NULL); + """ - defp update_status(status, message \\ nil) do - put_stat(:state, status) - put_stat(:message, message) + {:ok, %{num_rows: hashtags_objects_count}} = + Repo.query(hashtags_objects_cleanup_query, [], timeout: :infinity) + + {:ok, %{num_rows: hashtags_count}} = + Repo.query(hashtags_cleanup_query, [], timeout: :infinity) + + {:ok, hashtags_objects_count, hashtags_count} end end diff --git a/lib/pleroma/migrators/hashtags_table_migrator/state.ex b/lib/pleroma/migrators/hashtags_table_migrator/state.ex deleted file mode 100644 index ee0009b2e..000000000 --- a/lib/pleroma/migrators/hashtags_table_migrator/state.ex +++ /dev/null @@ -1,104 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2021 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Migrators.HashtagsTableMigrator.State do - use Agent - - alias Pleroma.DataMigration - - defdelegate data_migration(), to: Pleroma.Migrators.HashtagsTableMigrator - - @reg_name {:global, __MODULE__} - - def start_link(_) do - Agent.start_link(fn -> load_state_from_db() end, name: @reg_name) - end - - defp load_state_from_db do - data_migration = data_migration() - - data = - if data_migration do - Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end) - else - %{} - end - - %{ - data_migration_id: data_migration && data_migration.id, - data: data - } - end - - def persist_to_db do - %{data_migration_id: data_migration_id, data: data} = state() - - if data_migration_id do - DataMigration.update_one_by_id(data_migration_id, data: data) - else - {:error, :nil_data_migration_id} - end - end - - def reset do - %{data_migration_id: data_migration_id} = state() - - with false <- is_nil(data_migration_id), - :ok <- - DataMigration.update_one_by_id(data_migration_id, - state: :pending, - data: %{} - ) do - reinit() - else - true -> {:error, :nil_data_migration_id} - e -> e - end - end - - def reinit do - Agent.update(@reg_name, fn _state -> load_state_from_db() end) - end - - def state do - Agent.get(@reg_name, & &1) - end - - def get_data_key(key, default \\ nil) do - get_in(state(), [:data, key]) || default - end - - def put_data_key(key, value) do - _ = persist_non_data_change(key, value) - - Agent.update(@reg_name, fn state -> - put_in(state, [:data, key], value) - end) - end - - def increment_data_key(key, increment \\ 1) do - Agent.update(@reg_name, fn state -> - initial_value = get_in(state, [:data, key]) || 0 - updated_value = initial_value + increment - put_in(state, [:data, key], updated_value) - end) - end - - defp persist_non_data_change(:state, value) do - with true <- get_data_key(:state) != value, - true <- value in Pleroma.DataMigration.State.__valid_values__(), - %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do - DataMigration.update_one_by_id(data_migration_id, state: value) - else - false -> :ok - _ -> {:error, :nil_data_migration_id} - end - end - - defp persist_non_data_change(_, _) do - nil - end - - def data_migration_id, do: Map.get(state(), :data_migration_id) -end diff --git a/lib/pleroma/migrators/support/base_migrator.ex b/lib/pleroma/migrators/support/base_migrator.ex new file mode 100644 index 000000000..1f8a5402b --- /dev/null +++ b/lib/pleroma/migrators/support/base_migrator.ex @@ -0,0 +1,210 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.Support.BaseMigrator do + @moduledoc """ + Base background migrator functionality. + """ + + @callback perform() :: any() + @callback retry_failed() :: any() + @callback feature_config_path() :: list(atom()) + @callback query() :: Ecto.Query.t() + @callback fault_rate_allowance() :: integer() | float() + + defmacro __using__(_opts) do + quote do + use GenServer + + require Logger + + import Ecto.Query + + alias __MODULE__.State + alias Pleroma.Config + alias Pleroma.Repo + + @behaviour Pleroma.Migrators.Support.BaseMigrator + + defdelegate data_migration(), to: State + defdelegate data_migration_id(), to: State + defdelegate state(), to: State + defdelegate persist_state(), to: State, as: :persist_to_db + defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key + defdelegate put_stat(key, value), to: State, as: :put_data_key + defdelegate increment_stat(key, increment), to: State, as: :increment_data_key + + @reg_name {:global, __MODULE__} + + def whereis, do: GenServer.whereis(@reg_name) + + def start_link(_) do + case whereis() do + nil -> + GenServer.start_link(__MODULE__, nil, name: @reg_name) + + pid -> + {:ok, pid} + end + end + + @impl true + def init(_) do + {:ok, nil, {:continue, :init_state}} + end + + @impl true + def handle_continue(:init_state, _state) do + {:ok, _} = State.start_link(nil) + + data_migration = data_migration() + manual_migrations = Config.get([:instance, :manual_data_migrations], []) + + cond do + Config.get(:env) == :test -> + update_status(:noop) + + is_nil(data_migration) -> + message = "Data migration does not exist." + update_status(:failed, message) + Logger.error("#{__MODULE__}: #{message}") + + data_migration.state == :manual or data_migration.name in manual_migrations -> + message = "Data migration is in manual execution or manual fix mode." + update_status(:manual, message) + Logger.warn("#{__MODULE__}: #{message}") + + data_migration.state == :complete -> + on_complete(data_migration) + + true -> + send(self(), :perform) + end + + {:noreply, nil} + end + + @impl true + def handle_info(:perform, state) do + State.reinit() + + update_status(:running) + put_stat(:iteration_processed_count, 0) + put_stat(:started_at, NaiveDateTime.utc_now()) + + perform() + + fault_rate = fault_rate() + put_stat(:fault_rate, fault_rate) + fault_rate_allowance = fault_rate_allowance() + + cond do + fault_rate == 0 -> + set_complete() + + is_float(fault_rate) and fault_rate <= fault_rate_allowance -> + message = """ + Done with fault rate of #{fault_rate} which doesn't exceed #{fault_rate_allowance}. + Putting data migration to manual fix mode. Try running `#{__MODULE__}.retry_failed/0`. + """ + + Logger.warn("#{__MODULE__}: #{message}") + update_status(:manual, message) + on_complete(data_migration()) + + true -> + message = "Too many failures. Try running `#{__MODULE__}.retry_failed/0`." + Logger.error("#{__MODULE__}: #{message}") + update_status(:failed, message) + end + + persist_state() + {:noreply, state} + end + + defp on_complete(data_migration) do + if data_migration.feature_lock || feature_state() == :disabled do + Logger.warn( + "#{__MODULE__}: migration complete but feature is locked; consider enabling." + ) + + :noop + else + Config.put(feature_config_path(), :enabled) + :ok + end + end + + @doc "Approximate count for current iteration (including processed records count)" + def count(force \\ false, timeout \\ :infinity) do + stored_count = get_stat(:count) + + if stored_count && !force do + stored_count + else + processed_count = get_stat(:processed_count, 0) + max_processed_id = get_stat(:max_processed_id, 0) + query = where(query(), [entity], entity.id > ^max_processed_id) + + count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count + put_stat(:count, count) + persist_state() + + count + end + end + + def failures_count do + with {:ok, %{rows: [[count]]}} <- + Repo.query( + "SELECT COUNT(record_id) FROM data_migration_failed_ids WHERE data_migration_id = $1;", + [data_migration_id()] + ) do + count + end + end + + def feature_state, do: Config.get(feature_config_path()) + + def force_continue do + send(whereis(), :perform) + end + + def force_restart do + :ok = State.reset() + force_continue() + end + + def set_complete do + update_status(:complete) + persist_state() + on_complete(data_migration()) + end + + defp update_status(status, message \\ nil) do + put_stat(:state, status) + put_stat(:message, message) + end + + defp fault_rate do + with failures_count when is_integer(failures_count) <- failures_count() do + failures_count / Enum.max([get_stat(:affected_count, 0), 1]) + else + _ -> :error + end + end + + defp records_per_second do + get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1]) + end + + defp running_time do + NaiveDateTime.diff( + NaiveDateTime.utc_now(), + get_stat(:started_at, NaiveDateTime.utc_now()) + ) + end + end + end +end diff --git a/lib/pleroma/migrators/support/base_migrator_state.ex b/lib/pleroma/migrators/support/base_migrator_state.ex new file mode 100644 index 000000000..69724ae79 --- /dev/null +++ b/lib/pleroma/migrators/support/base_migrator_state.ex @@ -0,0 +1,116 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2021 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Migrators.Support.BaseMigratorState do + @moduledoc """ + Base background migrator state functionality. + """ + + @callback data_migration() :: Pleroma.DataMigration.t() + + defmacro __using__(_opts) do + quote do + use Agent + + alias Pleroma.DataMigration + + @behaviour Pleroma.Migrators.Support.BaseMigratorState + @reg_name {:global, __MODULE__} + + def start_link(_) do + Agent.start_link(fn -> load_state_from_db() end, name: @reg_name) + end + + def data_migration, do: raise("data_migration/0 is not implemented") + defoverridable data_migration: 0 + + defp load_state_from_db do + data_migration = data_migration() + + data = + if data_migration do + Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end) + else + %{} + end + + %{ + data_migration_id: data_migration && data_migration.id, + data: data + } + end + + def persist_to_db do + %{data_migration_id: data_migration_id, data: data} = state() + + if data_migration_id do + DataMigration.update_one_by_id(data_migration_id, data: data) + else + {:error, :nil_data_migration_id} + end + end + + def reset do + %{data_migration_id: data_migration_id} = state() + + with false <- is_nil(data_migration_id), + :ok <- + DataMigration.update_one_by_id(data_migration_id, + state: :pending, + data: %{} + ) do + reinit() + else + true -> {:error, :nil_data_migration_id} + e -> e + end + end + + def reinit do + Agent.update(@reg_name, fn _state -> load_state_from_db() end) + end + + def state do + Agent.get(@reg_name, & &1) + end + + def get_data_key(key, default \\ nil) do + get_in(state(), [:data, key]) || default + end + + def put_data_key(key, value) do + _ = persist_non_data_change(key, value) + + Agent.update(@reg_name, fn state -> + put_in(state, [:data, key], value) + end) + end + + def increment_data_key(key, increment \\ 1) do + Agent.update(@reg_name, fn state -> + initial_value = get_in(state, [:data, key]) || 0 + updated_value = initial_value + increment + put_in(state, [:data, key], updated_value) + end) + end + + defp persist_non_data_change(:state, value) do + with true <- get_data_key(:state) != value, + true <- value in Pleroma.DataMigration.State.__valid_values__(), + %{data_migration_id: data_migration_id} when not is_nil(data_migration_id) <- state() do + DataMigration.update_one_by_id(data_migration_id, state: value) + else + false -> :ok + _ -> {:error, :nil_data_migration_id} + end + end + + defp persist_non_data_change(_, _) do + nil + end + + def data_migration_id, do: Map.get(state(), :data_migration_id) + end + end +end