[#3213] Fixed `HashtagsTableMigrator.count/1`.

This commit is contained in:
Ivan Tashkinov 2021-02-17 09:23:35 +03:00
parent 111bfdd3a0
commit 854ea1aefb
1 changed files with 9 additions and 6 deletions

View File

@ -18,7 +18,8 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
defdelegate data_migration(), to: State defdelegate data_migration(), to: State
defdelegate state(), to: State defdelegate state(), to: State
defdelegate get_stat(key, value), to: State, as: :get_data_key defdelegate persist_state(), to: State, as: :persist_to_db
defdelegate get_stat(key, value \\ nil), to: State, as: :get_data_key
defdelegate put_stat(key, value), to: State, as: :put_data_key defdelegate put_stat(key, value), to: State, as: :put_data_key
defdelegate increment_stat(key, increment), to: State, as: :increment_data_key defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
@ -116,7 +117,7 @@ def handle_info(:migrate_hashtags, state) do
increment_stat(:processed_count, length(object_ids)) increment_stat(:processed_count, length(object_ids))
increment_stat(:failed_count, length(failed_ids)) increment_stat(:failed_count, length(failed_ids))
put_stat(:records_per_second, records_per_second()) put_stat(:records_per_second, records_per_second())
_ = State.persist_to_db() persist_state()
# A quick and dirty approach to controlling the load this background migration imposes # A quick and dirty approach to controlling the load this background migration imposes
sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0) sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0)
@ -237,17 +238,19 @@ defp transfer_object_hashtags(object, hashtags) do
@doc "Approximate count for current iteration (including processed records count)" @doc "Approximate count for current iteration (including processed records count)"
def count(force \\ false, timeout \\ :infinity) do def count(force \\ false, timeout \\ :infinity) do
stored_count = state()[:count] stored_count = get_stat(:count)
if stored_count && !force do if stored_count && !force do
stored_count stored_count
else else
processed_count = state()[:processed_count] || 0 processed_count = get_stat(:processed_count, 0)
max_processed_id = data_migration().data["max_processed_id"] || 0 max_processed_id = get_stat(:max_processed_id, 0)
query = where(query(), [object], object.id > ^max_processed_id) query = where(query(), [object], object.id > ^max_processed_id)
count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
put_stat(:count, count) put_stat(:count, count)
persist_state()
count count
end end
end end
@ -316,7 +319,7 @@ def force_restart do
def set_complete do def set_complete do
update_status(:complete) update_status(:complete)
_ = State.persist_to_db() persist_state()
on_complete(data_migration()) on_complete(data_migration())
end end