[#3213] Added `HashtagsTableMigrator.count/1`.
This commit is contained in:
parent
9d28a7ebfb
commit
7f07909a7b
|
@ -85,19 +85,8 @@ def handle_info(:migrate_hashtags, state) do
|
||||||
|
|
||||||
max_processed_id = data_migration.data["max_processed_id"] || 0
|
max_processed_id = data_migration.data["max_processed_id"] || 0
|
||||||
|
|
||||||
# Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
|
query()
|
||||||
from(
|
|> where([object], object.id > ^max_processed_id)
|
||||||
object in Object,
|
|
||||||
left_join: hashtag in assoc(object, :hashtags),
|
|
||||||
where: object.id > ^max_processed_id,
|
|
||||||
where: is_nil(hashtag.id),
|
|
||||||
where:
|
|
||||||
fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
|
|
||||||
select: %{
|
|
||||||
id: object.id,
|
|
||||||
tag: fragment("(?)->'tag'", object.data)
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|> Repo.chunk_stream(100, :batches, timeout: :infinity)
|
|> Repo.chunk_stream(100, :batches, timeout: :infinity)
|
||||||
|> Stream.each(fn objects ->
|
|> Stream.each(fn objects ->
|
||||||
object_ids = Enum.map(objects, & &1.id)
|
object_ids = Enum.map(objects, & &1.id)
|
||||||
|
@ -155,6 +144,21 @@ def handle_info(:migrate_hashtags, state) do
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp query do
|
||||||
|
# Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
|
||||||
|
from(
|
||||||
|
object in Object,
|
||||||
|
left_join: hashtag in assoc(object, :hashtags),
|
||||||
|
where: is_nil(hashtag.id),
|
||||||
|
where:
|
||||||
|
fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data),
|
||||||
|
select: %{
|
||||||
|
id: object.id,
|
||||||
|
tag: fragment("(?)->'tag'", object.data)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
defp transfer_object_hashtags(object) do
|
defp transfer_object_hashtags(object) do
|
||||||
hashtags = Object.object_data_hashtags(%{"tag" => object.tag})
|
hashtags = Object.object_data_hashtags(%{"tag" => object.tag})
|
||||||
|
|
||||||
|
@ -188,6 +192,18 @@ defp transfer_object_hashtags(object) do
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def count(force \\ false) do
|
||||||
|
stored_count = state()[:count]
|
||||||
|
|
||||||
|
if stored_count && !force do
|
||||||
|
stored_count
|
||||||
|
else
|
||||||
|
count = Repo.aggregate(query(), :count, :id)
|
||||||
|
put_stat(:count, count)
|
||||||
|
count
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
defp persist_stats(data_migration) do
|
defp persist_stats(data_migration) do
|
||||||
runner_state = Map.drop(state(), [:status])
|
runner_state = Map.drop(state(), [:status])
|
||||||
_ = DataMigration.update(data_migration, %{data: runner_state})
|
_ = DataMigration.update(data_migration, %{data: runner_state})
|
||||||
|
|
Loading…
Reference in New Issue