Implement suggestions from the Meilisearch MR

- Index unlisted posts
- Move version check outside of the streaming and only do it once
- Use a PUT request instead of checking manually if there is need to insert
- Add error handling, sort of
This commit is contained in:
Ekaterina Vaartis 2021-11-22 21:39:54 +03:00
parent a6946048fb
commit a12f63bc81
2 changed files with 93 additions and 76 deletions

View File

@ -3,38 +3,40 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Mix.Tasks.Pleroma.Search.Meilisearch do defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
require Logger
require Pleroma.Constants require Pleroma.Constants
import Mix.Pleroma import Mix.Pleroma
import Ecto.Query import Ecto.Query
import Pleroma.Search.Meilisearch, only: [meili_post!: 2, meili_delete!: 1, meili_get!: 1] import Pleroma.Search.Meilisearch,
only: [meili_post: 2, meili_put: 2, meili_get: 1, meili_delete!: 1]
def run(["index" | args]) do def run(["index"]) do
start_pleroma() start_pleroma()
is_reindex = "--reindex" in args {:ok, _} =
meili_post(
"/indexes/objects/settings/ranking-rules",
[
"desc(published)",
"words",
"exactness",
"proximity",
"wordsPosition",
"typo",
"attribute"
]
)
meili_post!( {:ok, _} =
"/indexes/objects/settings/ranking-rules", meili_post(
[ "/indexes/objects/settings/searchable-attributes",
"desc(published)", [
"words", "content"
"exactness", ]
"proximity", )
"wordsPosition",
"typo",
"attribute"
]
)
meili_post!( IO.puts("Created indices. Starting to insert posts.")
"/indexes/objects/settings/searchable-attributes",
[
"content"
]
)
chunk_size = 10_000 chunk_size = 10_000
@ -42,11 +44,11 @@ def run(["index" | args]) do
fn -> fn ->
query = query =
from(Pleroma.Object, from(Pleroma.Object,
# Only index public posts which are notes and have some text # Only index public and unlisted posts which are notes and have some text
where: where:
fragment("data->>'type' = 'Note'") and fragment("data->>'type' = 'Note'") and
fragment("LENGTH(data->>'content') > 0") and (fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()) or
fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()), fragment("data->'cc' \\? ?", ^Pleroma.Constants.as_public())),
order_by: [desc: fragment("data->'published'")] order_by: [desc: fragment("data->'published'")]
) )
@ -70,34 +72,18 @@ def run(["index" | args]) do
{[objects], new_acc} {[objects], new_acc}
end) end)
|> Stream.each(fn objects -> |> Stream.each(fn objects ->
objects =
objects
|> Enum.filter(fn o ->
if is_reindex do
result = meili_get!("/indexes/objects/documents/#{o.id}")
# With >= 0.24.0 the name for "errorCode" is just "code"
error_code_key =
if meili_get!("/version")["pkgVersion"] |> Version.match?(">= 0.24.0"),
do: "code",
else: "errorCode"
# Filter out the already indexed documents.
# This is true when the document does not exist
result[error_code_key] == "document_not_found"
else
true
end
end)
result = result =
meili_post!( meili_put(
"/indexes/objects/documents", "/indexes/objects/documents",
objects objects
) )
if not Map.has_key?(result, "updateId") do with {:ok, res} <- result do
IO.puts("Failed to index: #{inspect(result)}") if not Map.has_key?(res, "updateId") do
IO.puts("\nFailed to index: #{inspect(result)}")
end
else
e -> IO.puts("\nFailed to index due to network error: #{inspect(e)}")
end end
end) end)
|> Stream.run() |> Stream.run()
@ -137,7 +123,7 @@ def run(["show-private-key", master_key]) do
def run(["stats"]) do def run(["stats"]) do
start_pleroma() start_pleroma()
result = meili_get!("/indexes/objects/stats") {:ok, result} = meili_get("/indexes/objects/stats")
IO.puts("Number of entries: #{result["numberOfDocuments"]}") IO.puts("Number of entries: #{result["numberOfDocuments"]}")
IO.puts("Indexing? #{result["isIndexing"]}") IO.puts("Indexing? #{result["isIndexing"]}")
end end

View File

@ -14,29 +14,50 @@ defp meili_headers do
if is_nil(private_key), do: [], else: [{"X-Meili-API-Key", private_key}] if is_nil(private_key), do: [], else: [{"X-Meili-API-Key", private_key}]
end end
def meili_get!(path) do def meili_get(path) do
endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url]) endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
{:ok, result} = result =
Pleroma.HTTP.get( Pleroma.HTTP.get(
Path.join(endpoint, path), Path.join(endpoint, path),
meili_headers() meili_headers()
) )
Jason.decode!(result.body) with {:ok, res} <- result do
{:ok, Jason.decode!(res.body)}
end
end end
def meili_post!(path, params) do def meili_post(path, params) do
endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url]) endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
{:ok, result} = result =
Pleroma.HTTP.post( Pleroma.HTTP.post(
Path.join(endpoint, path), Path.join(endpoint, path),
Jason.encode!(params), Jason.encode!(params),
meili_headers() meili_headers()
) )
Jason.decode!(result.body) with {:ok, res} <- result do
{:ok, Jason.decode!(res.body)}
end
end
def meili_put(path, params) do
endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
result =
Pleroma.HTTP.request(
:put,
Path.join(endpoint, path),
Jason.encode!(params),
meili_headers(),
[]
)
with {:ok, res} <- result do
{:ok, Jason.decode!(res.body)}
end
end end
def meili_delete!(path) do def meili_delete!(path) do
@ -57,34 +78,40 @@ def search(user, query, options \\ []) do
offset = Keyword.get(options, :offset, 0) offset = Keyword.get(options, :offset, 0)
author = Keyword.get(options, :author) author = Keyword.get(options, :author)
result = res =
meili_post!( meili_post(
"/indexes/objects/search", "/indexes/objects/search",
%{q: query, offset: offset, limit: limit} %{q: query, offset: offset, limit: limit}
) )
hits = result["hits"] |> Enum.map(& &1["ap"]) with {:ok, result} <- res do
hits = result["hits"] |> Enum.map(& &1["ap"])
try do try do
hits hits
|> Activity.create_by_object_ap_id() |> Activity.create_by_object_ap_id()
|> Activity.with_preloaded_object() |> Activity.with_preloaded_object()
|> Activity.with_preloaded_object() |> Activity.with_preloaded_object()
|> Activity.restrict_deactivated_users() |> Activity.restrict_deactivated_users()
|> maybe_restrict_local(user) |> maybe_restrict_local(user)
|> maybe_restrict_author(author) |> maybe_restrict_author(author)
|> maybe_restrict_blocked(user) |> maybe_restrict_blocked(user)
|> maybe_fetch(user, query) |> maybe_fetch(user, query)
|> order_by([object: obj], desc: obj.data["published"]) |> order_by([object: obj], desc: obj.data["published"])
|> Pleroma.Repo.all() |> Pleroma.Repo.all()
rescue rescue
_ -> maybe_fetch([], user, query) _ -> maybe_fetch([], user, query)
end
end end
end end
def object_to_search_data(object) do def object_to_search_data(object) do
# Only index public or unlisted Notes
if not is_nil(object) and object.data["type"] == "Note" and if not is_nil(object) and object.data["type"] == "Note" and
Pleroma.Constants.as_public() in object.data["to"] do not is_nil(object.data["content"]) and
(Pleroma.Constants.as_public() in object.data["to"] or
Pleroma.Constants.as_public() in object.data["cc"]) and
String.length(object.data["content"]) > 1 do
data = object.data data = object.data
content_str = content_str =
@ -117,13 +144,17 @@ def add_to_index(activity) do
if activity.data["type"] == "Create" and maybe_search_data do if activity.data["type"] == "Create" and maybe_search_data do
result = result =
meili_post!( meili_put(
"/indexes/objects/documents", "/indexes/objects/documents",
[maybe_search_data] [maybe_search_data]
) )
if not Map.has_key?(result, "updateId") do with {:ok, res} <- result,
Logger.error("Failed to add activity #{activity.id} to index: #{inspect(result)}") true <- Map.has_key?(res, "updateId") do
# Do nothing
else
_ ->
Logger.error("Failed to add activity #{activity.id} to index: #{inspect(result)}")
end end
end end
end end