Dedicated RPC process, wait for ibd on startup

This commit is contained in:
Mononaut 2022-01-23 16:55:10 -06:00
parent a343ec1bc6
commit be85d5d853
4 changed files with 190 additions and 114 deletions

123
server/lib/bitcoin_rpc.ex Normal file
View File

@ -0,0 +1,123 @@
Application.ensure_all_started(:hackney)
defmodule BitcoinStream.RPC do
@moduledoc """
GenServer for bitcoin rpc requests
"""
use GenServer
def start_link(opts) do
{port, opts} = Keyword.pop(opts, :port);
{host, opts} = Keyword.pop(opts, :host);
IO.puts("Starting Bitcoin RPC server on #{host} port #{port}")
GenServer.start_link(__MODULE__, {host, port, nil}, opts)
end
@impl true
def init(state) do
# start node monitoring loop
send(self(), :check_status)
{:ok, state}
end
def handle_info(:check_status, state) do
# Do the desired work here
state = check_status(state)
Process.send_after(self(), :check_status, 60 * 1000)
{:noreply, state}
end
@impl true
def handle_call({:request, method, params}, _from, {host, port, status}) do
case make_request(host, port, method, params) do
{:ok, info} ->
{:reply, {:ok, info}, {host, port, status}}
{:error, reason} ->
{:reply, {:error, reason}, {host, port, status}}
end
end
@impl true
def handle_call({:get_node_status}, _from, {host, port, status}) do
{:reply, {:ok, status}, {host, port, status}}
end
defp make_request(host, port, method, params) do
with { user, pw } <- rpc_creds(),
{:ok, rpc_request} <- Jason.encode(%{method: method, params: params}),
{:ok, 200, _headers, body_ref} <- :hackney.request(:post, "http://#{host}:#{port}", [{"content-type", "application/json"}], rpc_request, [basic_auth: { user, pw }]),
{:ok, body} <- :hackney.body(body_ref),
{:ok, %{"result" => info}} <- Jason.decode(body) do
{:ok, info}
else
{:ok, code, _} ->
IO.puts("RPC request #{method} failed with HTTP code #{code}")
{:error, code}
{:error, reason} ->
IO.puts("RPC request #{method} failed");
IO.inspect(reason)
{:error, reason}
err ->
IO.puts("RPC request #{method} failed: (unknown reason)");
IO.inspect(err);
{:error, err}
end
end
def request(pid, method, params) do
IO.inspect({pid, method, params});
GenServer.call(pid, {:request, method, params}, 60000)
catch
:exit, reason ->
IO.puts("RPC request #{method} failed - probably timed out?")
IO.inspect(reason)
end
def get_node_status(pid) do
GenServer.call(pid, {:get_node_status})
end
def check_status({host, port, status}) do
with {:ok, info} <- make_request(host, port, "getblockchaininfo", []) do
{host, port, info}
else
{:error, reason} ->
IO.puts("node status check failed");
IO.inspect(reason)
{host, port, status}
err ->
IO.puts("node status check failed: (unknown reason)");
IO.inspect(err);
{host, port, status}
end
end
defp rpc_creds() do
cookie_path = System.get_env("BITCOIN_RPC_COOKIE");
rpc_user = System.get_env("BITCOIN_RPC_USER");
rpc_pw = System.get_env("BITCOIN_RPC_PASS");
cond do
(rpc_user != nil && rpc_pw != nil)
-> { rpc_user, rpc_pw }
(cookie_path != nil)
->
with {:ok, cookie} <- File.read(cookie_path),
[ user, pw ] <- String.split(cookie, ":") do
{ user, pw }
else
{:error, reason} ->
IO.puts("Failed to load bitcoin rpc cookie");
IO.inspect(reason)
:error
err ->
IO.puts("Failed to load bitcoin rpc cookie: (unknown reason)");
IO.inspect(err);
:error
end
true ->
IO.puts("Missing bitcoin rpc credentials");
:error
end
end
end

View File

@ -10,6 +10,7 @@ defmodule BitcoinStream.Bridge do
alias BitcoinStream.Protocol.Block, as: BitcoinBlock alias BitcoinStream.Protocol.Block, as: BitcoinBlock
alias BitcoinStream.Protocol.Transaction, as: BitcoinTx alias BitcoinStream.Protocol.Transaction, as: BitcoinTx
alias BitcoinStream.Mempool, as: Mempool alias BitcoinStream.Mempool, as: Mempool
alias BitcoinStream.RPC, as: RPC
def child_spec(host: host, tx_port: tx_port, block_port: block_port) do def child_spec(host: host, tx_port: tx_port, block_port: block_port) do
%{ %{
@ -20,10 +21,8 @@ defmodule BitcoinStream.Bridge do
def start_link(host, tx_port, block_port) do def start_link(host, tx_port, block_port) do
IO.puts("Starting Bitcoin bridge on #{host} ports #{tx_port}, #{block_port}") IO.puts("Starting Bitcoin bridge on #{host} ports #{tx_port}, #{block_port}")
connect_to_server(host, tx_port); Task.start(fn -> connect_tx(host, tx_port) end);
connect_to_server(host, block_port); Task.start(fn -> connect_block(host, block_port) end);
txsub(host, tx_port);
blocksub(host, block_port);
GenServer.start_link(__MODULE__, %{}) GenServer.start_link(__MODULE__, %{})
end end
@ -31,28 +30,61 @@ defmodule BitcoinStream.Bridge do
{:ok, arg} {:ok, arg}
end end
@doc """ defp connect_tx(host, port) do
Create zmq client # check rpc online & synced
""" IO.puts("Waiting for node to come online and fully sync before connecting to tx socket");
def start_client(host, port) do wait_for_ibd();
IO.puts("Starting client on #{host} port #{port}"); IO.puts("Node is fully synced, connecting to tx socket");
{:ok, socket} = :chumak.socket(:pair);
IO.puts("Client socket paired"); # connect to socket
{:ok, pid} = :chumak.connect(socket, :tcp, String.to_charlist(host), port); {:ok, socket} = :chumak.socket(:sub);
IO.puts("Client socket connected"); IO.puts("Connected tx zmq socket on #{host} port #{port}");
{socket, pid} :chumak.subscribe(socket, 'rawtx')
IO.puts("Subscribed to rawtx events")
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
{:ok, pid} -> IO.puts("Binding ok to tx socket pid #{inspect pid}");
{:error, reason} -> IO.puts("Binding tx socket failed: #{reason}");
_ -> IO.puts("???");
end
# start tx loop
tx_loop(socket)
end end
@doc """ defp connect_block(host, port) do
Send a message from the client # check rpc online & synced
""" IO.puts("Waiting for node to come online and fully sync before connecting to block socket");
def client_send(socket, message) do wait_for_ibd();
:ok = :chumak.send(socket, message); IO.puts("Node is fully synced, connecting to block socket");
{:ok, response} = :chumak.recv(socket);
response # sync mempool
Mempool.sync(:mempool);
# connect to socket
{:ok, socket} = :chumak.socket(:sub);
IO.puts("Connected block zmq socket on #{host} port #{port}");
:chumak.subscribe(socket, 'rawblock')
IO.puts("Subscribed to rawblock events")
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
{:ok, pid} -> IO.puts("Binding ok to block socket pid #{inspect pid}");
{:error, reason} -> IO.puts("Binding block socket failed: #{reason}");
_ -> IO.puts("???");
end
# start block loop
block_loop(socket)
end end
def sendTxn(txn) do defp wait_for_ibd() do
case RPC.get_node_status(:rpc) do
{:ok, %{"initialblockdownload" => false}} -> true
_ ->
Process.sleep(5000);
wait_for_ibd()
end
end
defp sendTxn(txn) do
# IO.puts("Forwarding transaction to websocket clients") # IO.puts("Forwarding transaction to websocket clients")
case Jason.encode(%{type: "txn", txn: txn}) do case Jason.encode(%{type: "txn", txn: txn}) do
{:ok, payload} -> {:ok, payload} ->
@ -65,11 +97,11 @@ defmodule BitcoinStream.Bridge do
end end
end end
def incrementMempool() do defp incrementMempool() do
Mempool.increment(:mempool) Mempool.increment(:mempool)
end end
def sendBlock(block) do defp sendBlock(block) do
case Jason.encode(%{type: "block", block: block}) do case Jason.encode(%{type: "block", block: block}) do
{:ok, payload} -> {:ok, payload} ->
Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) -> Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) ->
@ -82,7 +114,7 @@ defmodule BitcoinStream.Bridge do
end end
end end
defp client_tx_loop(socket) do defp tx_loop(socket) do
# IO.puts("client tx loop"); # IO.puts("client tx loop");
with {:ok, message} <- :chumak.recv_multipart(socket), with {:ok, message} <- :chumak.recv_multipart(socket),
[_topic, payload, _size] <- message, [_topic, payload, _size] <- message,
@ -94,10 +126,10 @@ defmodule BitcoinStream.Bridge do
_ -> IO.puts("Bitcoin node transaction feed bridge error (unknown reason)"); _ -> IO.puts("Bitcoin node transaction feed bridge error (unknown reason)");
end end
client_tx_loop(socket) tx_loop(socket)
end end
defp client_block_loop(socket) do defp block_loop(socket) do
IO.puts("client block loop"); IO.puts("client block loop");
with {:ok, message} <- :chumak.recv_multipart(socket), with {:ok, message} <- :chumak.recv_multipart(socket),
[_topic, payload, _size] <- message, [_topic, payload, _size] <- message,
@ -112,42 +144,7 @@ defmodule BitcoinStream.Bridge do
_ -> IO.puts("Bitcoin node block feed bridge error (unknown reason)"); _ -> IO.puts("Bitcoin node block feed bridge error (unknown reason)");
end end
client_block_loop(socket) block_loop(socket)
end end
@doc """
Set up demo zmq client
"""
def connect_to_server(host, port) do
IO.puts("Starting on #{host}:#{port}");
{client_socket, _client_pid} = start_client(host, port);
IO.puts("Started client");
client_socket
end
def txsub(host, port) do
IO.puts("Subscribing to rawtx events")
{:ok, socket} = :chumak.socket(:sub)
:chumak.subscribe(socket, 'rawtx')
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
{:ok, pid} -> IO.puts("Binding ok to pid #{inspect pid}");
{:error, reason} -> IO.puts("Binding failed: #{reason}");
_ -> IO.puts("unhandled response");
end
Task.start(fn -> client_tx_loop(socket) end);
end
def blocksub(host, port) do
IO.puts("Subscribing to rawblock events")
{:ok, socket} = :chumak.socket(:sub)
:chumak.subscribe(socket, 'rawblock')
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
{:ok, pid} -> IO.puts("Binding ok to pid #{inspect pid}");
{:error, reason} -> IO.puts("Binding failed: #{reason}");
_ -> IO.puts("unhandled response");
end
Task.start(fn -> client_block_loop(socket) end);
end
end end

View File

@ -1,20 +1,18 @@
Application.ensure_all_started(:hackney)
defmodule BitcoinStream.Mempool do defmodule BitcoinStream.Mempool do
@moduledoc """ @moduledoc """
Agent for retrieving and maintaining mempool info (primarily tx count) Agent for retrieving and maintaining mempool info (primarily tx count)
""" """
use Agent use Agent
alias BitcoinStream.RPC, as: RPC
@doc """ @doc """
Start a new mempool tracker, Start a new mempool tracker,
connecting to a bitcoin node at RPC `host:port` for ground truth data connecting to a bitcoin node at RPC `host:port` for ground truth data
""" """
def start_link(opts) do def start_link(opts) do
{port, opts} = Keyword.pop(opts, :port); IO.puts("Starting mempool agent");
{host, opts} = Keyword.pop(opts, :host); case Agent.start_link(fn -> %{count: 0} end, opts) do
IO.puts("Starting mempool agent on #{host} port #{port}");
case Agent.start_link(fn -> %{count: 0, host: host, port: port} end, opts) do
{:ok, pid} -> {:ok, pid} ->
sync(pid); sync(pid);
{:ok, pid} {:ok, pid}
@ -23,14 +21,6 @@ defmodule BitcoinStream.Mempool do
end end
end end
def getHost(pid) do
Agent.get(pid, &Map.get(&1, :host))
end
def getPort(pid) do
Agent.get(pid, &Map.get(&1, :port))
end
def set(pid, n) do def set(pid, n) do
Agent.update(pid, &Map.update(&1, :count, 0, fn(_) -> n end)) Agent.update(pid, &Map.update(&1, :count, 0, fn(_) -> n end))
end end
@ -56,15 +46,8 @@ defmodule BitcoinStream.Mempool do
end end
def sync(pid) do def sync(pid) do
host = getHost(pid); IO.puts("Syncing mempool");
port = getPort(pid); with {:ok, %{"size" => pool_size}} <- RPC.request(:rpc, "getmempoolinfo", []) do
IO.puts("Syncing mempool with bitcoin node on #{host} port #{port}");
with { user, pw } <- rpc_creds(),
{:ok, rpc_request} <- Jason.encode(%{method: "getmempoolinfo", params: [], request_id: 0}),
{:ok, 200, _headers, body_ref} <- :hackney.request(:post, "http://#{host}:#{port}", [{"content-type", "application/json"}], rpc_request, [basic_auth: { user, pw }]),
{:ok, body} <- :hackney.body(body_ref),
{:ok, %{"result" => info}} <- Jason.decode(body),
%{"size" => pool_size} <- info do
IO.puts("Synced pool: size = #{pool_size}"); IO.puts("Synced pool: size = #{pool_size}");
set(pid, pool_size) set(pid, pool_size)
else else
@ -79,32 +62,4 @@ defmodule BitcoinStream.Mempool do
end end
end end
defp rpc_creds() do
cookie_path = System.get_env("BITCOIN_RPC_COOKIE");
rpc_user = System.get_env("BITCOIN_RPC_USER");
rpc_pw = System.get_env("BITCOIN_RPC_PASS");
cond do
(rpc_user != nil && rpc_pw != nil)
-> { rpc_user, rpc_pw }
(cookie_path != nil)
->
IO.puts("loading bitcoin rpc cookie at #{cookie_path}");
with {:ok, cookie} <- File.read(cookie_path),
[ user, pw ] <- String.split(cookie, ":") do
{ user, pw }
else
{:error, reason} ->
IO.puts("Failed to load bitcoin rpc cookie");
IO.inspect(reason)
:error
err ->
IO.puts("Failed to load bitcoin rpc cookie: (unknown reason)");
IO.inspect(err);
:error
end
true ->
IO.puts("Missing bitcoin rpc credentials");
:error
end
end
end end

View File

@ -10,6 +10,7 @@ defmodule BitcoinStream.Server do
children = [ children = [
{ BitcoinStream.BlockData, [name: :block_data] }, { BitcoinStream.BlockData, [name: :block_data] },
{ BitcoinStream.RPC, [host: btc_host, port: rpc_port, name: :rpc] },
{ BitcoinStream.Mempool, [name: :mempool] }, { BitcoinStream.Mempool, [name: :mempool] },
BitcoinStream.Metrics.Probe, BitcoinStream.Metrics.Probe,
Plug.Cowboy.child_spec( Plug.Cowboy.child_spec(