Merge branch 'feat/log-possible-hol' into 'develop'

ConnectionPool: Log possible HTTP1 blocks

See merge request pleroma/pleroma!2989
This commit is contained in:
rinpatch 2020-09-11 12:50:05 +00:00
commit 9fb88c814d
3 changed files with 38 additions and 14 deletions

View File

@ -50,10 +50,10 @@ defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do
with open_opts <- Map.delete(opts, :tls_opts), with open_opts <- Map.delete(opts, :tls_opts),
{:ok, conn} <- Gun.open(proxy_host, proxy_port, open_opts), {:ok, conn} <- Gun.open(proxy_host, proxy_port, open_opts),
{:ok, _} <- Gun.await_up(conn, opts[:connect_timeout]), {:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]),
stream <- Gun.connect(conn, connect_opts), stream <- Gun.connect(conn, connect_opts),
{:response, :fin, 200, _} <- Gun.await(conn, stream) do {:response, :fin, 200, _} <- Gun.await(conn, stream) do
{:ok, conn} {:ok, conn, protocol}
else else
error -> error ->
Logger.warn( Logger.warn(
@ -88,8 +88,8 @@ defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
|> Map.put(:socks_opts, socks_opts) |> Map.put(:socks_opts, socks_opts)
with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts), with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:connect_timeout]) do {:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]) do
{:ok, conn} {:ok, conn, protocol}
else else
error -> error ->
Logger.warn( Logger.warn(
@ -106,8 +106,8 @@ defp do_open(%URI{host: host, port: port} = uri, opts) do
host = Pleroma.HTTP.AdapterHelper.parse_host(host) host = Pleroma.HTTP.AdapterHelper.parse_host(host)
with {:ok, conn} <- Gun.open(host, port, opts), with {:ok, conn} <- Gun.open(host, port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:connect_timeout]) do {:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]) do
{:ok, conn} {:ok, conn, protocol}
else else
error -> error ->
Logger.warn( Logger.warn(

View File

@ -15,7 +15,7 @@ def init([_key, _uri, _opts, _client_pid] = opts) do
@impl true @impl true
def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
with {:ok, conn_pid} <- Gun.Conn.open(uri, opts), with {:ok, conn_pid, protocol} <- Gun.Conn.open(uri, opts),
Process.link(conn_pid) do Process.link(conn_pid) do
time = :erlang.monotonic_time(:millisecond) time = :erlang.monotonic_time(:millisecond)
@ -27,8 +27,12 @@ def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
send(client_pid, {:conn_pid, conn_pid}) send(client_pid, {:conn_pid, conn_pid})
{:noreply, {:noreply,
%{key: key, timer: nil, client_monitors: %{client_pid => Process.monitor(client_pid)}}, %{
:hibernate} key: key,
timer: nil,
client_monitors: %{client_pid => Process.monitor(client_pid)},
protocol: protocol
}, :hibernate}
else else
err -> err ->
{:stop, {:shutdown, err}, nil} {:stop, {:shutdown, err}, nil}
@ -53,14 +57,20 @@ def handle_cast({:remove_client, client_pid}, state) do
end end
@impl true @impl true
def handle_call(:add_client, {client_pid, _}, %{key: key} = state) do def handle_call(:add_client, {client_pid, _}, %{key: key, protocol: protocol} = state) do
time = :erlang.monotonic_time(:millisecond) time = :erlang.monotonic_time(:millisecond)
{{conn_pid, _, _, _}, _} = {{conn_pid, used_by, _, _}, _} =
Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} -> Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
{conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time} {conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
end) end)
:telemetry.execute(
[:pleroma, :connection_pool, :client, :add],
%{client_pid: client_pid, clients: used_by},
%{key: state.key, protocol: protocol}
)
state = state =
if state.timer != nil do if state.timer != nil do
Process.cancel_timer(state[:timer]) Process.cancel_timer(state[:timer])
@ -131,7 +141,7 @@ def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams}, state) d
@impl true @impl true
def handle_info({:DOWN, _ref, :process, pid, reason}, state) do def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
:telemetry.execute( :telemetry.execute(
[:pleroma, :connection_pool, :client_death], [:pleroma, :connection_pool, :client, :dead],
%{client_pid: pid, reason: reason}, %{client_pid: pid, reason: reason},
%{key: state.key} %{key: state.key}
) )

View File

@ -7,7 +7,8 @@ defmodule Pleroma.Telemetry.Logger do
[:pleroma, :connection_pool, :reclaim, :start], [:pleroma, :connection_pool, :reclaim, :start],
[:pleroma, :connection_pool, :reclaim, :stop], [:pleroma, :connection_pool, :reclaim, :stop],
[:pleroma, :connection_pool, :provision_failure], [:pleroma, :connection_pool, :provision_failure],
[:pleroma, :connection_pool, :client_death] [:pleroma, :connection_pool, :client, :dead],
[:pleroma, :connection_pool, :client, :add]
] ]
def attach do def attach do
:telemetry.attach_many("pleroma-logger", @events, &handle_event/4, []) :telemetry.attach_many("pleroma-logger", @events, &handle_event/4, [])
@ -62,7 +63,7 @@ def handle_event(
end end
def handle_event( def handle_event(
[:pleroma, :connection_pool, :client_death], [:pleroma, :connection_pool, :client, :dead],
%{client_pid: client_pid, reason: reason}, %{client_pid: client_pid, reason: reason},
%{key: key}, %{key: key},
_ _
@ -73,4 +74,17 @@ def handle_event(
}" }"
end) end)
end end
def handle_event(
[:pleroma, :connection_pool, :client, :add],
%{clients: [_, _ | _] = clients},
%{key: key, protocol: :http},
_
) do
Logger.info(fn ->
"Pool worker for #{key}: #{length(clients)} clients are using an HTTP1 connection at the same time, head-of-line blocking might occur."
end)
end
def handle_event([:pleroma, :connection_pool, :client, :add], _, _, _), do: :ok
end end