diff --git a/server/lib/mempool.ex b/server/lib/mempool.ex index 6582f4b..4ae7b45 100644 --- a/server/lib/mempool.ex +++ b/server/lib/mempool.ex @@ -191,6 +191,7 @@ defmodule BitcoinStream.Mempool do :registered -> with [] <- :ets.lookup(:block_cache, txid) do # double check tx isn't included in the last block :ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee, txn.inflated }, nil}); + cache_spends(txid, txn.inputs); get(pid) else _ -> @@ -248,6 +249,7 @@ defmodule BitcoinStream.Mempool do [{_txid, _, txn}] when txn != nil -> :ets.insert(:mempool_cache, {txid, { txn.inputs, txn.value + txn.fee, txn.inflated }, nil}); :ets.delete(:sync_cache, txid); + cache_spends(txid, txn.inputs); if do_count do increment(pid); end @@ -295,8 +297,10 @@ defmodule BitcoinStream.Mempool do get(pid) # tx fully processed and not already dropped - [{_txid, data, _status}] when data != nil -> + [{txid, data, _status}] when data != nil -> :ets.delete(:mempool_cache, txid); + {inputs, _value, _inflated} = data; + uncache_spends(inputs); decrement(pid); get(pid) @@ -429,6 +433,7 @@ defmodule BitcoinStream.Mempool do inflated_txn <- BitcoinTx.inflate(txn, false) do if inflated_txn.inflated do :ets.insert(:mempool_cache, {txid, { inflated_txn.inputs, inflated_txn.value + inflated_txn.fee, inflated_txn.inflated }, status}); + cache_spends(txid, inflated_txn.inputs); Logger.debug("repaired #{repaired} mempool txns #{txid}"); repaired + 1 else @@ -493,4 +498,32 @@ defmodule BitcoinStream.Mempool do end end + + defp cache_spend(txid, index, input) do + :ets.insert(:spend_cache, {[input.prev_txid, input.prev_vout], [txid, index]}) + end + defp cache_spends(_txid, _index, []) do + :ok + end + defp cache_spends(txid, index, [input | rest]) do + cache_spend(txid, index, input); + cache_spends(txid, index + 1, rest) + end + defp cache_spends(txid, inputs) do + cache_spends(txid, 0, inputs) + end + + defp uncache_spend(input) do + :ets.delete(:spend_cache, [input.prev_txid, input.prev_vout]) + end + defp uncache_spends([]) do + :ok + end + defp uncache_spends([input | rest]) do + uncache_spend(input); + uncache_spends(rest) + end + defp uncache_spends(inputs) do + uncache_spends(inputs) + end end diff --git a/server/lib/spend_index.ex b/server/lib/spend_index.ex index 33c2727..0c11095 100644 --- a/server/lib/spend_index.ex +++ b/server/lib/spend_index.ex @@ -5,6 +5,7 @@ defmodule BitcoinStream.Index.Spend do use GenServer alias BitcoinStream.Protocol.Block, as: BitcoinBlock + alias BitcoinStream.Protocol.Transaction, as: BitcoinTx alias BitcoinStream.RPC, as: RPC def start_link(opts) do @@ -15,6 +16,7 @@ defmodule BitcoinStream.Index.Spend do @impl true def init([indexed]) do + :ets.new(:spend_cache, [:set, :public, :named_table]); if (indexed != nil) do {:ok, dbref} = :rocksdb.open(String.to_charlist("data/index/spend"), [create_if_missing: true]); Process.send_after(self(), :sync, 2000); @@ -205,7 +207,7 @@ defmodule BitcoinStream.Index.Spend do defp index_block_inputs(dbref, batch, txns) do spends = index_txs(txns, %{}); Enum.each(spends, fn {binid, outputs} -> - case get_spends(dbref, binid) do + case get_chain_spends(dbref, binid) do false -> Logger.error("uninitialised tx in input index: #{Base.encode16(binid, [case: :lower])}") :ok @@ -294,13 +296,22 @@ defmodule BitcoinStream.Index.Spend do end end - defp get_spends(dbref, binary_txid) do + defp get_chain_spends(dbref, binary_txid) do case :rocksdb.get(dbref, binary_txid, []) do {:ok, spends} -> spends :not_found -> - false + # uninitialized, try to construct on-the-fly from RPC data + txid = Base.encode16(binary_txid); + with {:ok, 200, hextx} <- RPC.request(:rpc, "getrawtransaction", [txid]), + rawtx <- Base.decode16!(hextx, case: :lower), + {:ok, tx } <- BitcoinTx.decode(rawtx) do + size = length(tx.outputs) * 35 * 8; + <<0::integer-size(size)>> + else + _ -> false + end _ -> Logger.error("unexpected leveldb response"); @@ -308,16 +319,19 @@ defmodule BitcoinStream.Index.Spend do end end - defp unpack_spends(<<>>, spend_array) do - Enum.reverse(spend_array) + defp unpack_spends(<<>>, spend_list) do + Enum.reverse(spend_list) end # unspent outputs are zeroed out - defp unpack_spends(<<0::integer-size(280), rest::binary>>, spend_array) do - unpack_spends(rest, [false | spend_array]) + defp unpack_spends(<<0::integer-size(280), rest::binary>>, spend_list) do + unpack_spends(rest, [false | spend_list]) end - defp unpack_spends(<>, spend_array) do + defp unpack_spends(<>, spend_list) do txid = Base.encode16(binary_txid, [case: :lower]); - unpack_spends(rest, [[txid, index] | spend_array]) + unpack_spends(rest, [[txid, index] | spend_list]) + end + defp unpack_spends(false) do + [] end defp unpack_spends(bin) do unpack_spends(bin, []) @@ -325,15 +339,30 @@ defmodule BitcoinStream.Index.Spend do defp get_transaction_spends(dbref, txid) do binary_txid = Base.decode16!(txid, [case: :lower]); - case get_spends(dbref, binary_txid) do - false -> - {:ok, nil} + chain_spends = get_chain_spends(dbref, binary_txid); + spend_list = unpack_spends(chain_spends); + spend_list = add_mempool_spends(txid, spend_list); + {:ok, spend_list} + end - spends -> - spend_array = unpack_spends(spends); - {:ok, spend_array} + defp add_mempool_spends(_txid, _index, [], added) do + Enum.reverse(added) + end + defp add_mempool_spends(txid, index, [false | rest], added) do + case :ets.lookup(:spend_cache, [txid, index]) do + [] -> + add_mempool_spends(txid, index + 1, rest, [false | added]) + + [{[_index, _txid], spend}] -> + add_mempool_spends(txid, index + 1, rest, [spend | added]) end end + defp add_mempool_spends(txid, index, [spend | rest], added) do + add_mempool_spends(txid, index + 1, rest, [spend | added]) + end + defp add_mempool_spends(txid, spend_list) do + add_mempool_spends(txid, 0, spend_list, []) + end defp stack_dropped_blocks(dbref, hash, undo_stack, min_height) do # while we're below the latest processed height @@ -351,7 +380,7 @@ defmodule BitcoinStream.Index.Spend do defp drop_block_inputs(dbref, batch, txns) do spends = index_txs(txns, %{}); Enum.each(spends, fn {binid, outputs} -> - case get_spends(dbref, binid) do + case get_chain_spends(dbref, binid) do false -> Logger.error("uninitialised tx in input index: #{Base.encode16(binid, [case: :lower])}") :ok