Merge branch 'search-healthcheck' into 'develop'

Search backend healthcheck process

See merge request pleroma/pleroma!4120
This commit is contained in:
lain 2024-05-27 09:46:57 +00:00
commit 5e43060128
9 changed files with 167 additions and 4 deletions

View File

@ -0,0 +1 @@
Monitoring of search backend health to control the processing of jobs in the search indexing Oban queue

View File

@ -579,7 +579,7 @@
attachments_cleanup: 1, attachments_cleanup: 1,
new_users_digest: 1, new_users_digest: 1,
mute_expire: 5, mute_expire: 5,
search_indexing: 10, search_indexing: [limit: 10, paused: true],
rich_media_expiration: 2 rich_media_expiration: 2
], ],
plugins: [Oban.Plugins.Pruner], plugins: [Oban.Plugins.Pruner],

View File

@ -109,7 +109,8 @@ def start(_type, _args) do
streamer_registry() ++ streamer_registry() ++
background_migrators() ++ background_migrators() ++
shout_child(shout_enabled?()) ++ shout_child(shout_enabled?()) ++
[Pleroma.Gopher.Server] [Pleroma.Gopher.Server] ++
[Pleroma.Search.Healthcheck]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options # for other strategies and supported options

View File

@ -10,8 +10,12 @@ def remove_from_index(%Pleroma.Object{id: object_id}) do
end end
def search(query, options) do def search(query, options) do
search_module = Pleroma.Config.get([Pleroma.Search, :module], Pleroma.Activity) search_module = Pleroma.Config.get([Pleroma.Search, :module])
search_module.search(options[:for_user], query, options) search_module.search(options[:for_user], query, options)
end end
def healthcheck_endpoints do
search_module = Pleroma.Config.get([Pleroma.Search, :module])
search_module.healthcheck_endpoints
end
end end

View File

@ -48,6 +48,9 @@ def add_to_index(_activity), do: :ok
@impl true @impl true
def remove_from_index(_object), do: :ok def remove_from_index(_object), do: :ok
@impl true
def healthcheck_endpoints, do: nil
def maybe_restrict_author(query, %User{} = author) do def maybe_restrict_author(query, %User{} = author) do
Activity.Queries.by_author(query, author) Activity.Queries.by_author(query, author)
end end

View File

@ -0,0 +1,86 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2024 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Search.Healthcheck do
@doc """
Monitors health of search backend to control processing of events based on health and availability.
"""
use GenServer
require Logger
@queue :search_indexing
@tick :timer.seconds(5)
@timeout :timer.seconds(2)
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
@impl true
def init(_) do
state = %{healthy: false}
{:ok, state, {:continue, :start}}
end
@impl true
def handle_continue(:start, state) do
tick()
{:noreply, state}
end
@impl true
def handle_info(:check, state) do
urls = Pleroma.Search.healthcheck_endpoints()
new_state =
if check(urls) do
Oban.resume_queue(queue: @queue)
Map.put(state, :healthy, true)
else
Oban.pause_queue(queue: @queue)
Map.put(state, :healthy, false)
end
maybe_log_state_change(state, new_state)
tick()
{:noreply, new_state}
end
@impl true
def handle_call(:state, _from, state) do
{:reply, state, state, :hibernate}
end
def state, do: GenServer.call(__MODULE__, :state)
def check([]), do: true
def check(urls) when is_list(urls) do
Enum.all?(
urls,
fn url ->
case Pleroma.HTTP.get(url, [], recv_timeout: @timeout) do
{:ok, %{status: 200}} -> true
_ -> false
end
end
)
end
def check(_), do: true
defp tick do
Process.send_after(self(), :check, @tick)
end
defp maybe_log_state_change(%{healthy: true}, %{healthy: false}) do
Logger.error("Pausing Oban queue #{@queue} due to search backend healthcheck failure")
end
defp maybe_log_state_change(%{healthy: false}, %{healthy: true}) do
Logger.info("Resuming Oban queue #{@queue} due to search backend healthcheck pass")
end
defp maybe_log_state_change(_, _), do: :ok
end

View File

@ -178,4 +178,15 @@ def add_to_index(activity) do
def remove_from_index(object) do def remove_from_index(object) do
meili_delete("/indexes/objects/documents/#{object.id}") meili_delete("/indexes/objects/documents/#{object.id}")
end end
@impl true
def healthcheck_endpoints do
endpoint =
Config.get([Pleroma.Search.Meilisearch, :url])
|> URI.parse()
|> Map.put(:path, "/health")
|> URI.to_string()
[endpoint]
end
end end

View File

@ -21,4 +21,12 @@ defmodule Pleroma.Search.SearchBackend do
from index. from index.
""" """
@callback remove_from_index(object :: Pleroma.Object.t()) :: :ok | {:error, any()} @callback remove_from_index(object :: Pleroma.Object.t()) :: :ok | {:error, any()}
@doc """
Healthcheck endpoints of search backend infrastructure to monitor for controlling
processing of jobs in the Oban queue.
It is expected a 200 response is healthy and other responses are unhealthy.
"""
@callback healthcheck_endpoints :: list() | nil
end end

View File

@ -0,0 +1,49 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2024 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Search.HealthcheckTest do
use Pleroma.DataCase
import Tesla.Mock
alias Pleroma.Search.Healthcheck
@good1 "http://good1.example.com/healthz"
@good2 "http://good2.example.com/health"
@bad "http://bad.example.com/healthy"
setup do
mock(fn
%{method: :get, url: @good1} ->
%Tesla.Env{
status: 200,
body: ""
}
%{method: :get, url: @good2} ->
%Tesla.Env{
status: 200,
body: ""
}
%{method: :get, url: @bad} ->
%Tesla.Env{
status: 503,
body: ""
}
end)
:ok
end
test "true for 200 responses" do
assert Healthcheck.check([@good1])
assert Healthcheck.check([@good1, @good2])
end
test "false if any response is not a 200" do
refute Healthcheck.check([@bad])
refute Healthcheck.check([@good1, @bad])
end
end