mirror of
https://github.com/Retropex/bitfeed.git
synced 2025-05-13 03:30:47 +02:00
Refactor API server supervision tree
- Migrate mempool agent to a GenServer - ZMQ sockets handled by separate GenServers - Mempool & socket servers under one_for_all supervisor - Improve error handling
This commit is contained in:
parent
7a0c98f7f1
commit
bd9d11818d
@ -10,39 +10,77 @@ defmodule BitcoinStream.RPC do
|
||||
{port, opts} = Keyword.pop(opts, :port);
|
||||
{host, opts} = Keyword.pop(opts, :host);
|
||||
IO.puts("Starting Bitcoin RPC server on #{host} port #{port}")
|
||||
GenServer.start_link(__MODULE__, {host, port, nil, nil}, opts)
|
||||
GenServer.start_link(__MODULE__, {host, port, nil, nil, []}, opts)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init({host, port, status, _}) do
|
||||
def init({host, port, status, _, listeners}) do
|
||||
# start node monitoring loop
|
||||
creds = rpc_creds();
|
||||
|
||||
send(self(), :check_status);
|
||||
{:ok, {host, port, status, creds}}
|
||||
{:ok, {host, port, status, creds, listeners}}
|
||||
end
|
||||
|
||||
def handle_info(:check_status, state) do
|
||||
# Do the desired work here
|
||||
state = check_status(state)
|
||||
Process.send_after(self(), :check_status, 60 * 1000)
|
||||
{:noreply, state}
|
||||
defp notify_listeners([]) do
|
||||
true
|
||||
end
|
||||
defp notify_listeners([head | tail]) do
|
||||
GenServer.reply(head, :mempool_synced);
|
||||
notify_listeners(tail)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:request, method, params}, _from, {host, port, status, creds}) do
|
||||
def handle_info(:check_status, {host, port, status, creds, listeners}) do
|
||||
# poll Bitcoin Core for current status
|
||||
status = check_status({host, port, creds});
|
||||
case status do
|
||||
# if node is connected and finished with the initial block download
|
||||
{:ok, %{"initialblockdownload" => false}} ->
|
||||
IO.puts("Bitcoin Core connected and fully synced");
|
||||
# notify all listening processes
|
||||
notify_listeners(listeners);
|
||||
Process.send_after(self(), :check_status, 60 * 1000);
|
||||
{:noreply, {host, port, status, creds, []}}
|
||||
|
||||
{:ok, %{"initialblockdownload" => true}} ->
|
||||
IO.puts("Bitcoin Core connected, waiting for initial block download");
|
||||
Process.send_after(self(), :check_status, 60 * 1000);
|
||||
{:noreply, {host, port, status, creds, listeners}}
|
||||
|
||||
_ ->
|
||||
IO.puts("Waiting to connect to Bitcoin Core");
|
||||
Process.send_after(self(), :check_status, 60 * 1000);
|
||||
{:noreply, {host, port, status, creds, listeners}}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:request, method, params}, _from, {host, port, status, creds, listeners}) do
|
||||
case make_request(host, port, creds, method, params) do
|
||||
{:ok, code, info} ->
|
||||
{:reply, {:ok, code, info}, {host, port, status, creds}}
|
||||
{:reply, {:ok, code, info}, {host, port, status, creds, listeners}}
|
||||
|
||||
{:error, reason} ->
|
||||
{:reply, {:error, reason}, {host, port, status, creds}}
|
||||
{:reply, {:error, reason}, {host, port, status, creds, listeners}}
|
||||
|
||||
error ->
|
||||
{:reply, error, {host, port, status, creds, listeners}}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:get_node_status}, _from, {host, port, status, creds}) do
|
||||
{:reply, {:ok, status}, {host, port, status, creds}}
|
||||
def handle_call(:get_node_status, _from, {host, port, status, creds, listeners}) do
|
||||
{:reply, status, {host, port, status, creds, listeners}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:notify_on_ready, from, {host, port, status, creds, listeners}) do
|
||||
{:noreply, {host, port, status, creds, [from | listeners]}}
|
||||
end
|
||||
|
||||
def notify_on_ready(pid) do
|
||||
GenServer.call(pid, :notify_on_ready, :infinity)
|
||||
end
|
||||
|
||||
defp make_request(host, port, creds, method, params) do
|
||||
@ -68,29 +106,29 @@ defmodule BitcoinStream.RPC do
|
||||
end
|
||||
|
||||
def request(pid, method, params) do
|
||||
GenServer.call(pid, {:request, method, params}, 60000)
|
||||
GenServer.call(pid, {:request, method, params}, 30000)
|
||||
catch
|
||||
:exit, reason ->
|
||||
IO.puts("RPC request #{method} failed - probably timed out?")
|
||||
IO.inspect(reason)
|
||||
case reason do
|
||||
{:timeout, _} -> {:error, :timeout}
|
||||
|
||||
_ -> {:error, reason}
|
||||
end
|
||||
|
||||
error -> {:error, error}
|
||||
end
|
||||
|
||||
def get_node_status(pid) do
|
||||
GenServer.call(pid, {:get_node_status})
|
||||
GenServer.call(pid, :get_node_status, 10000)
|
||||
end
|
||||
|
||||
def check_status({host, port, status, creds}) do
|
||||
with {:ok, 200, info} <- make_request(host, port, creds, "getblockchaininfo", []) do
|
||||
{host, port, info, creds}
|
||||
else
|
||||
{:error, reason} ->
|
||||
IO.puts("node status check failed");
|
||||
IO.inspect(reason)
|
||||
{host, port, status, creds}
|
||||
err ->
|
||||
IO.puts("node status check failed: (unknown reason)");
|
||||
IO.inspect(err);
|
||||
{host, port, status, creds}
|
||||
defp check_status({host, port, creds}) do
|
||||
case make_request(host, port, creds, "getblockchaininfo", []) do
|
||||
{:ok, 200, info} -> {:ok, info}
|
||||
|
||||
{:ok, code, info} -> {:error, code, info}
|
||||
|
||||
_ -> {:error}
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -8,8 +8,6 @@ defmodule BitcoinStream.BlockData do
|
||||
"""
|
||||
use GenServer
|
||||
|
||||
alias BitcoinStream.Protocol.Block, as: BitcoinBlock
|
||||
|
||||
def start_link(opts) do
|
||||
IO.puts("Starting block data link")
|
||||
# load block
|
||||
@ -38,7 +36,19 @@ defmodule BitcoinStream.BlockData do
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:json, {id, json}}, _state) do
|
||||
{:noreply, {id, json}}
|
||||
def handle_call({:json, {id, json}}, _from, _state) do
|
||||
{:reply, :ok, {id, json}}
|
||||
end
|
||||
|
||||
def get_json_block(pid) do
|
||||
GenServer.call(pid, :json_block, 10000)
|
||||
end
|
||||
|
||||
def get_block_id(pid) do
|
||||
GenServer.call(pid, :block_id, 10000)
|
||||
end
|
||||
|
||||
def set_json_block(pid, block_id, json) do
|
||||
GenServer.call(pid, {:json, { block_id, json }}, 10000)
|
||||
end
|
||||
end
|
||||
|
@ -1,243 +0,0 @@
|
||||
defmodule BitcoinStream.Bridge do
|
||||
@moduledoc """
|
||||
Bitcoin event bridge module.
|
||||
|
||||
Consumes a source of ZMQ bitcoin tx and block events
|
||||
and forwards to connected websocket clients
|
||||
"""
|
||||
use GenServer
|
||||
|
||||
alias BitcoinStream.Protocol.Block, as: BitcoinBlock
|
||||
alias BitcoinStream.Protocol.Transaction, as: BitcoinTx
|
||||
alias BitcoinStream.Mempool, as: Mempool
|
||||
alias BitcoinStream.RPC, as: RPC
|
||||
|
||||
def child_spec(host: host, tx_port: tx_port, block_port: block_port, sequence_port: sequence_port) do
|
||||
%{
|
||||
id: BitcoinStream.Bridge,
|
||||
start: {BitcoinStream.Bridge, :start_link, [host, tx_port, block_port, sequence_port]}
|
||||
}
|
||||
end
|
||||
|
||||
def start_link(host, tx_port, block_port, sequence_port) do
|
||||
IO.puts("Starting Bitcoin bridge on #{host} ports #{tx_port}, #{block_port}, #{sequence_port}")
|
||||
IO.puts("Mempool loaded, ready to syncronize");
|
||||
Task.start(fn -> connect_tx(host, tx_port) end);
|
||||
Task.start(fn -> connect_block(host, block_port) end);
|
||||
Task.start(fn -> connect_sequence(host, sequence_port) end);
|
||||
:timer.sleep(2000);
|
||||
Task.start(fn -> Mempool.sync(:mempool) end);
|
||||
GenServer.start_link(__MODULE__, %{})
|
||||
end
|
||||
|
||||
def init(arg) do
|
||||
{:ok, arg}
|
||||
end
|
||||
|
||||
defp connect_tx(host, port) do
|
||||
# check rpc online & synced
|
||||
IO.puts("Waiting for node to come online and fully sync before connecting to tx socket");
|
||||
wait_for_ibd();
|
||||
IO.puts("Node is fully synced, connecting to tx socket");
|
||||
|
||||
# connect to socket
|
||||
{:ok, socket} = :chumak.socket(:sub);
|
||||
IO.puts("Connected tx zmq socket on #{host} port #{port}");
|
||||
:chumak.subscribe(socket, 'rawtx')
|
||||
IO.puts("Subscribed to rawtx events")
|
||||
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
|
||||
{:ok, pid} -> IO.puts("Binding ok to tx socket pid #{inspect pid}");
|
||||
{:error, reason} -> IO.puts("Binding tx socket failed: #{reason}");
|
||||
_ -> IO.puts("???");
|
||||
end
|
||||
|
||||
# start tx loop
|
||||
tx_loop(socket, 0)
|
||||
end
|
||||
|
||||
defp connect_block(host, port) do
|
||||
# check rpc online & synced
|
||||
IO.puts("Waiting for node to come online and fully sync before connecting to block socket");
|
||||
wait_for_ibd();
|
||||
IO.puts("Node is fully synced, connecting to block socket");
|
||||
|
||||
# connect to socket
|
||||
{:ok, socket} = :chumak.socket(:sub);
|
||||
IO.puts("Connected block zmq socket on #{host} port #{port}");
|
||||
:chumak.subscribe(socket, 'rawblock')
|
||||
IO.puts("Subscribed to rawblock events")
|
||||
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
|
||||
{:ok, pid} -> IO.puts("Binding ok to block socket pid #{inspect pid}");
|
||||
{:error, reason} -> IO.puts("Binding block socket failed: #{reason}");
|
||||
_ -> IO.puts("???");
|
||||
end
|
||||
|
||||
# start block loop
|
||||
block_loop(socket, 0)
|
||||
end
|
||||
|
||||
defp connect_sequence(host, port) do
|
||||
# check rpc online & synced
|
||||
IO.puts("Waiting for node to come online and fully sync before connecting to sequence socket");
|
||||
wait_for_ibd();
|
||||
IO.puts("Node is fully synced, connecting to sequence socket");
|
||||
|
||||
# connect to socket
|
||||
{:ok, socket} = :chumak.socket(:sub);
|
||||
IO.puts("Connected sequence zmq socket on #{host} port #{port}");
|
||||
:chumak.subscribe(socket, 'sequence')
|
||||
IO.puts("Subscribed to sequence events")
|
||||
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
|
||||
{:ok, pid} -> IO.puts("Binding ok to sequence socket pid #{inspect pid}");
|
||||
{:error, reason} -> IO.puts("Binding sequence socket failed: #{reason}");
|
||||
_ -> IO.puts("???");
|
||||
end
|
||||
|
||||
# start tx loop
|
||||
sequence_loop(socket)
|
||||
end
|
||||
|
||||
defp wait_for_ibd() do
|
||||
case RPC.get_node_status(:rpc) do
|
||||
{:ok, %{"initialblockdownload" => false}} -> true
|
||||
_ ->
|
||||
Process.sleep(5000);
|
||||
wait_for_ibd()
|
||||
end
|
||||
end
|
||||
|
||||
defp send_txn(txn, count) do
|
||||
# IO.puts("Forwarding transaction to websocket clients")
|
||||
case Jason.encode(%{type: "txn", txn: txn, count: count}) do
|
||||
{:ok, payload} ->
|
||||
Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) ->
|
||||
for {pid, _} <- entries do
|
||||
Process.send(pid, payload, [])
|
||||
end
|
||||
end)
|
||||
{:error, reason} -> IO.puts("Error json encoding transaction: #{reason}");
|
||||
end
|
||||
end
|
||||
|
||||
defp send_block(block, count) do
|
||||
case Jason.encode(%{type: "block", block: %{id: block.id}, drop: count}) do
|
||||
{:ok, payload} ->
|
||||
Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) ->
|
||||
for {pid, _} <- entries do
|
||||
IO.puts("Forwarding to pid #{inspect pid}")
|
||||
Process.send(pid, payload, []);
|
||||
end
|
||||
end)
|
||||
{:error, reason} -> IO.puts("Error json encoding block: #{reason}");
|
||||
end
|
||||
end
|
||||
|
||||
defp send_drop_tx(txid, count) do
|
||||
case Jason.encode(%{type: "drop", txid: txid, count: count}) do
|
||||
{:ok, payload} ->
|
||||
Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) ->
|
||||
for {pid, _} <- entries do
|
||||
Process.send(pid, payload, []);
|
||||
end
|
||||
end)
|
||||
{:error, reason} -> IO.puts("Error json encoding drop message: #{reason}");
|
||||
end
|
||||
end
|
||||
|
||||
defp tx_process(payload) do
|
||||
case BitcoinTx.decode(payload) do
|
||||
{:ok, txn} ->
|
||||
case Mempool.get_tx_status(:mempool, txn.id) do
|
||||
# :registered and :new transactions are inflated and inserted into the mempool
|
||||
status when (status in [:registered, :new]) ->
|
||||
inflated_txn = BitcoinTx.inflate(txn);
|
||||
case Mempool.insert(:mempool, txn.id, inflated_txn) do
|
||||
# Mempool.insert returns the size of the mempool if insertion was successful
|
||||
# Forward tx to clients in this case
|
||||
count when is_integer(count) -> send_txn(inflated_txn, count)
|
||||
|
||||
_ -> false
|
||||
end
|
||||
|
||||
# other statuses indicate duplicate or dropped transaction
|
||||
_ -> false
|
||||
end
|
||||
|
||||
{:err, reason} ->
|
||||
IO.puts("Error decoding tx: #{reason}");
|
||||
false
|
||||
|
||||
error ->
|
||||
IO.puts("Error decoding tx");
|
||||
IO.inspect(error);
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
defp tx_loop(socket, seq) do
|
||||
with {:ok, message} <- :chumak.recv_multipart(socket),
|
||||
[_topic, payload, <<sequence::little-size(32)>>] <- message,
|
||||
true <- (seq != sequence) do
|
||||
Task.start(fn -> tx_process(payload) end);
|
||||
tx_loop(socket, sequence)
|
||||
else
|
||||
_ -> tx_loop(socket, seq)
|
||||
end
|
||||
end
|
||||
|
||||
defp block_loop(socket, seq) do
|
||||
IO.puts("client block loop");
|
||||
with {:ok, message} <- :chumak.recv_multipart(socket), # wait for the next zmq message in the queue
|
||||
[_topic, payload, <<sequence::little-size(32)>>] <- message,
|
||||
true <- (seq != sequence), # discard contiguous duplicate messages
|
||||
_ <- IO.puts("block received"),
|
||||
{:ok, block} <- BitcoinBlock.decode(payload),
|
||||
count <- Mempool.clear_block_txs(:mempool, block),
|
||||
{:ok, json} <- Jason.encode(block),
|
||||
:ok <- File.write("data/last_block.json", json) do
|
||||
IO.puts("processed block #{block.id}");
|
||||
GenServer.cast(:block_data, {:json, { block.id, json }});
|
||||
send_block(block, count);
|
||||
block_loop(socket, sequence)
|
||||
else
|
||||
_ -> block_loop(socket, seq)
|
||||
end
|
||||
end
|
||||
|
||||
defp sequence_loop(socket) do
|
||||
with {:ok, message} <- :chumak.recv_multipart(socket),
|
||||
[_topic, <<hash::binary-size(32), type::binary-size(1), seq::little-size(64)>>, <<_sequence::little-size(32)>>] <- message,
|
||||
txid <- Base.encode16(hash, case: :lower),
|
||||
event <- to_charlist(type) do
|
||||
# IO.puts("loop #{sequence}");
|
||||
case event do
|
||||
# Transaction added to mempool
|
||||
'A' ->
|
||||
case Mempool.register(:mempool, txid, seq, true) do
|
||||
false -> false
|
||||
|
||||
{ txn, count } ->
|
||||
# IO.puts("*SEQ* #{txid}");
|
||||
send_txn(txn, count)
|
||||
end
|
||||
|
||||
# Transaction removed from mempool for non block inclusion reason
|
||||
'R' ->
|
||||
case Mempool.drop(:mempool, txid) do
|
||||
count when is_integer(count) ->
|
||||
send_drop_tx(txid, count);
|
||||
|
||||
_ ->
|
||||
true
|
||||
end
|
||||
|
||||
# Don't care about other events
|
||||
_ -> true
|
||||
end
|
||||
else
|
||||
_ -> false
|
||||
end
|
||||
sequence_loop(socket)
|
||||
end
|
||||
|
||||
end
|
100
server/lib/bridge_block.ex
Normal file
100
server/lib/bridge_block.ex
Normal file
@ -0,0 +1,100 @@
|
||||
defmodule BitcoinStream.Bridge.Block do
|
||||
@moduledoc """
|
||||
Bitcoin event bridge module.
|
||||
|
||||
Consumes a source of ZMQ bitcoin tx events
|
||||
and forwards to connected websocket clients
|
||||
"""
|
||||
use GenServer
|
||||
|
||||
alias BitcoinStream.Protocol.Block, as: BitcoinBlock
|
||||
alias BitcoinStream.Mempool, as: Mempool
|
||||
alias BitcoinStream.RPC, as: RPC
|
||||
alias BitcoinStream.BlockData, as: BlockData
|
||||
|
||||
def child_spec(host: host, port: port) do
|
||||
%{
|
||||
id: BitcoinStream.Bridge.Block,
|
||||
start: {BitcoinStream.Bridge.Block, :start_link, [host, port]}
|
||||
}
|
||||
end
|
||||
|
||||
def start_link(host, port) do
|
||||
IO.puts("Starting Bitcoin Block bridge on #{host} port #{port}")
|
||||
GenServer.start_link(__MODULE__, {host, port})
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init({host, port}) do
|
||||
{:ok, {host, port}, {:continue, :connect}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_continue(:connect, {host, port}) do
|
||||
connect(host, port);
|
||||
{:noreply, {host, port}}
|
||||
end
|
||||
|
||||
defp connect(host, port) do
|
||||
# check rpc online & synced
|
||||
IO.puts("Waiting for node to come online and fully sync before connecting to block socket");
|
||||
wait_for_ibd();
|
||||
IO.puts("Node is fully synced, connecting to block socket");
|
||||
|
||||
# connect to socket
|
||||
{:ok, socket} = :chumak.socket(:sub);
|
||||
IO.puts("Connected block zmq socket on #{host} port #{port}");
|
||||
:chumak.subscribe(socket, 'rawblock')
|
||||
IO.puts("Subscribed to rawblock events")
|
||||
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
|
||||
{:ok, pid} -> IO.puts("Binding ok to block socket pid #{inspect pid}");
|
||||
{:error, reason} -> IO.puts("Binding block socket failed: #{reason}");
|
||||
_ -> IO.puts("???");
|
||||
end
|
||||
|
||||
# start block loop
|
||||
loop(socket, 0)
|
||||
end
|
||||
|
||||
defp wait_for_ibd() do
|
||||
case RPC.get_node_status(:rpc) do
|
||||
{:ok, %{"initialblockdownload" => false}} -> true
|
||||
|
||||
_ ->
|
||||
RPC.notify_on_ready(:rpc)
|
||||
end
|
||||
end
|
||||
|
||||
defp send_block(block, count) do
|
||||
case Jason.encode(%{type: "block", block: %{id: block.id}, drop: count}) do
|
||||
{:ok, payload} ->
|
||||
Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) ->
|
||||
for {pid, _} <- entries do
|
||||
IO.puts("Forwarding to pid #{inspect pid}")
|
||||
Process.send(pid, payload, []);
|
||||
end
|
||||
end)
|
||||
{:error, reason} -> IO.puts("Error json encoding block: #{reason}");
|
||||
end
|
||||
end
|
||||
|
||||
defp loop(socket, seq) do
|
||||
IO.puts("client block loop");
|
||||
with {:ok, message} <- :chumak.recv_multipart(socket), # wait for the next zmq message in the queue
|
||||
[_topic, payload, <<sequence::little-size(32)>>] <- message,
|
||||
true <- (seq != sequence), # discard contiguous duplicate messages
|
||||
_ <- IO.puts("block received"),
|
||||
{:ok, block} <- BitcoinBlock.decode(payload),
|
||||
count <- Mempool.clear_block_txs(:mempool, block),
|
||||
{:ok, json} <- Jason.encode(block),
|
||||
:ok <- File.write("data/last_block.json", json) do
|
||||
IO.puts("processed block #{block.id}");
|
||||
BlockData.set_json_block(:block_data, block.id, json);
|
||||
send_block(block, count);
|
||||
loop(socket, sequence)
|
||||
else
|
||||
_ -> loop(socket, seq)
|
||||
end
|
||||
end
|
||||
|
||||
end
|
128
server/lib/bridge_sequence.ex
Normal file
128
server/lib/bridge_sequence.ex
Normal file
@ -0,0 +1,128 @@
|
||||
defmodule BitcoinStream.Bridge.Sequence do
|
||||
@moduledoc """
|
||||
Bitcoin event bridge module.
|
||||
|
||||
Consumes a source of ZMQ bitcoin tx events
|
||||
and forwards to connected websocket clients
|
||||
"""
|
||||
use GenServer
|
||||
|
||||
alias BitcoinStream.Mempool, as: Mempool
|
||||
alias BitcoinStream.RPC, as: RPC
|
||||
|
||||
def child_spec(host: host, port: port) do
|
||||
%{
|
||||
id: BitcoinStream.Bridge.Sequence,
|
||||
start: {BitcoinStream.Bridge.Sequence, :start_link, [host, port]}
|
||||
}
|
||||
end
|
||||
|
||||
def start_link(host, port) do
|
||||
IO.puts("Starting Bitcoin Sequence bridge on #{host} port #{port}")
|
||||
GenServer.start_link(__MODULE__, {host, port})
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init({host, port}) do
|
||||
{:ok, {host, port}, {:continue, :connect}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_continue(:connect, {host, port}) do
|
||||
connect(host, port);
|
||||
{:noreply, {host, port}}
|
||||
end
|
||||
|
||||
defp connect(host, port) do
|
||||
# check rpc online & synced
|
||||
IO.puts("Waiting for node to come online and fully sync before connecting to sequence socket");
|
||||
wait_for_ibd();
|
||||
IO.puts("Node is fully synced, connecting to sequence socket");
|
||||
|
||||
# connect to socket
|
||||
{:ok, socket} = :chumak.socket(:sub);
|
||||
IO.puts("Connected sequence zmq socket on #{host} port #{port}");
|
||||
:chumak.subscribe(socket, 'sequence')
|
||||
IO.puts("Subscribed to sequence events")
|
||||
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
|
||||
{:ok, pid} -> IO.puts("Binding ok to sequence socket pid #{inspect pid}");
|
||||
{:error, reason} -> IO.puts("Binding sequence socket failed: #{reason}");
|
||||
_ -> IO.puts("???");
|
||||
end
|
||||
|
||||
# start tx loop
|
||||
loop(socket)
|
||||
end
|
||||
|
||||
defp wait_for_ibd() do
|
||||
case RPC.get_node_status(:rpc) do
|
||||
{:ok, %{"initialblockdownload" => false}} -> true
|
||||
|
||||
_ ->
|
||||
RPC.notify_on_ready(:rpc)
|
||||
end
|
||||
end
|
||||
|
||||
defp send_txn(txn, count) do
|
||||
# IO.puts("Forwarding transaction to websocket clients")
|
||||
case Jason.encode(%{type: "txn", txn: txn, count: count}) do
|
||||
{:ok, payload} ->
|
||||
Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) ->
|
||||
for {pid, _} <- entries do
|
||||
Process.send(pid, payload, [])
|
||||
end
|
||||
end)
|
||||
{:error, reason} -> IO.puts("Error json encoding transaction: #{reason}");
|
||||
end
|
||||
end
|
||||
|
||||
defp send_drop_tx(txid, count) do
|
||||
case Jason.encode(%{type: "drop", txid: txid, count: count}) do
|
||||
{:ok, payload} ->
|
||||
Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) ->
|
||||
for {pid, _} <- entries do
|
||||
Process.send(pid, payload, []);
|
||||
end
|
||||
end)
|
||||
{:error, reason} -> IO.puts("Error json encoding drop message: #{reason}");
|
||||
end
|
||||
end
|
||||
|
||||
defp loop(socket) do
|
||||
with {:ok, message} <- :chumak.recv_multipart(socket),
|
||||
[_topic, <<hash::binary-size(32), type::binary-size(1), seq::little-size(64)>>, <<_sequence::little-size(32)>>] <- message,
|
||||
txid <- Base.encode16(hash, case: :lower),
|
||||
event <- to_charlist(type) do
|
||||
# IO.puts("loop #{sequence}");
|
||||
case event do
|
||||
# Transaction added to mempool
|
||||
'A' ->
|
||||
case Mempool.register(:mempool, txid, seq, true) do
|
||||
false -> false
|
||||
|
||||
{ txn, count } ->
|
||||
# IO.puts("*SEQ* #{txid}");
|
||||
send_txn(txn, count)
|
||||
end
|
||||
|
||||
# Transaction removed from mempool for non block inclusion reason
|
||||
'R' ->
|
||||
case Mempool.drop(:mempool, txid) do
|
||||
count when is_integer(count) ->
|
||||
send_drop_tx(txid, count);
|
||||
|
||||
_ ->
|
||||
true
|
||||
end
|
||||
|
||||
# Don't care about other events
|
||||
_ -> true
|
||||
end
|
||||
else
|
||||
err -> IO.inspect(err)
|
||||
_ -> false
|
||||
end
|
||||
loop(socket)
|
||||
end
|
||||
|
||||
end
|
121
server/lib/bridge_tx.ex
Normal file
121
server/lib/bridge_tx.ex
Normal file
@ -0,0 +1,121 @@
|
||||
defmodule BitcoinStream.Bridge.Tx do
|
||||
@moduledoc """
|
||||
Bitcoin event bridge module.
|
||||
|
||||
Consumes a source of ZMQ bitcoin tx events
|
||||
and forwards to connected websocket clients
|
||||
"""
|
||||
use GenServer
|
||||
|
||||
alias BitcoinStream.Protocol.Transaction, as: BitcoinTx
|
||||
alias BitcoinStream.Mempool, as: Mempool
|
||||
alias BitcoinStream.RPC, as: RPC
|
||||
|
||||
def child_spec(host: host, port: port) do
|
||||
%{
|
||||
id: BitcoinStream.Bridge.Tx,
|
||||
start: {BitcoinStream.Bridge.Tx, :start_link, [host, port]}
|
||||
}
|
||||
end
|
||||
|
||||
def start_link(host, port) do
|
||||
IO.puts("Starting Bitcoin Tx bridge on #{host} port #{port}")
|
||||
GenServer.start_link(__MODULE__, {host, port})
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init({host, port}) do
|
||||
{:ok, {host, port}, {:continue, :connect}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_continue(:connect, {host, port}) do
|
||||
connect(host, port);
|
||||
{:noreply, {host, port}}
|
||||
end
|
||||
|
||||
defp connect(host, port) do
|
||||
# check rpc online & synced
|
||||
IO.puts("Waiting for node to come online and fully sync before connecting to tx socket");
|
||||
wait_for_ibd();
|
||||
IO.puts("Node is fully synced, connecting to tx socket");
|
||||
|
||||
# connect to socket
|
||||
{:ok, socket} = :chumak.socket(:sub);
|
||||
IO.puts("Connected tx zmq socket on #{host} port #{port}");
|
||||
:chumak.subscribe(socket, 'rawtx')
|
||||
IO.puts("Subscribed to rawtx events")
|
||||
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
|
||||
{:ok, pid} -> IO.puts("Binding ok to tx socket pid #{inspect pid}");
|
||||
{:error, reason} -> IO.puts("Binding tx socket failed: #{reason}");
|
||||
_ -> IO.puts("???");
|
||||
end
|
||||
|
||||
# start tx loop
|
||||
loop(socket, 0)
|
||||
end
|
||||
|
||||
defp wait_for_ibd() do
|
||||
case RPC.get_node_status(:rpc) do
|
||||
{:ok, %{"initialblockdownload" => false}} -> true
|
||||
|
||||
_ ->
|
||||
RPC.notify_on_ready(:rpc)
|
||||
end
|
||||
end
|
||||
|
||||
defp send_txn(txn, count) do
|
||||
# IO.puts("Forwarding transaction to websocket clients")
|
||||
case Jason.encode(%{type: "txn", txn: txn, count: count}) do
|
||||
{:ok, payload} ->
|
||||
Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) ->
|
||||
for {pid, _} <- entries do
|
||||
Process.send(pid, payload, [])
|
||||
end
|
||||
end)
|
||||
{:error, reason} -> IO.puts("Error json encoding transaction: #{reason}");
|
||||
end
|
||||
end
|
||||
|
||||
defp process(payload) do
|
||||
case BitcoinTx.decode(payload) do
|
||||
{:ok, txn} ->
|
||||
case Mempool.get_tx_status(:mempool, txn.id) do
|
||||
# :registered and :new transactions are inflated and inserted into the mempool
|
||||
status when (status in [:registered, :new]) ->
|
||||
inflated_txn = BitcoinTx.inflate(txn);
|
||||
case Mempool.insert(:mempool, txn.id, inflated_txn) do
|
||||
# Mempool.insert returns the size of the mempool if insertion was successful
|
||||
# Forward tx to clients in this case
|
||||
count when is_integer(count) -> send_txn(inflated_txn, count)
|
||||
|
||||
_ -> false
|
||||
end
|
||||
|
||||
# other statuses indicate duplicate or dropped transaction
|
||||
_ -> false
|
||||
end
|
||||
|
||||
{:err, reason} ->
|
||||
IO.puts("Error decoding tx: #{reason}");
|
||||
false
|
||||
|
||||
error ->
|
||||
IO.puts("Error decoding tx");
|
||||
IO.inspect(error);
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
defp loop(socket, seq) do
|
||||
with {:ok, message} <- :chumak.recv_multipart(socket),
|
||||
[_topic, payload, <<sequence::little-size(32)>>] <- message,
|
||||
true <- (seq != sequence) do
|
||||
Task.start(fn -> process(payload) end);
|
||||
loop(socket, sequence)
|
||||
else
|
||||
_ -> loop(socket, seq)
|
||||
end
|
||||
end
|
||||
|
||||
end
|
@ -1,6 +1,6 @@
|
||||
defmodule BitcoinStream.Mempool do
|
||||
@moduledoc """
|
||||
Agent for retrieving and maintaining mempool info
|
||||
GenServer for retrieving and maintaining mempool info
|
||||
Used for tracking mempool count, and maintaining an :ets cache of transaction prevouts
|
||||
|
||||
Transaction lifecycle:
|
||||
@ -12,7 +12,7 @@ defmodule BitcoinStream.Mempool do
|
||||
ZMQ 'A' and 'R' messages are guaranteed to arrive in order relative to each other
|
||||
but rawtx and rawblock messages may arrive in any order
|
||||
"""
|
||||
use Agent
|
||||
use GenServer
|
||||
|
||||
alias BitcoinStream.Protocol.Transaction, as: BitcoinTx
|
||||
alias BitcoinStream.RPC, as: RPC
|
||||
@ -22,7 +22,7 @@ defmodule BitcoinStream.Mempool do
|
||||
connecting to a bitcoin node at RPC `host:port` for ground truth data
|
||||
"""
|
||||
def start_link(opts) do
|
||||
IO.puts("Starting mempool agent");
|
||||
IO.puts("Starting mempool tracker");
|
||||
# cache of all transactions in the node mempool, mapped to {inputs, total_input_value}
|
||||
:ets.new(:mempool_cache, [:set, :public, :named_table]);
|
||||
# cache of transactions ids in the mempool, but not yet synchronized with the :mempool_cache
|
||||
@ -30,50 +30,113 @@ defmodule BitcoinStream.Mempool do
|
||||
# cache of transaction ids included in the last block
|
||||
# used to avoid allowing confirmed transactions back into the mempool if rawtx events arrive late
|
||||
:ets.new(:block_cache, [:set, :public, :named_table]);
|
||||
Agent.start_link(fn -> %{count: 0, seq: :infinity, queue: [], done: false} end, opts)
|
||||
|
||||
# state: {count, sequence_number, queue, done}
|
||||
GenServer.start_link(__MODULE__, {0, :infinity, [], false}, opts)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(state) do
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:get_count, _from, {count, seq, queue, done}) do
|
||||
{:reply, count, {count, seq, queue, done}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:set_count, n}, _from, {_count, seq, queue, done}) do
|
||||
{:reply, :ok, {n, seq, queue, done}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:increment_count, _from, {count, seq, queue, done}) do
|
||||
{:reply, :ok, {count + 1, seq, queue, done}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:decrement_count, _from, {count, seq, queue, done}) do
|
||||
{:reply, :ok, {count - 1, seq, queue, done}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:get_seq, _from, {count, seq, queue, done}) do
|
||||
{:reply, seq, {count, seq, queue, done}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:set_seq, seq}, _from, {count, _seq, queue, done}) do
|
||||
{:reply, :ok, {count, seq, queue, done}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:get_queue, _from, {count, seq, queue, done}) do
|
||||
{:reply, queue, {count, seq, queue, done}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:set_queue, queue}, _from, {count, seq, _queue, done}) do
|
||||
{:reply, :ok, {count, seq, queue, done}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:enqueue, txid}, _from, {count, seq, queue, done}) do
|
||||
{:reply, :ok, {count, seq, [txid | queue], done}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:is_done, _from, {count, seq, queue, done}) do
|
||||
{:reply, done, {count, seq, queue, done}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call(:set_done, _from, {count, seq, queue, _done}) do
|
||||
{:reply, :ok, {count, seq, queue, true}}
|
||||
end
|
||||
|
||||
def set(pid, n) do
|
||||
Agent.update(pid, &Map.update(&1, :count, 0, fn(_) -> n end))
|
||||
GenServer.call(pid, {:set_count, n})
|
||||
end
|
||||
|
||||
def get(pid) do
|
||||
Agent.get(pid, &Map.get(&1, :count))
|
||||
GenServer.call(pid, :get_count)
|
||||
end
|
||||
|
||||
defp increment(pid) do
|
||||
Agent.update(pid, &Map.update(&1, :count, 0, fn(x) -> x + 1 end));
|
||||
GenServer.call(pid, :increment_count)
|
||||
end
|
||||
|
||||
defp decrement(pid) do
|
||||
Agent.update(pid, &Map.update(&1, :count, 0, fn(x) -> x - 1 end));
|
||||
GenServer.call(pid, :decrement_count)
|
||||
end
|
||||
|
||||
defp get_seq(pid) do
|
||||
Agent.get(pid, &Map.get(&1, :seq))
|
||||
GenServer.call(pid, :get_seq)
|
||||
end
|
||||
|
||||
defp set_seq(pid, seq) do
|
||||
Agent.update(pid, &Map.put(&1, :seq, seq))
|
||||
GenServer.call(pid, {:set_seq, seq})
|
||||
end
|
||||
|
||||
defp get_queue(pid) do
|
||||
Agent.get(pid, &Map.get(&1, :queue))
|
||||
GenServer.call(pid, :get_queue)
|
||||
end
|
||||
|
||||
defp set_queue(pid, queue) do
|
||||
Agent.update(pid, &Map.put(&1, :queue, queue))
|
||||
GenServer.call(pid, {:set_queue, queue})
|
||||
end
|
||||
|
||||
defp enqueue(pid, txid) do
|
||||
Agent.update(pid, &Map.update(&1, :queue, [], fn(q) -> [txid | q] end))
|
||||
GenServer.call(pid, {:enqueue, txid})
|
||||
end
|
||||
|
||||
def is_done(pid) do
|
||||
Agent.get(pid, &Map.get(&1, :done))
|
||||
GenServer.call(pid, :is_done)
|
||||
end
|
||||
|
||||
defp set_done(pid) do
|
||||
Agent.update(pid, &Map.put(&1, :done, true))
|
||||
GenServer.call(pid, :set_done)
|
||||
end
|
||||
|
||||
def get_tx_status(_pid, txid) do
|
||||
@ -257,8 +320,12 @@ defmodule BitcoinStream.Mempool do
|
||||
|
||||
IO.puts("Loaded #{count} mempool transactions");
|
||||
send_mempool_count(pid);
|
||||
do_sync(pid, txns)
|
||||
do_sync(pid, txns);
|
||||
:ok
|
||||
else
|
||||
{:error, :timeout} ->
|
||||
IO.puts("Pool sync timed out");
|
||||
|
||||
{:error, reason} ->
|
||||
IO.puts("Pool sync failed");
|
||||
IO.inspect(reason)
|
||||
@ -275,7 +342,7 @@ defmodule BitcoinStream.Mempool do
|
||||
sync_mempool(pid, txns);
|
||||
IO.puts("MEMPOOL SYNC FINISHED");
|
||||
set_done(pid);
|
||||
IO.inspect(is_done(pid))
|
||||
:ok
|
||||
end
|
||||
|
||||
defp sync_mempool(pid, txns) do
|
||||
|
13
server/lib/mempool_sync.ex
Normal file
13
server/lib/mempool_sync.ex
Normal file
@ -0,0 +1,13 @@
|
||||
defmodule BitcoinStream.Mempool.Sync do
|
||||
use Task, restart: :transient
|
||||
|
||||
alias BitcoinStream.Mempool, as: Mempool
|
||||
|
||||
def start_link(args) do
|
||||
Task.start_link(__MODULE__, :run, args)
|
||||
end
|
||||
|
||||
def run(_args) do
|
||||
Mempool.sync(:mempool)
|
||||
end
|
||||
end
|
@ -1,6 +1,8 @@
|
||||
defmodule BitcoinStream.Router do
|
||||
use Plug.Router
|
||||
|
||||
alias BitcoinStream.BlockData, as: BlockData
|
||||
|
||||
plug Corsica, origins: "*", allow_headers: :all
|
||||
plug Plug.Static,
|
||||
at: "/",
|
||||
@ -27,10 +29,10 @@ defmodule BitcoinStream.Router do
|
||||
end
|
||||
|
||||
defp get_block(last_seen) do
|
||||
last_id = GenServer.call(:block_data, :block_id);
|
||||
last_id = BlockData.get_block_id(:block_data);
|
||||
cond do
|
||||
(last_seen == last_id) ->
|
||||
payload = GenServer.call(:block_data, :json_block);
|
||||
payload = BlockData.get_json_block(:block_data);
|
||||
{:ok, payload}
|
||||
true -> :err
|
||||
end
|
||||
|
@ -10,8 +10,11 @@ defmodule BitcoinStream.Server do
|
||||
btc_host = System.get_env("BITCOIN_HOST");
|
||||
|
||||
children = [
|
||||
Registry.child_spec(
|
||||
keys: :duplicate,
|
||||
name: Registry.BitcoinStream
|
||||
),
|
||||
{ BitcoinStream.RPC, [host: btc_host, port: rpc_port, name: :rpc] },
|
||||
{ BitcoinStream.Mempool, [name: :mempool] },
|
||||
{ BitcoinStream.BlockData, [name: :block_data] },
|
||||
BitcoinStream.Metrics.Probe,
|
||||
Plug.Cowboy.child_spec(
|
||||
@ -22,11 +25,21 @@ defmodule BitcoinStream.Server do
|
||||
port: socket_port
|
||||
]
|
||||
),
|
||||
Registry.child_spec(
|
||||
keys: :duplicate,
|
||||
name: Registry.BitcoinStream
|
||||
),
|
||||
BitcoinStream.Bridge.child_spec(host: btc_host, tx_port: zmq_tx_port, block_port: zmq_block_port, sequence_port: zmq_sequence_port)
|
||||
%{
|
||||
id: BitcoinStream.Bridge,
|
||||
start: {Supervisor, :start_link, [
|
||||
[
|
||||
{ BitcoinStream.Mempool, [name: :mempool] },
|
||||
{ BitcoinStream.Mempool.Sync, [name: :mempool_sync] },
|
||||
BitcoinStream.Bridge.Tx.child_spec(host: btc_host, port: zmq_tx_port),
|
||||
BitcoinStream.Bridge.Block.child_spec(host: btc_host, port: zmq_block_port),
|
||||
BitcoinStream.Bridge.Sequence.child_spec(host: btc_host, port: zmq_sequence_port),
|
||||
],
|
||||
[strategy: :one_for_all]
|
||||
]},
|
||||
type: :supervisor,
|
||||
restart: :permanent
|
||||
}
|
||||
]
|
||||
|
||||
opts = [strategy: :one_for_one, name: BitcoinStream.Application]
|
||||
|
@ -20,14 +20,14 @@ defmodule BitcoinStream.SocketHandler do
|
||||
|
||||
def get_block(last_seen) do
|
||||
IO.puts("getting block with id #{last_seen}")
|
||||
last_id = GenServer.call(:block_data, :block_id)
|
||||
last_id = BlockData.get_block_id(:block_data)
|
||||
IO.puts("last block id: #{last_id}")
|
||||
cond do
|
||||
(last_seen == nil) ->
|
||||
payload = GenServer.call(:block_data, :json_block);
|
||||
payload = BlockData.get_json_block(:block_data);
|
||||
{:ok, payload}
|
||||
(last_seen != last_id) ->
|
||||
payload = GenServer.call(:block_data, :json_block);
|
||||
payload = BlockData.get_json_block(:block_data);
|
||||
{:ok, payload}
|
||||
true ->
|
||||
{:ok, '{"type": "block", "block": {}}'}
|
||||
@ -40,7 +40,7 @@ defmodule BitcoinStream.SocketHandler do
|
||||
end
|
||||
|
||||
def get_block_id_msg() do
|
||||
last_id = GenServer.call(:block_data, :block_id);
|
||||
last_id = BlockData.get_block_id(:block_data);
|
||||
"{ \"type\": \"block_id\", \"block_id\": \"#{last_id}\"}"
|
||||
end
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user