mirror of
https://github.com/Retropex/bitfeed.git
synced 2025-05-13 03:30:47 +02:00
Clear RPC server bottleneck
Parallelizes the RPC GenServer
This commit is contained in:
parent
dc6286bd39
commit
0c48d0b233
@ -10,16 +10,16 @@ defmodule BitcoinStream.RPC do
|
||||
{port, opts} = Keyword.pop(opts, :port);
|
||||
{host, opts} = Keyword.pop(opts, :host);
|
||||
IO.puts("Starting Bitcoin RPC server on #{host} port #{port}")
|
||||
GenServer.start_link(__MODULE__, {host, port, nil, nil, []}, opts)
|
||||
GenServer.start_link(__MODULE__, {host, port, nil, nil, [], %{}}, opts)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init({host, port, status, _, listeners}) do
|
||||
def init({host, port, status, _, listeners, inflight}) do
|
||||
# start node monitoring loop
|
||||
creds = rpc_creds();
|
||||
|
||||
send(self(), :check_status);
|
||||
{:ok, {host, port, status, creds, listeners}}
|
||||
{:ok, {host, port, status, creds, listeners, inflight}}
|
||||
end
|
||||
|
||||
defp notify_listeners([]) do
|
||||
@ -31,61 +31,89 @@ defmodule BitcoinStream.RPC do
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:check_status, {host, port, _status, creds, listeners}) do
|
||||
# poll Bitcoin Core for current status
|
||||
status = check_status({host, port, creds});
|
||||
case status do
|
||||
def handle_info(:check_status, {host, port, status, creds, listeners, inflight}) do
|
||||
case async_request("getblockchaininfo", [], host, port, creds) do
|
||||
{:ok, task_ref} ->
|
||||
{:noreply, {host, port, status, creds, listeners, Map.put(inflight, task_ref, :status)}}
|
||||
|
||||
:error ->
|
||||
IO.puts("Waiting to connect to Bitcoin Core");
|
||||
Process.send_after(self(), :check_status, 10 * 1000);
|
||||
{:noreply, {host, port, status, creds, listeners, inflight}}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info({:DOWN, ref, :process, _pid, _reason}, {host, port, status, creds, listeners, inflight}) do
|
||||
# IO.puts("DOWN: #{inspect pid} #{inspect reason}")
|
||||
{_, inflight} = Map.pop(inflight, ref);
|
||||
{:noreply, {host, port, status, creds, listeners, inflight}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info({ref, result}, {host, port, status, creds, listeners, inflight}) do
|
||||
case Map.pop(inflight, ref) do
|
||||
{nil, inflight} ->
|
||||
{:noreply, {host, port, status, creds, listeners, inflight}}
|
||||
|
||||
{:status, inflight} ->
|
||||
case result do
|
||||
# if node is connected and finished with the initial block download
|
||||
{:ok, %{"initialblockdownload" => false}} ->
|
||||
{:ok, 200, %{"initialblockdownload" => false}} ->
|
||||
# notify all listening processes
|
||||
IO.puts("Bitcoin Core connected and synced");
|
||||
notify_listeners(listeners);
|
||||
Process.send_after(self(), :check_status, 300 * 1000);
|
||||
{:noreply, {host, port, status, creds, []}}
|
||||
{:noreply, {host, port, :ok, creds, [], inflight}}
|
||||
|
||||
{:ok, %{"initialblockdownload" => true}} ->
|
||||
{:ok, 200, %{"initialblockdownload" => true}} ->
|
||||
IO.puts("Bitcoin Core connected, waiting for initial block download");
|
||||
Process.send_after(self(), :check_status, 30 * 1000);
|
||||
{:noreply, {host, port, status, creds, listeners}}
|
||||
{:noreply, {host, port, :ibd, creds, listeners, inflight}}
|
||||
|
||||
_ ->
|
||||
IO.puts("Waiting to connect to Bitcoin Core");
|
||||
Process.send_after(self(), :check_status, 10 * 1000);
|
||||
{:noreply, {host, port, status, creds, listeners}}
|
||||
{:noreply, {host, port, :disconnected, creds, listeners, inflight}}
|
||||
end
|
||||
|
||||
{from, inflight} ->
|
||||
GenServer.reply(from, result)
|
||||
{:noreply, {host, port, status, creds, listeners, inflight}}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:request, method, params}, _from, {host, port, status, creds, listeners}) do
|
||||
case make_request(host, port, creds, method, params) do
|
||||
{:ok, code, info} ->
|
||||
{:reply, {:ok, code, info}, {host, port, status, creds, listeners}}
|
||||
def handle_call({:request, method, params}, from, {host, port, status, creds, listeners, inflight}) do
|
||||
case async_request(method, params, host, port, creds) do
|
||||
{:ok, task_ref} ->
|
||||
{:noreply, {host, port, status, creds, listeners, Map.put(inflight, task_ref, from)}}
|
||||
|
||||
{:error, reason} ->
|
||||
{:reply, {:error, reason}, {host, port, status, creds, listeners}}
|
||||
|
||||
error ->
|
||||
{:reply, error, {host, port, status, creds, listeners}}
|
||||
:error ->
|
||||
{:reply, 500, {host, port, status, creds, listeners, inflight}}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:get_node_status, _from, {host, port, status, creds, listeners}) do
|
||||
{:reply, status, {host, port, status, creds, listeners}}
|
||||
def handle_call(:get_node_status, _from, {host, port, status, creds, listeners, inflight}) do
|
||||
{:reply, status, {host, port, status, creds, listeners, inflight}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:notify_on_ready, from, {host, port, status, creds, listeners}) do
|
||||
{:noreply, {host, port, status, creds, [from | listeners]}}
|
||||
def handle_call(:notify_on_ready, from, {host, port, status, creds, listeners, inflight}) do
|
||||
{:noreply, {host, port, status, creds, [from | listeners], inflight}}
|
||||
end
|
||||
|
||||
def notify_on_ready(pid) do
|
||||
GenServer.call(pid, :notify_on_ready, :infinity)
|
||||
end
|
||||
|
||||
defp make_request(host, port, creds, method, params) do
|
||||
defp async_request(method, params, host, port, creds) do
|
||||
with { user, pw } <- creds,
|
||||
{:ok, rpc_request} <- Jason.encode(%{method: method, params: params}),
|
||||
{:ok, %Finch.Response{body: body, headers: _headers, status: status}} <- Finch.build(:post, "http://#{host}:#{port}", [{"content-type", "application/json"}, {"authorization", BasicAuth.encode_basic_auth(user, pw)}], rpc_request) |> Finch.request(FinchClient),
|
||||
{:ok, rpc_request} <- Jason.encode(%{method: method, params: params}) do
|
||||
task = Task.async(
|
||||
fn ->
|
||||
with {:ok, %Finch.Response{body: body, headers: _headers, status: status}} <- Finch.build(:post, "http://#{host}:#{port}", [{"content-type", "application/json"}, {"authorization", BasicAuth.encode_basic_auth(user, pw)}], rpc_request) |> Finch.request(FinchClient),
|
||||
{:ok, %{"result" => info}} <- Jason.decode(body) do
|
||||
{:ok, status, info}
|
||||
else
|
||||
@ -102,6 +130,15 @@ defmodule BitcoinStream.RPC do
|
||||
{:error, err}
|
||||
end
|
||||
end
|
||||
)
|
||||
{:ok, task.ref}
|
||||
else
|
||||
err ->
|
||||
IO.puts("failed to make RPC request");
|
||||
IO.inspect(err);
|
||||
:error
|
||||
end
|
||||
end
|
||||
|
||||
def request(pid, method, params) do
|
||||
GenServer.call(pid, {:request, method, params}, 30000)
|
||||
@ -120,16 +157,6 @@ defmodule BitcoinStream.RPC do
|
||||
GenServer.call(pid, :get_node_status, 10000)
|
||||
end
|
||||
|
||||
defp check_status({host, port, creds}) do
|
||||
case make_request(host, port, creds, "getblockchaininfo", []) do
|
||||
{:ok, 200, info} -> {:ok, info}
|
||||
|
||||
{:ok, code, info} -> {:error, code, info}
|
||||
|
||||
_ -> {:error}
|
||||
end
|
||||
end
|
||||
|
||||
defp rpc_creds() do
|
||||
cookie_path = System.get_env("BITCOIN_RPC_COOKIE");
|
||||
rpc_user = System.get_env("BITCOIN_RPC_USER");
|
||||
|
@ -57,7 +57,7 @@ defmodule BitcoinStream.Bridge.Block do
|
||||
|
||||
defp wait_for_ibd() do
|
||||
case RPC.get_node_status(:rpc) do
|
||||
{:ok, %{"initialblockdownload" => false}} -> true
|
||||
:ok -> true
|
||||
|
||||
_ ->
|
||||
IO.puts("Waiting for node to come online and fully sync before connecting to block socket");
|
||||
|
@ -55,7 +55,7 @@ defmodule BitcoinStream.Bridge.Sequence do
|
||||
|
||||
defp wait_for_ibd() do
|
||||
case RPC.get_node_status(:rpc) do
|
||||
{:ok, %{"initialblockdownload" => false}} -> true
|
||||
:ok -> true
|
||||
|
||||
_ ->
|
||||
IO.puts("Waiting for node to come online and fully sync before connecting to sequence socket");
|
||||
|
@ -56,7 +56,7 @@ defmodule BitcoinStream.Bridge.Tx do
|
||||
|
||||
defp wait_for_ibd() do
|
||||
case RPC.get_node_status(:rpc) do
|
||||
{:ok, %{"initialblockdownload" => false}} -> true
|
||||
:ok -> true
|
||||
|
||||
_ ->
|
||||
IO.puts("Waiting for node to come online and fully sync before connecting to tx socket");
|
||||
|
@ -191,7 +191,8 @@ defmodule BitcoinStream.Mempool do
|
||||
:ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee }, nil});
|
||||
get(pid)
|
||||
else
|
||||
_ -> false
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
|
||||
# new transaction, not yet registered
|
||||
|
@ -36,9 +36,16 @@ defmodule BitcoinStream.Mempool.Sync do
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(_event, state) do
|
||||
# if RPC responds after the calling process already timed out, garbled messages get dumped to handle_info
|
||||
# quietly discard
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp wait_for_ibd() do
|
||||
case RPC.get_node_status(:rpc) do
|
||||
{:ok, %{"initialblockdownload" => false}} -> true
|
||||
:ok -> true
|
||||
|
||||
_ ->
|
||||
IO.puts("Waiting for node to come online and fully sync before synchronizing mempool");
|
||||
|
@ -81,6 +81,7 @@ defp summarise_txns([next | rest], summarised, total, fees, do_inflate) do
|
||||
if do_inflate do
|
||||
inflated_txn = BitcoinTx.inflate(extended_txn)
|
||||
if (inflated_txn.inflated) do
|
||||
IO.puts("Processing block tx #{length(summarised)}/#{length(summarised) + length(rest) + 1} | #{next.id}");
|
||||
summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, fees + inflated_txn.fee, true)
|
||||
else
|
||||
summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, nil, false)
|
||||
|
@ -185,7 +185,6 @@ defmodule BitcoinStream.Protocol.Transaction do
|
||||
{:ok, inputs, total}
|
||||
|
||||
other ->
|
||||
IO.puts("unexpected cached value: ")
|
||||
IO.inspect(other);
|
||||
inflate_inputs(inputs, [], 0)
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user