Track & serve unconfirmed mempool spends

This commit is contained in:
Mononaut 2022-04-25 11:02:29 -06:00
parent 25d2b6ea2c
commit 707abbc075
2 changed files with 79 additions and 17 deletions

View File

@ -191,6 +191,7 @@ defmodule BitcoinStream.Mempool do
:registered -> :registered ->
with [] <- :ets.lookup(:block_cache, txid) do # double check tx isn't included in the last block with [] <- :ets.lookup(:block_cache, txid) do # double check tx isn't included in the last block
:ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee, txn.inflated }, nil}); :ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee, txn.inflated }, nil});
cache_spends(txid, txn.inputs);
get(pid) get(pid)
else else
_ -> _ ->
@ -248,6 +249,7 @@ defmodule BitcoinStream.Mempool do
[{_txid, _, txn}] when txn != nil -> [{_txid, _, txn}] when txn != nil ->
:ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee, txn.inflated }, nil}); :ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee, txn.inflated }, nil});
:ets.delete(:sync_cache, txid); :ets.delete(:sync_cache, txid);
cache_spends(txid, txn.inputs);
if do_count do if do_count do
increment(pid); increment(pid);
end end
@ -295,8 +297,10 @@ defmodule BitcoinStream.Mempool do
get(pid) get(pid)
# tx fully processed and not already dropped # tx fully processed and not already dropped
[{_txid, data, _status}] when data != nil -> [{txid, data, _status}] when data != nil ->
:ets.delete(:mempool_cache, txid); :ets.delete(:mempool_cache, txid);
{inputs, _value, _inflated} = data;
uncache_spends(inputs);
decrement(pid); decrement(pid);
get(pid) get(pid)
@ -429,6 +433,7 @@ defmodule BitcoinStream.Mempool do
inflated_txn <- BitcoinTx.inflate(txn, false) do inflated_txn <- BitcoinTx.inflate(txn, false) do
if inflated_txn.inflated do if inflated_txn.inflated do
:ets.insert(:mempool_cache, {txid, { inflated_txn.inputs, inflated_txn.value + inflated_txn.fee, inflated_txn.inflated }, status}); :ets.insert(:mempool_cache, {txid, { inflated_txn.inputs, inflated_txn.value + inflated_txn.fee, inflated_txn.inflated }, status});
cache_spends(txid, inflated_txn.inputs);
Logger.debug("repaired #{repaired} mempool txns #{txid}"); Logger.debug("repaired #{repaired} mempool txns #{txid}");
repaired + 1 repaired + 1
else else
@ -493,4 +498,32 @@ defmodule BitcoinStream.Mempool do
end end
end end
defp cache_spend(txid, index, input) do
:ets.insert(:spend_cache, {[input.prev_txid, input.prev_vout], [txid, index]})
end
defp cache_spends(_txid, _index, []) do
:ok
end
defp cache_spends(txid, index, [input | rest]) do
cache_spend(txid, index, input);
cache_spends(txid, index + 1, rest)
end
defp cache_spends(txid, inputs) do
cache_spends(txid, 0, inputs)
end
defp uncache_spend(input) do
:ets.delete(:spend_cache, [input.prev_txid, input.prev_vout])
end
defp uncache_spends([]) do
:ok
end
defp uncache_spends([input | rest]) do
uncache_spend(input);
uncache_spends(rest)
end
defp uncache_spends(inputs) do
uncache_spends(inputs)
end
end end

View File

@ -5,6 +5,7 @@ defmodule BitcoinStream.Index.Spend do
use GenServer use GenServer
alias BitcoinStream.Protocol.Block, as: BitcoinBlock alias BitcoinStream.Protocol.Block, as: BitcoinBlock
alias BitcoinStream.Protocol.Transaction, as: BitcoinTx
alias BitcoinStream.RPC, as: RPC alias BitcoinStream.RPC, as: RPC
def start_link(opts) do def start_link(opts) do
@ -15,6 +16,7 @@ defmodule BitcoinStream.Index.Spend do
@impl true @impl true
def init([indexed]) do def init([indexed]) do
:ets.new(:spend_cache, [:set, :public, :named_table]);
if (indexed != nil) do if (indexed != nil) do
{:ok, dbref} = :rocksdb.open(String.to_charlist("data/index/spend"), [create_if_missing: true]); {:ok, dbref} = :rocksdb.open(String.to_charlist("data/index/spend"), [create_if_missing: true]);
Process.send_after(self(), :sync, 2000); Process.send_after(self(), :sync, 2000);
@ -205,7 +207,7 @@ defmodule BitcoinStream.Index.Spend do
defp index_block_inputs(dbref, batch, txns) do defp index_block_inputs(dbref, batch, txns) do
spends = index_txs(txns, %{}); spends = index_txs(txns, %{});
Enum.each(spends, fn {binid, outputs} -> Enum.each(spends, fn {binid, outputs} ->
case get_spends(dbref, binid) do case get_chain_spends(dbref, binid) do
false -> false ->
Logger.error("uninitialised tx in input index: #{Base.encode16(binid, [case: :lower])}") Logger.error("uninitialised tx in input index: #{Base.encode16(binid, [case: :lower])}")
:ok :ok
@ -294,13 +296,22 @@ defmodule BitcoinStream.Index.Spend do
end end
end end
defp get_spends(dbref, binary_txid) do defp get_chain_spends(dbref, binary_txid) do
case :rocksdb.get(dbref, binary_txid, []) do case :rocksdb.get(dbref, binary_txid, []) do
{:ok, spends} -> {:ok, spends} ->
spends spends
:not_found -> :not_found ->
false # uninitialized, try to construct on-the-fly from RPC data
txid = Base.encode16(binary_txid);
with {:ok, 200, hextx} <- RPC.request(:rpc, "getrawtransaction", [txid]),
rawtx <- Base.decode16!(hextx, case: :lower),
{:ok, tx } <- BitcoinTx.decode(rawtx) do
size = length(tx.outputs) * 35 * 8;
<<0::integer-size(size)>>
else
_ -> false
end
_ -> _ ->
Logger.error("unexpected leveldb response"); Logger.error("unexpected leveldb response");
@ -308,16 +319,19 @@ defmodule BitcoinStream.Index.Spend do
end end
end end
defp unpack_spends(<<>>, spend_array) do defp unpack_spends(<<>>, spend_list) do
Enum.reverse(spend_array) Enum.reverse(spend_list)
end end
# unspent outputs are zeroed out # unspent outputs are zeroed out
defp unpack_spends(<<0::integer-size(280), rest::binary>>, spend_array) do defp unpack_spends(<<0::integer-size(280), rest::binary>>, spend_list) do
unpack_spends(rest, [false | spend_array]) unpack_spends(rest, [false | spend_list])
end end
defp unpack_spends(<<binary_txid::binary-size(32), index::integer-size(24), rest::binary>>, spend_array) do defp unpack_spends(<<binary_txid::binary-size(32), index::integer-size(24), rest::binary>>, spend_list) do
txid = Base.encode16(binary_txid, [case: :lower]); txid = Base.encode16(binary_txid, [case: :lower]);
unpack_spends(rest, [[txid, index] | spend_array]) unpack_spends(rest, [[txid, index] | spend_list])
end
defp unpack_spends(false) do
[]
end end
defp unpack_spends(bin) do defp unpack_spends(bin) do
unpack_spends(bin, []) unpack_spends(bin, [])
@ -325,15 +339,30 @@ defmodule BitcoinStream.Index.Spend do
defp get_transaction_spends(dbref, txid) do defp get_transaction_spends(dbref, txid) do
binary_txid = Base.decode16!(txid, [case: :lower]); binary_txid = Base.decode16!(txid, [case: :lower]);
case get_spends(dbref, binary_txid) do chain_spends = get_chain_spends(dbref, binary_txid);
false -> spend_list = unpack_spends(chain_spends);
{:ok, nil} spend_list = add_mempool_spends(txid, spend_list);
{:ok, spend_list}
end
spends -> defp add_mempool_spends(_txid, _index, [], added) do
spend_array = unpack_spends(spends); Enum.reverse(added)
{:ok, spend_array} end
defp add_mempool_spends(txid, index, [false | rest], added) do
case :ets.lookup(:spend_cache, [txid, index]) do
[] ->
add_mempool_spends(txid, index + 1, rest, [false | added])
[{[_index, _txid], spend}] ->
add_mempool_spends(txid, index + 1, rest, [spend | added])
end end
end end
defp add_mempool_spends(txid, index, [spend | rest], added) do
add_mempool_spends(txid, index + 1, rest, [spend | added])
end
defp add_mempool_spends(txid, spend_list) do
add_mempool_spends(txid, 0, spend_list, [])
end
defp stack_dropped_blocks(dbref, hash, undo_stack, min_height) do defp stack_dropped_blocks(dbref, hash, undo_stack, min_height) do
# while we're below the latest processed height # while we're below the latest processed height
@ -351,7 +380,7 @@ defmodule BitcoinStream.Index.Spend do
defp drop_block_inputs(dbref, batch, txns) do defp drop_block_inputs(dbref, batch, txns) do
spends = index_txs(txns, %{}); spends = index_txs(txns, %{});
Enum.each(spends, fn {binid, outputs} -> Enum.each(spends, fn {binid, outputs} ->
case get_spends(dbref, binid) do case get_chain_spends(dbref, binid) do
false -> false ->
Logger.error("uninitialised tx in input index: #{Base.encode16(binid, [case: :lower])}") Logger.error("uninitialised tx in input index: #{Base.encode16(binid, [case: :lower])}")
:ok :ok