Use Logger

This commit is contained in:
Mononaut 2022-03-03 18:13:46 -06:00
parent 0c48d0b233
commit 48a3204055
13 changed files with 132 additions and 97 deletions

View File

@ -17,6 +17,7 @@ RUN mix do deps.compile
COPY lib ./lib COPY lib ./lib
COPY log ./log COPY log ./log
COPY config ./config
ENV MIX_ENV prod ENV MIX_ENV prod
ENV RELEASE_NODE bitfeed ENV RELEASE_NODE bitfeed

View File

@ -1,3 +1,5 @@
require Logger
defmodule BitcoinStream.RPC do defmodule BitcoinStream.RPC do
@moduledoc """ @moduledoc """
GenServer for bitcoin rpc requests GenServer for bitcoin rpc requests
@ -9,7 +11,7 @@ defmodule BitcoinStream.RPC do
def start_link(opts) do def start_link(opts) do
{port, opts} = Keyword.pop(opts, :port); {port, opts} = Keyword.pop(opts, :port);
{host, opts} = Keyword.pop(opts, :host); {host, opts} = Keyword.pop(opts, :host);
IO.puts("Starting Bitcoin RPC server on #{host} port #{port}") Logger.info("Starting Bitcoin RPC server on #{host} port #{port}")
GenServer.start_link(__MODULE__, {host, port, nil, nil, [], %{}}, opts) GenServer.start_link(__MODULE__, {host, port, nil, nil, [], %{}}, opts)
end end
@ -37,7 +39,7 @@ defmodule BitcoinStream.RPC do
{:noreply, {host, port, status, creds, listeners, Map.put(inflight, task_ref, :status)}} {:noreply, {host, port, status, creds, listeners, Map.put(inflight, task_ref, :status)}}
:error -> :error ->
IO.puts("Waiting to connect to Bitcoin Core"); Logger.info("Waiting to connect to Bitcoin Core");
Process.send_after(self(), :check_status, 10 * 1000); Process.send_after(self(), :check_status, 10 * 1000);
{:noreply, {host, port, status, creds, listeners, inflight}} {:noreply, {host, port, status, creds, listeners, inflight}}
end end
@ -45,13 +47,15 @@ defmodule BitcoinStream.RPC do
@impl true @impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, {host, port, status, creds, listeners, inflight}) do def handle_info({:DOWN, ref, :process, _pid, _reason}, {host, port, status, creds, listeners, inflight}) do
# IO.puts("DOWN: #{inspect pid} #{inspect reason}")
{_, inflight} = Map.pop(inflight, ref); {_, inflight} = Map.pop(inflight, ref);
{:noreply, {host, port, status, creds, listeners, inflight}} {:noreply, {host, port, status, creds, listeners, inflight}}
end end
@impl true @impl true
def handle_info({ref, result}, {host, port, status, creds, listeners, inflight}) do def handle_info({ref, result}, {host, port, status, creds, listeners, inflight}) do
if Enum.count(inflight) > 2 do
Logger.debug("#{Enum.count(inflight)} rpc requests inflight");
end
case Map.pop(inflight, ref) do case Map.pop(inflight, ref) do
{nil, inflight} -> {nil, inflight} ->
{:noreply, {host, port, status, creds, listeners, inflight}} {:noreply, {host, port, status, creds, listeners, inflight}}
@ -61,18 +65,18 @@ defmodule BitcoinStream.RPC do
# if node is connected and finished with the initial block download # if node is connected and finished with the initial block download
{:ok, 200, %{"initialblockdownload" => false}} -> {:ok, 200, %{"initialblockdownload" => false}} ->
# notify all listening processes # notify all listening processes
IO.puts("Bitcoin Core connected and synced"); Logger.info("Bitcoin Core connected and synced");
notify_listeners(listeners); notify_listeners(listeners);
Process.send_after(self(), :check_status, 300 * 1000); Process.send_after(self(), :check_status, 300 * 1000);
{:noreply, {host, port, :ok, creds, [], inflight}} {:noreply, {host, port, :ok, creds, [], inflight}}
{:ok, 200, %{"initialblockdownload" => true}} -> {:ok, 200, %{"initialblockdownload" => true}} ->
IO.puts("Bitcoin Core connected, waiting for initial block download"); Logger.info("Bitcoin Core connected, waiting for initial block download");
Process.send_after(self(), :check_status, 30 * 1000); Process.send_after(self(), :check_status, 30 * 1000);
{:noreply, {host, port, :ibd, creds, listeners, inflight}} {:noreply, {host, port, :ibd, creds, listeners, inflight}}
_ -> _ ->
IO.puts("Waiting to connect to Bitcoin Core"); Logger.info("Waiting to connect to Bitcoin Core");
Process.send_after(self(), :check_status, 10 * 1000); Process.send_after(self(), :check_status, 10 * 1000);
{:noreply, {host, port, :disconnected, creds, listeners, inflight}} {:noreply, {host, port, :disconnected, creds, listeners, inflight}}
end end
@ -118,15 +122,15 @@ defmodule BitcoinStream.RPC do
{:ok, status, info} {:ok, status, info}
else else
{:ok, status, _} -> {:ok, status, _} ->
IO.puts("RPC request #{method} failed with HTTP code #{status}") Logger.error("RPC request #{method} failed with HTTP code #{status}")
{:error, status} {:error, status}
{:error, reason} -> {:error, reason} ->
IO.puts("RPC request #{method} failed"); Logger.error("RPC request #{method} failed");
IO.inspect(reason) Logger.error("#{inspect(reason)}");
{:error, reason} {:error, reason}
err -> err ->
IO.puts("RPC request #{method} failed: (unknown reason)"); Logger.error("RPC request #{method} failed: (unknown reason)");
IO.inspect(err); Logger.error("#{inspect(err)}");
{:error, err} {:error, err}
end end
end end
@ -134,8 +138,8 @@ defmodule BitcoinStream.RPC do
{:ok, task.ref} {:ok, task.ref}
else else
err -> err ->
IO.puts("failed to make RPC request"); Logger.error("failed to make RPC request");
IO.inspect(err); Logger.error("#{inspect(err)}");
:error :error
end end
end end
@ -171,16 +175,16 @@ defmodule BitcoinStream.RPC do
{ user, pw } { user, pw }
else else
{:error, reason} -> {:error, reason} ->
IO.puts("Failed to load bitcoin rpc cookie"); Logger.error("Failed to load bitcoin rpc cookie");
IO.inspect(reason) Logger.error("#{inspect(reason)}")
:error :error
err -> err ->
IO.puts("Failed to load bitcoin rpc cookie: (unknown reason)"); Logger.error("Failed to load bitcoin rpc cookie: (unknown reason)");
IO.inspect(err); Logger.error("#{inspect(err)}")
:error :error
end end
true -> true ->
IO.puts("Missing bitcoin rpc credentials"); Logger.error("Missing bitcoin rpc credentials");
:error :error
end end
end end

View File

@ -1,5 +1,7 @@
Application.ensure_all_started(BitcoinStream.RPC) Application.ensure_all_started(BitcoinStream.RPC)
require Logger
defmodule BitcoinStream.BlockData do defmodule BitcoinStream.BlockData do
@moduledoc """ @moduledoc """
Block data module. Block data module.
@ -9,7 +11,7 @@ defmodule BitcoinStream.BlockData do
use GenServer use GenServer
def start_link(opts) do def start_link(opts) do
IO.puts("Starting block data link") Logger.info("Starting block data link");
# load block # load block
with {:ok, json} <- File.read("data/last_block.json"), with {:ok, json} <- File.read("data/last_block.json"),

View File

@ -1,3 +1,5 @@
require Logger
defmodule BitcoinStream.Bridge.Block do defmodule BitcoinStream.Bridge.Block do
@moduledoc """ @moduledoc """
Bitcoin event bridge module. Bitcoin event bridge module.
@ -20,7 +22,7 @@ defmodule BitcoinStream.Bridge.Block do
end end
def start_link(host, port) do def start_link(host, port) do
IO.puts("Starting Bitcoin Block bridge on #{host} port #{port}") Logger.info("Starting Bitcoin Block bridge on #{host} port #{port}");
GenServer.start_link(__MODULE__, {host, port}) GenServer.start_link(__MODULE__, {host, port})
end end
@ -38,17 +40,17 @@ defmodule BitcoinStream.Bridge.Block do
defp connect(host, port) do defp connect(host, port) do
# check rpc online & synced # check rpc online & synced
wait_for_ibd(); wait_for_ibd();
IO.puts("Node ready, connecting to block socket"); Logger.info("Node ready, connecting to block socket");
# connect to socket # connect to socket
{:ok, socket} = :chumak.socket(:sub); {:ok, socket} = :chumak.socket(:sub);
IO.puts("Connected block zmq socket on #{host} port #{port}"); Logger.info("Connected block zmq socket on #{host} port #{port}");
:chumak.subscribe(socket, 'rawblock') :chumak.subscribe(socket, 'rawblock')
IO.puts("Subscribed to rawblock events") Logger.debug("Subscribed to rawblock events")
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
{:ok, pid} -> IO.puts("Binding ok to block socket pid #{inspect pid}"); {:ok, pid} -> Logger.debug("Binding ok to block socket pid #{inspect pid}");
{:error, reason} -> IO.puts("Binding block socket failed: #{reason}"); {:error, reason} -> Logger.error("Binding block socket failed: #{reason}");
_ -> IO.puts("???"); _ -> Logger.debug("???");
end end
# start block loop # start block loop
@ -60,7 +62,7 @@ defmodule BitcoinStream.Bridge.Block do
:ok -> true :ok -> true
_ -> _ ->
IO.puts("Waiting for node to come online and fully sync before connecting to block socket"); Logger.info("Waiting for node to come online and fully sync before connecting to block socket");
RPC.notify_on_ready(:rpc) RPC.notify_on_ready(:rpc)
end end
end end
@ -70,27 +72,27 @@ defmodule BitcoinStream.Bridge.Block do
{:ok, payload} -> {:ok, payload} ->
Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) -> Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) ->
for {pid, _} <- entries do for {pid, _} <- entries do
IO.puts("Forwarding to pid #{inspect pid}") Logger.debug("Forwarding to pid #{inspect pid}")
Process.send(pid, payload, []); Process.send(pid, payload, []);
end end
end) end)
{:error, reason} -> IO.puts("Error json encoding block: #{reason}"); {:error, reason} -> Logger.error("Error json encoding block: #{reason}");
end end
end end
defp loop(socket, seq) do defp loop(socket, seq) do
IO.puts("waiting for block"); Logger.debug("waiting for block");
with {:ok, message} <- :chumak.recv_multipart(socket), # wait for the next zmq message in the queue with {:ok, message} <- :chumak.recv_multipart(socket), # wait for the next zmq message in the queue
[_topic, payload, <<sequence::little-size(32)>>] <- message, [_topic, payload, <<sequence::little-size(32)>>] <- message,
true <- (seq != sequence), # discard contiguous duplicate messages true <- (seq != sequence), # discard contiguous duplicate messages
_ <- IO.puts("block received"), _ <- Logger.info("block received"),
_ <- Mempool.set_block_locked(:mempool, true), _ <- Mempool.set_block_locked(:mempool, true),
{:ok, block} <- BitcoinBlock.decode(payload), {:ok, block} <- BitcoinBlock.decode(payload),
count <- Mempool.clear_block_txs(:mempool, block), count <- Mempool.clear_block_txs(:mempool, block),
_ <- Mempool.set_block_locked(:mempool, false), _ <- Mempool.set_block_locked(:mempool, false),
{:ok, json} <- Jason.encode(block), {:ok, json} <- Jason.encode(block),
:ok <- File.write("data/last_block.json", json) do :ok <- File.write("data/last_block.json", json) do
IO.puts("processed block #{block.id}"); Logger.info("processed block #{block.id}");
BlockData.set_json_block(:block_data, block.id, json); BlockData.set_json_block(:block_data, block.id, json);
send_block(block, count); send_block(block, count);
loop(socket, sequence) loop(socket, sequence)

View File

@ -1,3 +1,5 @@
require Logger
defmodule BitcoinStream.Bridge.Sequence do defmodule BitcoinStream.Bridge.Sequence do
@moduledoc """ @moduledoc """
Bitcoin event bridge module. Bitcoin event bridge module.
@ -18,7 +20,7 @@ defmodule BitcoinStream.Bridge.Sequence do
end end
def start_link(host, port) do def start_link(host, port) do
IO.puts("Starting Bitcoin Sequence bridge on #{host} port #{port}") Logger.info("Starting Bitcoin Sequence bridge on #{host} port #{port}")
GenServer.start_link(__MODULE__, {host, port}) GenServer.start_link(__MODULE__, {host, port})
end end
@ -36,17 +38,17 @@ defmodule BitcoinStream.Bridge.Sequence do
defp connect(host, port) do defp connect(host, port) do
# check rpc online & synced # check rpc online & synced
wait_for_ibd(); wait_for_ibd();
IO.puts("Node ready, connecting to sequence socket"); Logger.info("Node ready, connecting to sequence socket");
# connect to socket # connect to socket
{:ok, socket} = :chumak.socket(:sub); {:ok, socket} = :chumak.socket(:sub);
IO.puts("Connected sequence zmq socket on #{host} port #{port}"); Logger.info("Connected sequence zmq socket on #{host} port #{port}");
:chumak.subscribe(socket, 'sequence') :chumak.subscribe(socket, 'sequence')
IO.puts("Subscribed to sequence events") Logger.debug("Subscribed to sequence events")
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
{:ok, pid} -> IO.puts("Binding ok to sequence socket pid #{inspect pid}"); {:ok, pid} -> Logger.debug("Binding ok to sequence socket pid #{inspect pid}");
{:error, reason} -> IO.puts("Binding sequence socket failed: #{reason}"); {:error, reason} -> Logger.error("Binding sequence socket failed: #{reason}");
_ -> IO.puts("???"); _ -> Logger.info("???");
end end
# start tx loop # start tx loop
@ -58,13 +60,13 @@ defmodule BitcoinStream.Bridge.Sequence do
:ok -> true :ok -> true
_ -> _ ->
IO.puts("Waiting for node to come online and fully sync before connecting to sequence socket"); Logger.info("Waiting for node to come online and fully sync before connecting to sequence socket");
RPC.notify_on_ready(:rpc) RPC.notify_on_ready(:rpc)
end end
end end
defp send_txn(txn, count) do defp send_txn(txn, count) do
# IO.puts("Forwarding transaction to websocket clients") # Logger.info("Forwarding transaction to websocket clients")
case Jason.encode(%{type: "txn", txn: txn, count: count}) do case Jason.encode(%{type: "txn", txn: txn, count: count}) do
{:ok, payload} -> {:ok, payload} ->
Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) -> Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) ->
@ -72,7 +74,7 @@ defmodule BitcoinStream.Bridge.Sequence do
Process.send(pid, payload, []) Process.send(pid, payload, [])
end end
end) end)
{:error, reason} -> IO.puts("Error json encoding transaction: #{reason}"); {:error, reason} -> Logger.error("Error json encoding transaction: #{reason}");
end end
end end
@ -84,7 +86,7 @@ defmodule BitcoinStream.Bridge.Sequence do
Process.send(pid, payload, []); Process.send(pid, payload, []);
end end
end) end)
{:error, reason} -> IO.puts("Error json encoding drop message: #{reason}"); {:error, reason} -> Logger.error("Error json encoding drop message: #{reason}");
end end
end end
@ -93,7 +95,6 @@ defmodule BitcoinStream.Bridge.Sequence do
[_topic, <<hash::binary-size(32), type::binary-size(1), seq::little-size(64)>>, <<_sequence::little-size(32)>>] <- message, [_topic, <<hash::binary-size(32), type::binary-size(1), seq::little-size(64)>>, <<_sequence::little-size(32)>>] <- message,
txid <- Base.encode16(hash, case: :lower), txid <- Base.encode16(hash, case: :lower),
event <- to_charlist(type) do event <- to_charlist(type) do
# IO.puts("loop #{sequence}");
case event do case event do
# Transaction added to mempool # Transaction added to mempool
'A' -> 'A' ->
@ -101,7 +102,6 @@ defmodule BitcoinStream.Bridge.Sequence do
false -> false false -> false
{ txn, count } -> { txn, count } ->
# IO.puts("*SEQ* #{txid}");
send_txn(txn, count) send_txn(txn, count)
end end

View File

@ -1,3 +1,5 @@
require Logger
defmodule BitcoinStream.Bridge.Tx do defmodule BitcoinStream.Bridge.Tx do
@moduledoc """ @moduledoc """
Bitcoin event bridge module. Bitcoin event bridge module.
@ -19,7 +21,7 @@ defmodule BitcoinStream.Bridge.Tx do
end end
def start_link(host, port) do def start_link(host, port) do
IO.puts("Starting Bitcoin Tx bridge on #{host} port #{port}") Logger.info("Starting Bitcoin Tx bridge on #{host} port #{port}")
GenServer.start_link(__MODULE__, {host, port}) GenServer.start_link(__MODULE__, {host, port})
end end
@ -37,17 +39,17 @@ defmodule BitcoinStream.Bridge.Tx do
defp connect(host, port) do defp connect(host, port) do
# check rpc online & synced # check rpc online & synced
wait_for_ibd(); wait_for_ibd();
IO.puts("Node ready, connecting to tx socket"); Logger.info("Node ready, connecting to tx socket");
# connect to socket # connect to socket
{:ok, socket} = :chumak.socket(:sub); {:ok, socket} = :chumak.socket(:sub);
IO.puts("Connected tx zmq socket on #{host} port #{port}"); Logger.info("Connected tx zmq socket on #{host} port #{port}");
:chumak.subscribe(socket, 'rawtx') :chumak.subscribe(socket, 'rawtx')
IO.puts("Subscribed to rawtx events") Logger.debug("Subscribed to rawtx events")
case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do case :chumak.connect(socket, :tcp, String.to_charlist(host), port) do
{:ok, pid} -> IO.puts("Binding ok to tx socket pid #{inspect pid}"); {:ok, pid} -> Logger.debug("Binding ok to tx socket pid #{inspect pid}");
{:error, reason} -> IO.puts("Binding tx socket failed: #{reason}"); {:error, reason} -> Logger.error("Binding tx socket failed: #{reason}");
_ -> IO.puts("???"); _ -> Logger.info("???");
end end
# start tx loop # start tx loop
@ -59,13 +61,12 @@ defmodule BitcoinStream.Bridge.Tx do
:ok -> true :ok -> true
_ -> _ ->
IO.puts("Waiting for node to come online and fully sync before connecting to tx socket"); Logger.info("Waiting for node to come online and fully sync before connecting to tx socket");
RPC.notify_on_ready(:rpc) RPC.notify_on_ready(:rpc)
end end
end end
defp send_txn(txn, count) do defp send_txn(txn, count) do
# IO.puts("Forwarding transaction to websocket clients")
case Jason.encode(%{type: "txn", txn: txn, count: count}) do case Jason.encode(%{type: "txn", txn: txn, count: count}) do
{:ok, payload} -> {:ok, payload} ->
Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) -> Registry.dispatch(Registry.BitcoinStream, "txs", fn(entries) ->
@ -73,7 +74,7 @@ defmodule BitcoinStream.Bridge.Tx do
Process.send(pid, payload, []) Process.send(pid, payload, [])
end end
end) end)
{:error, reason} -> IO.puts("Error json encoding transaction: #{reason}"); {:error, reason} -> Logger.error("Error json encoding transaction: #{reason}");
end end
end end
@ -97,12 +98,12 @@ defmodule BitcoinStream.Bridge.Tx do
end end
{:err, reason} -> {:err, reason} ->
IO.puts("Error decoding tx: #{reason}"); Logger.error("Error decoding tx: #{reason}");
false false
error -> error ->
IO.puts("Error decoding tx"); Logger.error("Error decoding tx");
IO.inspect(error); Logger.error("#{inspect(error)}");
false false
end end
end end

View File

@ -1,3 +1,5 @@
require Logger
defmodule BitcoinStream.Mempool do defmodule BitcoinStream.Mempool do
@moduledoc """ @moduledoc """
GenServer for retrieving and maintaining mempool info GenServer for retrieving and maintaining mempool info
@ -22,7 +24,7 @@ defmodule BitcoinStream.Mempool do
connecting to a bitcoin node at RPC `host:port` for ground truth data connecting to a bitcoin node at RPC `host:port` for ground truth data
""" """
def start_link(opts) do def start_link(opts) do
IO.puts("Starting Mempool Tracker"); Logger.info("Starting Mempool Tracker");
# cache of all transactions in the node mempool, mapped to {inputs, total_input_value} # cache of all transactions in the node mempool, mapped to {inputs, total_input_value}
:ets.new(:mempool_cache, [:set, :public, :named_table]); :ets.new(:mempool_cache, [:set, :public, :named_table]);
# cache of transactions ids in the mempool, but not yet synchronized with the :mempool_cache # cache of transactions ids in the mempool, but not yet synchronized with the :mempool_cache
@ -311,7 +313,7 @@ defmodule BitcoinStream.Mempool do
Process.send(pid, payload, []); Process.send(pid, payload, []);
end end
end) end)
{:error, reason} -> IO.puts("Error json encoding count: #{reason}"); {:error, reason} -> Logger.error("Error json encoding count: #{reason}");
end end
end end
@ -325,7 +327,7 @@ defmodule BitcoinStream.Mempool do
end end
def sync(pid) do def sync(pid) do
IO.puts("Preparing mempool sync"); Logger.info("Preparing mempool sync");
with {:ok, 200, %{"mempool_sequence" => sequence, "txids" => txns}} <- RPC.request(:rpc, "getrawmempool", [false, true]) do with {:ok, 200, %{"mempool_sequence" => sequence, "txids" => txns}} <- RPC.request(:rpc, "getrawmempool", [false, true]) do
set_seq(pid, sequence); set_seq(pid, sequence);
count = length(txns); count = length(txns);
@ -337,14 +339,14 @@ defmodule BitcoinStream.Mempool do
sync_queue(pid, queue); sync_queue(pid, queue);
set_queue(pid, []); set_queue(pid, []);
IO.puts("Loaded #{count} mempool transactions"); Logger.info("Loaded #{count} mempool transactions");
send_mempool_count(pid); send_mempool_count(pid);
do_sync(pid, txns); do_sync(pid, txns);
:ok :ok
else else
err -> err ->
IO.puts("Pool sync failed"); Logger.error("Pool sync failed");
IO.inspect(err); Logger.error("#{inspect(err)}");
#retry after 30 seconds #retry after 30 seconds
:timer.sleep(10000); :timer.sleep(10000);
sync(pid) sync(pid)
@ -352,9 +354,9 @@ defmodule BitcoinStream.Mempool do
end end
def do_sync(pid, txns) do def do_sync(pid, txns) do
IO.puts("Syncing #{length(txns)} mempool transactions"); Logger.info("Syncing #{length(txns)} mempool transactions");
sync_mempool(pid, txns); sync_mempool(pid, txns);
IO.puts("MEMPOOL SYNC FINISHED"); Logger.info("MEMPOOL SYNC FINISHED");
set_done(pid); set_done(pid);
:ok :ok
end end
@ -374,11 +376,11 @@ defmodule BitcoinStream.Mempool do
if inflated_txn.inflated do if inflated_txn.inflated do
insert(pid, txid, inflated_txn) insert(pid, txid, inflated_txn)
else else
IO.puts("failed to inflate loaded mempool txn #{txid}") Logger.debug("failed to inflate loaded mempool txn #{txid}")
end end
else else
_ -> IO.puts("sync_mempool_txn failed #{txid}") _ -> Logger.debug("sync_mempool_txn failed #{txid}")
end end
[_] -> true [_] -> true
@ -390,7 +392,7 @@ defmodule BitcoinStream.Mempool do
end end
defp sync_mempool_txns(pid, [head | tail], count) do defp sync_mempool_txns(pid, [head | tail], count) do
IO.puts("Syncing mempool tx #{count}/#{count + length(tail) + 1} | #{head}"); Logger.debug("Syncing mempool tx #{count}/#{count + length(tail) + 1} | #{head}");
sync_mempool_txn(pid, head); sync_mempool_txn(pid, head);
sync_mempool_txns(pid, tail, count + 1) sync_mempool_txns(pid, tail, count + 1)
end end

View File

@ -1,3 +1,5 @@
require Logger
defmodule BitcoinStream.Mempool.Sync do defmodule BitcoinStream.Mempool.Sync do
use GenServer use GenServer
@ -14,7 +16,7 @@ defmodule BitcoinStream.Mempool.Sync do
end end
def start_link(args) do def start_link(args) do
IO.puts("Starting Mempool Synchronizer") Logger.info("Starting Mempool Synchronizer")
GenServer.start_link(__MODULE__, args) GenServer.start_link(__MODULE__, args)
end end
@ -48,14 +50,14 @@ defmodule BitcoinStream.Mempool.Sync do
:ok -> true :ok -> true
_ -> _ ->
IO.puts("Waiting for node to come online and fully sync before synchronizing mempool"); Logger.info("Waiting for node to come online and fully sync before synchronizing mempool");
RPC.notify_on_ready(:rpc) RPC.notify_on_ready(:rpc)
end end
end end
defp first_sync() do defp first_sync() do
wait_for_ibd(); wait_for_ibd();
IO.puts("Preparing mempool sync"); Logger.info("Preparing mempool sync");
Mempool.sync(:mempool); Mempool.sync(:mempool);
Process.send_after(self(), :resync, 20 * 1000); Process.send_after(self(), :resync, 20 * 1000);
end end
@ -64,28 +66,28 @@ defmodule BitcoinStream.Mempool.Sync do
wait_for_ibd(); wait_for_ibd();
case Mempool.is_block_locked(:mempool) do case Mempool.is_block_locked(:mempool) do
true -> true ->
IO.puts("Processing block, delay mempool health check by 5 seconds"); Logger.debug("Processing block, delay mempool health check by 5 seconds");
Process.send_after(self(), :resync, 5 * 1000) Process.send_after(self(), :resync, 5 * 1000)
false -> false ->
with {:ok, 200, %{"size" => size}} when is_integer(size) <- RPC.request(:rpc, "getmempoolinfo", []), with {:ok, 200, %{"size" => size}} when is_integer(size) <- RPC.request(:rpc, "getmempoolinfo", []),
count when is_integer(count) <- Mempool.get(:mempool) do count when is_integer(count) <- Mempool.get(:mempool) do
IO.puts("Mempool health check - Core count: #{size} | Bitfeed count: #{count}"); Logger.debug("Mempool health check - Core count: #{size} | Bitfeed count: #{count}");
# if we've diverged from the true count by more than 50 txs, then fix # if we've diverged from the true count by more than 50 txs, then fix
# ensures our count doesn't stray too far due to missed events & unexpected errors. # ensures our count doesn't stray too far due to missed events & unexpected errors.
if (abs(size - count) > 50) do if (abs(size - count) > 50) do
IO.puts("resync"); Logger.debug("resync");
Mempool.set(:mempool, size); Mempool.set(:mempool, size);
newcount = Mempool.get(:mempool); newcount = Mempool.get(:mempool);
IO.puts("updated to #{newcount}"); Logger.debug("updated to #{newcount}");
end end
# next check in 1 minute # next check in 1 minute
Process.send_after(self(), :resync, 60 * 1000) Process.send_after(self(), :resync, 60 * 1000)
else else
err -> err ->
IO.puts("mempool health check failed"); Logger.error("mempool health check failed");
IO.inspect(err); Logger.error("#{inspect(err)}");
#retry in 10 seconds #retry in 10 seconds
Process.send_after(self(), :resync, 10 * 1000) Process.send_after(self(), :resync, 10 * 1000)
end end

View File

@ -1,5 +1,7 @@
Application.ensure_all_started(BitcoinStream.RPC) Application.ensure_all_started(BitcoinStream.RPC)
require Logger
defmodule BitcoinStream.Protocol.Block do defmodule BitcoinStream.Protocol.Block do
@moduledoc """ @moduledoc """
Summarised bitcoin block. Summarised bitcoin block.
@ -49,10 +51,10 @@ def decode(block_binary) do
}} }}
else else
{:error, reason} -> {:error, reason} ->
IO.puts("Error decoding data for BitcoinBlock: #{reason}") Logger.error("Error decoding data for BitcoinBlock: #{reason}")
:error :error
_ -> _ ->
IO.puts("Error decoding data for BitcoinBlock: (unknown reason)") Logger.error("Error decoding data for BitcoinBlock: (unknown reason)")
:error :error
end end
end end
@ -81,7 +83,7 @@ defp summarise_txns([next | rest], summarised, total, fees, do_inflate) do
if do_inflate do if do_inflate do
inflated_txn = BitcoinTx.inflate(extended_txn) inflated_txn = BitcoinTx.inflate(extended_txn)
if (inflated_txn.inflated) do if (inflated_txn.inflated) do
IO.puts("Processing block tx #{length(summarised)}/#{length(summarised) + length(rest) + 1} | #{next.id}"); Logger.debug("Processing block tx #{length(summarised)}/#{length(summarised) + length(rest) + 1} | #{next.id}");
summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, fees + inflated_txn.fee, true) summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, fees + inflated_txn.fee, true)
else else
summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, nil, false) summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, nil, false)

View File

@ -1,5 +1,7 @@
Application.ensure_all_started(BitcoinStream.RPC) Application.ensure_all_started(BitcoinStream.RPC)
require Logger
defmodule BitcoinStream.Protocol.Transaction do defmodule BitcoinStream.Protocol.Transaction do
@moduledoc """ @moduledoc """
Extended bitcoin transaction struct Extended bitcoin transaction struct
@ -142,15 +144,15 @@ defmodule BitcoinStream.Protocol.Transaction do
} } } }
else else
{:ok, 500, reason} -> {:ok, 500, reason} ->
IO.puts("transaction not found #{input.prev_txid}"); Logger.error("transaction not found #{input.prev_txid}");
IO.inspect(reason) Logger.error("#{inspect(reason)}")
{:error, reason} -> {:error, reason} ->
IO.puts("Failed to inflate input:"); Logger.error("Failed to inflate input:");
IO.inspect(reason) Logger.error("#{inspect(reason)}")
:error :error
err -> err ->
IO.puts("Failed to inflate input: (unknown reason)"); Logger.error("Failed to inflate input: (unknown reason)");
IO.inspect(err); Logger.error("#{inspect(err)}")
:error :error
end end
end end
@ -185,7 +187,7 @@ defmodule BitcoinStream.Protocol.Transaction do
{:ok, inputs, total} {:ok, inputs, total}
other -> other ->
IO.inspect(other); Logger.error("#{inspect(other)}");
inflate_inputs(inputs, [], 0) inflate_inputs(inputs, [], 0)
end end
end end

View File

@ -1,3 +1,5 @@
require Logger
defmodule BitcoinStream.Router do defmodule BitcoinStream.Router do
use Plug.Router use Plug.Router
@ -20,7 +22,7 @@ defmodule BitcoinStream.Router do
put_resp_header(conn, "cache-control", "public, max-age=604800, immutable") put_resp_header(conn, "cache-control", "public, max-age=604800, immutable")
|> send_resp(200, block) |> send_resp(200, block)
_ -> _ ->
IO.puts("Error getting block hash"); Logger.debug("Error getting block hash");
send_resp(conn, 404, "Block not available") send_resp(conn, 404, "Block not available")
end end
end end

View File

@ -1,3 +1,5 @@
require Logger
defmodule BitcoinStream.Server do defmodule BitcoinStream.Server do
use Application use Application
@ -9,8 +11,20 @@ defmodule BitcoinStream.Server do
{ rpc_port, "" } = Integer.parse(System.get_env("BITCOIN_RPC_PORT")); { rpc_port, "" } = Integer.parse(System.get_env("BITCOIN_RPC_PORT"));
{ rpc_pools, "" } = Integer.parse(System.get_env("RPC_POOLS")); { rpc_pools, "" } = Integer.parse(System.get_env("RPC_POOLS"));
{ rpc_pool_size, "" } = Integer.parse(System.get_env("RPC_POOL_SIZE")); { rpc_pool_size, "" } = Integer.parse(System.get_env("RPC_POOL_SIZE"));
log_level = System.get_env("LOG_LEVEL");
btc_host = System.get_env("BITCOIN_HOST"); btc_host = System.get_env("BITCOIN_HOST");
case log_level do
"debug" ->
Logger.configure(level: :debug);
"error" ->
Logger.configure(level: :error);
_ ->
Logger.configure(level: :info);
end
children = [ children = [
Registry.child_spec( Registry.child_spec(
keys: :duplicate, keys: :duplicate,

View File

@ -1,3 +1,5 @@
require Logger
defmodule BitcoinStream.SocketHandler do defmodule BitcoinStream.SocketHandler do
@behaviour :cowboy_websocket @behaviour :cowboy_websocket
@ -16,9 +18,9 @@ defmodule BitcoinStream.SocketHandler do
end end
def get_block(last_seen) do def get_block(last_seen) do
IO.puts("getting block with id #{last_seen}") Logger.debug("getting block with id #{last_seen}")
last_id = BlockData.get_block_id(:block_data) last_id = BlockData.get_block_id(:block_data)
IO.puts("last block id: #{last_id}") Logger.debug("last block id: #{last_id}")
cond do cond do
(last_seen == nil) -> (last_seen == nil) ->
payload = BlockData.get_json_block(:block_data); payload = BlockData.get_json_block(:block_data);
@ -42,12 +44,11 @@ defmodule BitcoinStream.SocketHandler do
end end
def websocket_handle({:text, msg}, state) do def websocket_handle({:text, msg}, state) do
# IO.puts("message received: #{msg} | #{inspect self()}");
case msg do case msg do
"hb" -> {:reply, {:text, msg}, state}; "hb" -> {:reply, {:text, msg}, state};
"block" -> "block" ->
IO.puts('block request'); Logger.debug('block request');
{:reply, {:text, "null"}, state} {:reply, {:text, "null"}, state}
"count" -> "count" ->
@ -72,11 +73,11 @@ defmodule BitcoinStream.SocketHandler do
end end
else else
{:error, reason} -> {:error, reason} ->
IO.puts("Failed to parse websocket message"); Logger.error("Failed to parse websocket message");
IO.inspect(reason) Logger.error("#{inspect(reason)}")
reason -> reason ->
IO.puts("other response"); Logger.error("other response");
IO.inspect(reason) Logger.error("#{inspect(reason)}")
_ -> {:reply, {:text, "?"}, state} _ -> {:reply, {:text, "?"}, state}
end end
end end