From 0e6c8159af3bfb1f42e0c72dba1226b8889ff39c Mon Sep 17 00:00:00 2001 From: Mononaut Date: Fri, 4 Mar 2022 16:42:02 -0600 Subject: [PATCH] Handle transaction floods --- client/nginx/bitfeed.conf.template | 4 + server/config/config.exs | 4 + server/lib/bitcoin_rpc.ex | 172 +++++++++++++++++++++++------ server/lib/bridge_block.ex | 19 +++- server/lib/bridge_tx.ex | 5 +- server/lib/mempool.ex | 75 +++++++++++-- server/lib/mempool_sync.ex | 6 +- server/lib/protocol/block.ex | 2 +- server/lib/protocol/transaction.ex | 56 +++++----- 9 files changed, 265 insertions(+), 78 deletions(-) create mode 100644 server/config/config.exs diff --git a/client/nginx/bitfeed.conf.template b/client/nginx/bitfeed.conf.template index 44fbb1b..1668a2f 100644 --- a/client/nginx/bitfeed.conf.template +++ b/client/nginx/bitfeed.conf.template @@ -12,6 +12,10 @@ server { server_name client; + location = / { + add_header Cache-Control 'no-cache'; + } + location / { try_files $uri $uri/ =404; expires $expires; diff --git a/server/config/config.exs b/server/config/config.exs new file mode 100644 index 0000000..021ab6e --- /dev/null +++ b/server/config/config.exs @@ -0,0 +1,4 @@ +import Config + +config :logger, :console, + format: "$time $metadata[$level] $levelpad$message\n" diff --git a/server/lib/bitcoin_rpc.ex b/server/lib/bitcoin_rpc.ex index 03adf11..ed164d1 100644 --- a/server/lib/bitcoin_rpc.ex +++ b/server/lib/bitcoin_rpc.ex @@ -12,16 +12,16 @@ defmodule BitcoinStream.RPC do {port, opts} = Keyword.pop(opts, :port); {host, opts} = Keyword.pop(opts, :host); Logger.info("Starting Bitcoin RPC server on #{host} port #{port}") - GenServer.start_link(__MODULE__, {host, port, nil, nil, [], %{}}, opts) + GenServer.start_link(__MODULE__, {host, port, nil, nil, [], %{}, nil}, opts) end @impl true - def init({host, port, status, _, listeners, inflight}) do + def init({host, port, status, _, listeners, inflight, last_failure}) do # start node monitoring loop creds = rpc_creds(); send(self(), :check_status); - {:ok, {host, port, status, creds, listeners, inflight}} + {:ok, {host, port, status, creds, listeners, inflight, last_failure}} end defp notify_listeners([]) do @@ -32,30 +32,40 @@ defmodule BitcoinStream.RPC do notify_listeners(tail) end + # seconds until cool off period ends + defp remaining_cool_off(now, time) do + 10 - Time.diff(now, time, :second) + end + + defp is_cooling_off(time) do + now = Time.utc_now; + (remaining_cool_off(now, time) > 0) + end + @impl true - def handle_info(:check_status, {host, port, status, creds, listeners, inflight}) do + def handle_info(:check_status, {host, port, status, creds, listeners, inflight, last_failure}) do case single_request("getblockchaininfo", [], host, port, creds) do {:ok, task_ref} -> - {:noreply, {host, port, status, creds, listeners, Map.put(inflight, task_ref, :status)}} + {:noreply, {host, port, status, creds, listeners, Map.put(inflight, task_ref, :status), last_failure}} :error -> Logger.info("Waiting to connect to Bitcoin Core"); Process.send_after(self(), :check_status, 10 * 1000); - {:noreply, {host, port, status, creds, listeners, inflight}} + {:noreply, {host, port, status, creds, listeners, inflight, last_failure}} end end @impl true - def handle_info({:DOWN, ref, :process, _pid, _reason}, {host, port, status, creds, listeners, inflight}) do + def handle_info({:DOWN, ref, :process, _pid, _reason}, {host, port, status, creds, listeners, inflight, last_failure}) do {_, inflight} = Map.pop(inflight, ref); - {:noreply, {host, port, status, creds, listeners, inflight}} + {:noreply, {host, port, status, creds, listeners, inflight, last_failure}} end @impl true - def handle_info({ref, result}, {host, port, status, creds, listeners, inflight}) do + def handle_info({ref, result}, {host, port, status, creds, listeners, inflight, last_failure}) do case Map.pop(inflight, ref) do {nil, inflight} -> - {:noreply, {host, port, status, creds, listeners, inflight}} + {:noreply, {host, port, status, creds, listeners, inflight, last_failure}} {:status, inflight} -> case result do @@ -65,55 +75,88 @@ defmodule BitcoinStream.RPC do Logger.info("Bitcoin Core connected and synced"); notify_listeners(listeners); Process.send_after(self(), :check_status, 300 * 1000); - {:noreply, {host, port, :ok, creds, [], inflight}} + {:noreply, {host, port, :ok, creds, [], inflight, last_failure}} {:ok, 200, %{"initialblockdownload" => true}} -> Logger.info("Bitcoin Core connected, waiting for initial block download"); Process.send_after(self(), :check_status, 30 * 1000); - {:noreply, {host, port, :ibd, creds, listeners, inflight}} + {:noreply, {host, port, :ibd, creds, listeners, inflight, last_failure}} _ -> Logger.info("Waiting to connect to Bitcoin Core"); Process.send_after(self(), :check_status, 10 * 1000); - {:noreply, {host, port, :disconnected, creds, listeners, inflight}} + {:noreply, {host, port, :disconnected, creds, listeners, inflight, last_failure}} end {from, inflight} -> GenServer.reply(from, result) - {:noreply, {host, port, status, creds, listeners, inflight}} + {:noreply, {host, port, status, creds, listeners, inflight, last_failure}} end end @impl true - def handle_call({:request, method, params}, from, {host, port, status, creds, listeners, inflight}) do + def handle_call(:on_rpc_failure, _from, {host, port, status, creds, listeners, inflight, last_failure}) do + if (last_failure != nil and is_cooling_off(last_failure)) do + # don't reset if cooling period is already active + {:reply, :ok, {host, port, status, creds, listeners, inflight, last_failure}} + else + Logger.info("RPC failure, cooling off non-essential requests for 10 seconds"); + {:reply, :ok, {host, port, status, creds, listeners, inflight, Time.utc_now}} + end + end + + @impl true + def handle_call(:on_rpc_success, _from, {host, port, status, creds, listeners, inflight, last_failure}) do + if (last_failure != nil) do + if (is_cooling_off(last_failure)) do + # don't clear an active cooling period + {:reply, :ok, {host, port, status, creds, listeners, inflight, last_failure}} + else + Logger.info("RPC failure resolved, ending cool off period"); + {:reply, :ok, {host, port, status, creds, listeners, inflight, nil}} + end + else + # cool off already cleared + {:reply, :ok, {host, port, status, creds, listeners, inflight, nil}} + end + end + + @impl true + def handle_call({:request, method, params}, from, {host, port, status, creds, listeners, inflight, last_failure}) do case single_request(method, params, host, port, creds) do {:ok, task_ref} -> - {:noreply, {host, port, status, creds, listeners, Map.put(inflight, task_ref, from)}} + {:noreply, {host, port, status, creds, listeners, Map.put(inflight, task_ref, from), last_failure}} :error -> - {:reply, 500, {host, port, status, creds, listeners, inflight}} + {:reply, 500, {host, port, status, creds, listeners, inflight, last_failure}} end end @impl true - def handle_call({:batch_request, method, batch_params}, from, {host, port, status, creds, listeners, inflight}) do - case batch_request(method, batch_params, host, port, creds) do - {:ok, task_ref} -> - {:noreply, {host, port, status, creds, listeners, Map.put(inflight, task_ref, from)}} + def handle_call({:batch_request, method, batch_params, fail_fast}, from, {host, port, status, creds, listeners, inflight, last_failure}) do + # enforce the 10 second cool-off period + if (fail_fast and last_failure != nil and is_cooling_off(last_failure)) do + # Logger.debug("skipping non-essential RPC request during cool-off period: #{remaining_cool_off(Time.utc_now, last_failure)} seconds remaining"); + {:reply, {:error, :cool_off}, {host, port, status, creds, listeners, inflight, last_failure}} + else + case do_batch_request(method, batch_params, host, port, creds) do + {:ok, task_ref} -> + {:noreply, {host, port, status, creds, listeners, Map.put(inflight, task_ref, from), last_failure}} - :error -> - {:reply, 500, {host, port, status, creds, listeners, inflight}} + :error -> + {:reply, 500, {host, port, status, creds, listeners, inflight, last_failure}} + end end end @impl true - def handle_call(:get_node_status, _from, {host, port, status, creds, listeners, inflight}) do - {:reply, status, {host, port, status, creds, listeners, inflight}} + def handle_call(:get_node_status, _from, {host, port, status, creds, listeners, inflight, last_failure}) do + {:reply, status, {host, port, status, creds, listeners, inflight, last_failure}} end @impl true - def handle_call(:notify_on_ready, from, {host, port, status, creds, listeners, inflight}) do - {:noreply, {host, port, status, creds, [from | listeners], inflight}} + def handle_call(:notify_on_ready, from, {host, port, status, creds, listeners, inflight, last_failure}) do + {:noreply, {host, port, status, creds, [from | listeners], inflight, last_failure}} end def notify_on_ready(pid) do @@ -131,7 +174,7 @@ defmodule BitcoinStream.RPC do end end - defp batch_request(method, batch_params, host, port, creds) do + defp do_batch_request(method, batch_params, host, port, creds) do case Jason.encode(Enum.map(batch_params, fn [params, id] -> %{method: method, params: [params], id: id} end)) do {:ok, body} -> async_request(body, host, port, creds) @@ -142,12 +185,33 @@ defmodule BitcoinStream.RPC do end end + defp submit_rpc(body, host, port, user, pw) do + result = Finch.build(:post, "http://#{host}:#{port}", [{"content-type", "application/json"}, {"authorization", BasicAuth.encode_basic_auth(user, pw)}], body) |> Finch.request(FinchClient, [pool_timeout: 30000, receive_timeout: 30000]); + case result do + {:ok, %Finch.Response{body: response_body, headers: _headers, status: status}} -> + { :ok, status, response_body } + + error -> + Logger.debug("bad rpc response: #{inspect(error)}"); + { :error, error } + end + catch + :exit, {:timeout, _} -> + :timeout + + :exit, reason -> + {:error, reason} + + error -> + {:error, error} + end + defp async_request(body, host, port, creds) do with { user, pw } <- creds do task = Task.async( fn -> - with {:ok, %Finch.Response{body: body, headers: _headers, status: status}} <- Finch.build(:post, "http://#{host}:#{port}", [{"content-type", "application/json"}, {"authorization", BasicAuth.encode_basic_auth(user, pw)}], body) |> Finch.request(FinchClient), - {:ok, response} <- Jason.decode(body) do + with {:ok, status, response_body} <- submit_rpc(body, host, port, user, pw), + {:ok, response} <- Jason.decode(response_body) do case response do %{"result" => info} -> {:ok, status, info} @@ -155,6 +219,10 @@ defmodule BitcoinStream.RPC do _ -> {:ok, status, response} end else + :timeout -> + Logger.debug("rpc timeout"); + {:error, :timeout} + {:ok, status, _} -> Logger.error("RPC request failed with HTTP code #{status}") {:error, status} @@ -179,7 +247,7 @@ defmodule BitcoinStream.RPC do end def request(pid, method, params) do - GenServer.call(pid, {:request, method, params}, 30000) + GenServer.call(pid, {:request, method, params}, 60000) catch :exit, reason -> case reason do @@ -191,17 +259,51 @@ defmodule BitcoinStream.RPC do error -> {:error, error} end - def batch_request(pid, method, batch_params) do - GenServer.call(pid, {:batch_request, method, batch_params}, 30000) + # if fail_fast == true, an RPC failure triggers a cooling off period + # where subsequent fail_fast=true requests immediately fail + # RPC failures usually caused by resource saturation (exhausted local or remote RPC pool) + # so this prevents RPC floods from causing cascading failures + # calls with fail_fast=false are unaffected by the fail_fast cool-off period + def batch_request(pid, method, batch_params, fail_fast \\ false) do + case GenServer.call(pid, {:batch_request, method, batch_params, fail_fast}, 30000) do + {:ok, status, result} -> + if (fail_fast) do + GenServer.call(pid, :on_rpc_success); + end + {:ok, status, result} + + {:error, :cool_off} -> + {:error, :cool_off} + + {:error, error} -> + if (fail_fast) do + GenServer.call(pid, :on_rpc_failure); + end + {:error, error} + + catchall -> + if (fail_fast) do + GenServer.call(pid, :on_rpc_failure); + end + catchall + end catch :exit, reason -> + if (fail_fast) do + GenServer.call(pid, :on_rpc_failure); + end case reason do - {:timeout, _} -> {:error, :timeout} + {:timeout, _} -> + {:error, :timeout} _ -> {:error, reason} end - error -> {:error, error} + error -> + if (fail_fast) do + GenServer.call(pid, :on_rpc_failure); + end + {:error, error} end def get_node_status(pid) do diff --git a/server/lib/bridge_block.ex b/server/lib/bridge_block.ex index 59b2977..7425214 100644 --- a/server/lib/bridge_block.ex +++ b/server/lib/bridge_block.ex @@ -54,7 +54,7 @@ defmodule BitcoinStream.Bridge.Block do end # start block loop - loop(socket, 0) + loop(socket, nil) end defp wait_for_ibd() do @@ -83,21 +83,32 @@ defmodule BitcoinStream.Bridge.Block do defp loop(socket, seq) do Logger.debug("waiting for block"); with {:ok, message} <- :chumak.recv_multipart(socket), # wait for the next zmq message in the queue + _ <- Logger.debug("block msg received"), [_topic, payload, <>] <- message, + _ <- Logger.debug("block msg decoded #{seq}"), true <- (seq != sequence), # discard contiguous duplicate messages - _ <- Logger.info("block received"), + _ <- Logger.info("new block received"), _ <- Mempool.set_block_locked(:mempool, true), + _ <- Logger.debug("locked mempool resync"), {:ok, block} <- BitcoinBlock.decode(payload), + _ <- Logger.debug("decoded block msg"), count <- Mempool.clear_block_txs(:mempool, block), + _ <- Logger.debug("#{count} txs remain in mempool"), _ <- Mempool.set_block_locked(:mempool, false), + _ <- Logger.debug("unlocked mempool resync"), {:ok, json} <- Jason.encode(block), - :ok <- File.write("data/last_block.json", json) do + _ <- Logger.debug("json encoded block data"), + :ok <- File.write("data/last_block.json", json), + _ <- Logger.debug("wrote block to file") do Logger.info("processed block #{block.id}"); BlockData.set_json_block(:block_data, block.id, json); + Logger.debug("cached block data"); send_block(block, count); loop(socket, sequence) else - _ -> loop(socket, seq) + _ -> + Logger.debug("block exception"); + loop(socket, seq) end end diff --git a/server/lib/bridge_tx.ex b/server/lib/bridge_tx.ex index 69aa1d4..3843e8b 100644 --- a/server/lib/bridge_tx.ex +++ b/server/lib/bridge_tx.ex @@ -84,11 +84,12 @@ defmodule BitcoinStream.Bridge.Tx do case Mempool.get_tx_status(:mempool, txn.id) do # :registered and :new transactions are inflated and inserted into the mempool status when (status in [:registered, :new]) -> - inflated_txn = BitcoinTx.inflate(txn); + inflated_txn = BitcoinTx.inflate(txn, true); case Mempool.insert(:mempool, txn.id, inflated_txn) do # Mempool.insert returns the size of the mempool if insertion was successful # Forward tx to clients in this case - count when is_integer(count) -> send_txn(inflated_txn, count) + count when is_integer(count) -> + send_txn(inflated_txn, count) _ -> false end diff --git a/server/lib/mempool.ex b/server/lib/mempool.ex index 75f2e4a..e821504 100644 --- a/server/lib/mempool.ex +++ b/server/lib/mempool.ex @@ -132,7 +132,7 @@ defmodule BitcoinStream.Mempool do end defp get_queue(pid) do - GenServer.call(pid, :get_queue) + GenServer.call(pid, :get_queue, 60000) end defp set_queue(pid, queue) do @@ -190,7 +190,7 @@ defmodule BitcoinStream.Mempool do # new transaction, id already registered :registered -> with [] <- :ets.lookup(:block_cache, txid) do # double check tx isn't included in the last block - :ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee }, nil}); + :ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee, txn.inflated }, nil}); get(pid) else _ -> @@ -246,7 +246,7 @@ defmodule BitcoinStream.Mempool do # data already received, but tx not registered [{_txid, _, txn}] when txn != nil -> - :ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee }, nil}); + :ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee, txn.inflated }, nil}); :ets.delete(:sync_cache, txid); if do_count do increment(pid); @@ -371,14 +371,9 @@ defmodule BitcoinStream.Mempool do with {:ok, 200, hextx} <- RPC.request(:rpc, "getrawtransaction", [txid]), rawtx <- Base.decode16!(hextx, case: :lower), {:ok, txn } <- BitcoinTx.decode(rawtx), - inflated_txn <- BitcoinTx.inflate(txn) do + inflated_txn <- BitcoinTx.inflate(txn, false) do register(pid, txid, nil, false); - if inflated_txn.inflated do - insert(pid, txid, inflated_txn) - else - Logger.debug("failed to inflate loaded mempool txn #{txid}") - end - + insert(pid, txid, inflated_txn) else _ -> Logger.debug("sync_mempool_txn failed #{txid}") end @@ -397,6 +392,66 @@ defmodule BitcoinStream.Mempool do sync_mempool_txns(pid, tail, count + 1) end + # when transaction inflation fails, we fall back to storing deflated inputs in the cache + # the repair function scans the mempool cache for deflated inputs, and attempts to reinflate + def repair(_pid) do + Logger.debug("Checking mempool integrity"); + repaired = :ets.foldl(&(repair_mempool_txn/2), 0, :mempool_cache); + if repaired > 0 do + Logger.info("MEMPOOL CHECK COMPLETE #{repaired} REPAIRED"); + else + Logger.debug("MEMPOOL REPAIR NOT REQUIRED"); + end + :ok + catch + err -> + Logger.error("Failed to repair mempool: #{inspect(err)}"); + :error + end + + defp repair_mempool_txn(entry, repaired) do + case entry do + # unprocessed + {_, nil, _} -> + repaired + + # valid entry, already inflated + {_txid, {_inputs, _total, true}, _} -> + repaired + + # valid entry, not inflated + # repair + {txid, {_inputs, _total, false}, status} -> + Logger.debug("repairing #{txid}"); + with {:ok, 200, hextx} <- RPC.request(:rpc, "getrawtransaction", [txid]), + rawtx <- Base.decode16!(hextx, case: :lower), + {:ok, txn } <- BitcoinTx.decode(rawtx), + inflated_txn <- BitcoinTx.inflate(txn, false) do + if inflated_txn.inflated do + :ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee, true }, status}); + Logger.debug("repaired #{repaired} mempool txns #{txid}"); + repaired + 1 + else + Logger.debug("failed to inflate transaction for repair #{txid}"); + repaired + end + + else + _ -> Logger.debug("failed to fetch transaction for repair #{txid}"); + repaired + end + + # catch all + other -> + Logger.error("unexpected cache entry: #{inspect(other)}"); + repaired + end + catch + err -> + Logger.debug("unexpected error repairing transaction"); + repaired + end + defp cache_sync_ids(pid, txns) do :ets.delete_all_objects(:sync_cache); cache_sync_ids(pid, txns, 0) diff --git a/server/lib/mempool_sync.ex b/server/lib/mempool_sync.ex index f3f5a77..b8c7a12 100644 --- a/server/lib/mempool_sync.ex +++ b/server/lib/mempool_sync.ex @@ -59,7 +59,7 @@ defmodule BitcoinStream.Mempool.Sync do wait_for_ibd(); Logger.info("Preparing mempool sync"); Mempool.sync(:mempool); - Process.send_after(self(), :resync, 20 * 1000); + Process.send_after(self(), :resync, 1000); end defp loop() do @@ -82,6 +82,10 @@ defmodule BitcoinStream.Mempool.Sync do newcount = Mempool.get(:mempool); Logger.debug("updated to #{newcount}"); end + + # repair transactions with deflated inputs + Mempool.repair(:mempool); + # next check in 1 minute Process.send_after(self(), :resync, 60 * 1000) else diff --git a/server/lib/protocol/block.ex b/server/lib/protocol/block.ex index 5a4ee61..686a93c 100644 --- a/server/lib/protocol/block.ex +++ b/server/lib/protocol/block.ex @@ -85,7 +85,7 @@ defp summarise_txns([next | rest], summarised, total, fees, do_inflate) do # if the mempool is still syncing, inflating txs will take too long, so skip it if do_inflate do - inflated_txn = BitcoinTx.inflate(extended_txn) + inflated_txn = BitcoinTx.inflate(extended_txn, false) if (inflated_txn.inflated) do Logger.debug("Processing block tx #{length(summarised)}/#{length(summarised) + length(rest) + 1} | #{extended_txn.id}"); summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, fees + inflated_txn.fee, true) diff --git a/server/lib/protocol/transaction.ex b/server/lib/protocol/transaction.ex index c992ca7..5328afb 100644 --- a/server/lib/protocol/transaction.ex +++ b/server/lib/protocol/transaction.ex @@ -46,6 +46,7 @@ defmodule BitcoinStream.Protocol.Transaction do inputs: raw_tx.inputs, outputs: raw_tx.outputs, value: total_value, + fee: 0, # witnesses: raw_tx.witnesses, lock_time: raw_tx.lock_time, id: id, @@ -69,6 +70,7 @@ defmodule BitcoinStream.Protocol.Transaction do inputs: txn.inputs, outputs: txn.outputs, value: total_value, + fee: 0, # witnesses: txn.witnesses, lock_time: txn.lock_time, id: id, @@ -76,8 +78,8 @@ defmodule BitcoinStream.Protocol.Transaction do } end - def inflate(txn) do - case inflate_inputs(txn.id, txn.inputs) do + def inflate(txn, fail_fast) do + case inflate_inputs(txn.id, txn.inputs, fail_fast) do {:ok, inputs, in_value} -> %__MODULE__{ version: txn.version, @@ -94,7 +96,6 @@ defmodule BitcoinStream.Protocol.Transaction do } {:failed, inputs, _in_value} -> - Logger.error("failed to inflate #{txn.id}"); %__MODULE__{ version: txn.version, inflated: false, @@ -108,6 +109,10 @@ defmodule BitcoinStream.Protocol.Transaction do id: txn.id, time: txn.time } + + catchall -> + Logger.error("unexpected inflate result: #{inspect(catchall)}"); + :ok end end @@ -119,10 +124,10 @@ defmodule BitcoinStream.Protocol.Transaction do count_value(rest, total + next_output.value) end - defp inflate_batch(batch) do + defp inflate_batch(batch, fail_fast) do with batch_params <- Enum.map(batch, fn input -> [input.prev_txid, input.prev_txid <> "#{input.prev_vout}"] end), batch_map <- Enum.into(batch, %{}, fn p -> {p.prev_txid <> "#{p.prev_vout}", p} end), - {:ok, 200, txs} <- RPC.batch_request(:rpc, "getrawtransaction", batch_params), + {:ok, 200, txs} <- RPC.batch_request(:rpc, "getrawtransaction", batch_params, fail_fast), successes <- Enum.filter(txs, fn %{"error" => error} -> error == nil end), rawtxs <- Enum.map(successes, fn tx -> %{"error" => nil, "id" => input_id, "result" => hextx} = tx; rawtx = Base.decode16!(hextx, case: :lower); [input_id, rawtx] end), decoded <- Enum.map(rawtxs, fn [input_id, rawtx] -> {:ok, txn} = decode(rawtx); [input_id, txn] end), @@ -147,56 +152,57 @@ defmodule BitcoinStream.Protocol.Transaction do {:failed, outputs, total} end else - {:ok, 500, reason} -> - Logger.error("input in batch not found"); - Logger.error("#{inspect(reason)}") - {:error, reason} -> - Logger.error("Failed to inflate batched inputs:"); - Logger.error("#{inspect(reason)}") - :error - err -> - Logger.error("Failed to inflate batched inputs: (unknown reason)"); - Logger.error("#{inspect(err)}") + _ -> :error end + + catch + err -> + Logger.error("unexpected error inflating batch"); + IO.inspect(err); + :error end - defp inflate_inputs([], inflated, total) do + defp inflate_inputs([], inflated, total, _fail_fast) do {:ok, inflated, total} end - defp inflate_inputs([next_chunk | rest], inflated, total) do - case inflate_batch(next_chunk) do + defp inflate_inputs([next_chunk | rest], inflated, total, fail_fast) do + case inflate_batch(next_chunk, fail_fast) do {:ok, inflated_chunk, chunk_total} -> - inflate_inputs(rest, inflated ++ inflated_chunk, total + chunk_total) + inflate_inputs(rest, inflated ++ inflated_chunk, total + chunk_total, fail_fast) _ -> {:failed, inflated ++ next_chunk ++ rest, 0} end end - def inflate_inputs([], nil) do + def inflate_inputs([], nil, _fail_fast) do { :failed, nil, 0 } end # Retrieves cached inputs if available, # otherwise inflates inputs in batches of up to 100 - def inflate_inputs(txid, inputs) do + def inflate_inputs(txid, inputs, fail_fast) do case :ets.lookup(:mempool_cache, txid) do # cache miss, actually inflate [] -> - inflate_inputs(Enum.chunk_every(inputs, 100), [], 0) + inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast) # cache hit, but processed inputs not available [{_, nil, _}] -> - inflate_inputs(Enum.chunk_every(inputs, 100), [], 0) + inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast) + + # cache hit, but inputs not inflated + [{_, {_inputs, _total, false}, _}] -> + inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast) # cache hit, just return the cached values - [{_, {inputs, total}, _}] -> + [{_, {inputs, total, true}, _}] -> {:ok, inputs, total} other -> Logger.error("unexpected mempool cache response while inflating inputs #{inspect(other)}"); - inflate_inputs(Enum.chunk_every(inputs, 100), [], 0) + inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast) end end