diff --git a/client/src/components/TransactionOverlay.svelte b/client/src/components/TransactionOverlay.svelte index 21a288b..055d2fe 100644 --- a/client/src/components/TransactionOverlay.svelte +++ b/client/src/components/TransactionOverlay.svelte @@ -120,7 +120,7 @@ $: { } } else inputs = [] if ($detailTx && $detailTx.outputs) { - if ($detailTx.isCoinbase || !$detailTx.is_inflated || !$detailTx.fee) { + if ($detailTx.isCoinbase || $detailTx.fee == null) { outputs = expandAddresses($detailTx.outputs, truncate) } else { outputs = [{address: 'fee', value: $detailTx.fee, fee: true}, ...expandAddresses($detailTx.outputs, truncate)] @@ -660,7 +660,7 @@ async function goToBlock(e) { {:else} - {#if $detailTx.is_inflated && $detailTx.fee != null && $detailTx.feerate != null} + {#if $detailTx.fee != null && $detailTx.feerate != null}
fee diff --git a/client/src/controllers/TxController.js b/client/src/controllers/TxController.js index 67bc83c..2cb6b3f 100644 --- a/client/src/controllers/TxController.js +++ b/client/src/controllers/TxController.js @@ -214,7 +214,7 @@ export default class TxController { for (let i = 0; i < block.txns.length; i++) { if (this.poolScene.remove(block.txns[i].id)) { knownCount++ - this.txs[block.txns[i].id].setData(block.txns[i]) + this.txs[block.txns[i].id].mergeData(block.txns[i]) this.txs[block.txns[i].id].setBlock(block) this.blockScene.insert(this.txs[block.txns[i].id], 0, false) } else { diff --git a/client/src/controllers/TxStream.js b/client/src/controllers/TxStream.js index cfecdee..c064f54 100644 --- a/client/src/controllers/TxStream.js +++ b/client/src/controllers/TxStream.js @@ -1,6 +1,7 @@ import { serverConnected, serverDelay, lastBlockId } from '../stores.js' import config from '../config.js' import api from '../utils/api.js' +import { fetchBlockByHash } from '../utils/search.js' let mempoolTimer let lastBlockSeen @@ -121,17 +122,8 @@ class TxStream { async fetchBlock (id, calledOnLoad) { if (!id) return if (id !== lastBlockSeen) { - try { - console.log('downloading block', id) - const response = await fetch(`${api.uri}/api/block/${id}`, { - method: 'GET' - }) - let blockData = await response.json() - console.log('downloaded block', id) - window.dispatchEvent(new CustomEvent('bitcoin_block', { detail: { block: blockData, realtime: !calledOnLoad} })) - } catch (err) { - console.log("failed to download block ", id) - } + const blockData = await fetchBlockByHash(id) + window.dispatchEvent(new CustomEvent('bitcoin_block', { detail: { block: blockData, realtime: !calledOnLoad} })) } else { console.log('already seen block ', lastBlockSeen) } diff --git a/client/src/models/BitcoinTx.js b/client/src/models/BitcoinTx.js index 136d9cb..5a340bb 100644 --- a/client/src/models/BitcoinTx.js +++ b/client/src/models/BitcoinTx.js @@ -49,6 +49,23 @@ export default class BitcoinTx { this.view = new TxView(this) } + mergeData ({ version, inflated, preview, id, value, fee, vbytes, numInputs, inputs, outputs, time, block }, isCoinbase=false) { + this.setData({ + version, + inflated: this.is_inflated || inflated, + preview: this.is_preview && preview, + id, + value, + fee: this.fee || fee, + vbytes, + numInputs: this.numInputs || numInputs, + inputs: this.inputs, + outputs: this.outputs, + time, + block + }) + } + setData ({ version, inflated, preview, id, value, fee, vbytes, numInputs, inputs, outputs, time, block }, isCoinbase=false) { this.version = version this.is_inflated = !!inflated diff --git a/client/src/utils/search.js b/client/src/utils/search.js index 4e343fb..cdeb781 100644 --- a/client/src/utils/search.js +++ b/client/src/utils/search.js @@ -140,19 +140,31 @@ async function fetchTx (txid) { async function fetchBlockByHash (hash) { if (!hash || (currentBlockVal && hash === currentBlockVal.id)) return true // try to fetch static block + console.log('downloading block', hash) let response = await fetch(`${api.uri}/api/block/${hash}`, { method: 'GET' }) - if (!response) throw new Error('null response') + if (!response) { + console.log('failed to download block', hash) + throw new Error('null response') + } if (response && response.status == 200) { const blockData = await response.json() + let block if (blockData) { if (blockData.id) { - return new BitcoinBlock(blockData) - } else return BitcoinBlock.decompress(blockData) + block = new BitcoinBlock(blockData) + } else block = BitcoinBlock.decompress(blockData) } + if (block && block.id) { + console.log('downloaded block', block.id) + } else { + console.log('failed to download block', block.id) + } + return block } } +export {fetchBlockByHash as fetchBlockByHash} async function fetchBlockByHeight (height) { if (height == null) return diff --git a/server/lib/mempool.ex b/server/lib/mempool.ex index 8c9ecdc..633c03e 100644 --- a/server/lib/mempool.ex +++ b/server/lib/mempool.ex @@ -366,97 +366,88 @@ defmodule BitcoinStream.Mempool do end defp sync_mempool(pid, txns) do - sync_mempool_txns(pid, txns, 0) + sync_mempool_txns(pid, txns) end + # + # defp sync_mempool_txn(pid, txid) do + # case :ets.lookup(:mempool_cache, txid) do + # [] -> + # with {:ok, 200, hextx} <- RPC.request(:rpc, "getrawtransaction", [txid]), + # rawtx <- Base.decode16!(hextx, case: :lower), + # {:ok, txn } <- BitcoinTx.decode(rawtx) do + # register(pid, txid, nil, false); + # insert(pid, txid, txn) + # else + # _ -> Logger.debug("sync_mempool_txn failed #{txid}") + # end + # + # [_] -> true + # end + # end + # + # defp sync_mempool_txns(_, [], count) do + # count + # end + # + # defp sync_mempool_txns(pid, [head | tail], count) do + # Logger.debug("Syncing mempool tx #{count}/#{count + length(tail) + 1} | #{head}"); + # sync_mempool_txn(pid, head); + # sync_mempool_txns(pid, tail, count + 1) + # end - defp sync_mempool_txn(pid, txid) do - case :ets.lookup(:mempool_cache, txid) do - [] -> - with {:ok, 200, hextx} <- RPC.request(:rpc, "getrawtransaction", [txid]), - rawtx <- Base.decode16!(hextx, case: :lower), - {:ok, txn } <- BitcoinTx.decode(rawtx), - inflated_txn <- BitcoinTx.inflate(txn, false) do - register(pid, txid, nil, false); - insert(pid, txid, inflated_txn) - else - _ -> Logger.debug("sync_mempool_txn failed #{txid}") + + defp sync_batch(pid, batch) do + with batch_params <- Enum.map(batch, fn txid -> [txid, txid] end), + {:ok, 200, txs} <- RPC.batch_request(:rpc, "getrawtransaction", batch_params, true), + failures <- Enum.filter(txs, fn %{"error" => error} -> error != nil end), + successes <- Enum.filter(txs, fn %{"error" => error} -> error == nil end) do + Enum.each(successes, fn tx -> + with %{"error" => nil, "id" => _txid, "result" => hextx} <- tx, + rawtx <- Base.decode16!(hextx, case: :lower), + {:ok, txn } <- BitcoinTx.decode(rawtx) do + register(pid, txn.id, nil, false); + insert(pid, txn.id, txn) + end + end); + case length(failures) do + count when count > 0 -> + IO.puts("failures: #{length(failures)}") + + _ -> false end - - [_] -> true - end - end - - defp sync_mempool_txns(_, [], count) do - count - end - - defp sync_mempool_txns(pid, [head | tail], count) do - Logger.debug("Syncing mempool tx #{count}/#{count + length(tail) + 1} | #{head}"); - sync_mempool_txn(pid, head); - sync_mempool_txns(pid, tail, count + 1) - end - - # when transaction inflation fails, we fall back to storing deflated inputs in the cache - # the repair function scans the mempool cache for deflated inputs, and attempts to reinflate - def repair(_pid) do - Logger.debug("Checking mempool integrity"); - repaired = :ets.foldl(&(repair_mempool_txn/2), 0, :mempool_cache); - if repaired > 0 do - Logger.info("MEMPOOL CHECK COMPLETE #{repaired} REPAIRED"); + {:ok, length(successes)} else - Logger.debug("MEMPOOL REPAIR NOT REQUIRED"); + _ -> + :error end - :ok + catch err -> - Logger.error("Failed to repair mempool: #{inspect(err)}"); + Logger.error("unexpected error syncing batch"); + IO.inspect(err); :error end - defp repair_mempool_txn(entry, repaired) do - case entry do - # unprocessed - {_, nil, _} -> - repaired - - # valid entry, already inflated - {_txid, {_inputs, _total, true}, _} -> - repaired - - # valid entry, not inflated - # repair - {txid, {_inputs, _total, false}, status} -> - Logger.debug("repairing #{txid}"); - with {:ok, 200, hextx} <- RPC.request(:rpc, "getrawtransaction", [txid]), - rawtx <- Base.decode16!(hextx, case: :lower), - {:ok, txn } <- BitcoinTx.decode(rawtx), - inflated_txn <- BitcoinTx.inflate(txn, false) do - if inflated_txn.inflated do - :ets.insert(:mempool_cache, {txid, { inflated_txn.inputs, inflated_txn.value + inflated_txn.fee, inflated_txn.inflated }, status}); - cache_spends(txid, inflated_txn.inputs); - Logger.debug("repaired #{repaired} mempool txns #{txid}"); - repaired + 1 - else - Logger.debug("failed to inflate transaction for repair #{txid}"); - repaired - end - - else - _ -> Logger.debug("failed to fetch transaction for repair #{txid}"); - repaired - end - - # catch all - other -> - Logger.error("unexpected cache entry: #{inspect(other)}"); - repaired - end - catch - err -> - Logger.debug("unexpected error repairing transaction: #{inspect(err)}"); - repaired + defp sync_mempool_txns(_pid, [], count) do + {:ok, count} end + defp sync_mempool_txns(pid, [next_chunk | rest], count) do + case sync_batch(pid, next_chunk) do + {:ok, batch_count} -> + IO.puts("synced #{batch_count + count} mempool transactions"); + sync_mempool_txns(pid, rest, batch_count + count) + + _ -> + :failed + end + end + + def sync_mempool_txns(pid, txns) do + sync_mempool_txns(pid, Enum.chunk_every(txns, 100), 0) + end + + defp cache_sync_ids(pid, txns) do :ets.delete_all_objects(:sync_cache); cache_sync_ids(pid, txns, 0) diff --git a/server/lib/mempool_sync.ex b/server/lib/mempool_sync.ex index b8c7a12..57d0e69 100644 --- a/server/lib/mempool_sync.ex +++ b/server/lib/mempool_sync.ex @@ -83,9 +83,6 @@ defmodule BitcoinStream.Mempool.Sync do Logger.debug("updated to #{newcount}"); end - # repair transactions with deflated inputs - Mempool.repair(:mempool); - # next check in 1 minute Process.send_after(self(), :resync, 60 * 1000) else diff --git a/server/lib/protocol/block.ex b/server/lib/protocol/block.ex index 84a24aa..d27544c 100644 --- a/server/lib/protocol/block.ex +++ b/server/lib/protocol/block.ex @@ -11,7 +11,6 @@ defmodule BitcoinStream.Protocol.Block do """ alias BitcoinStream.Protocol.Transaction, as: BitcoinTx -alias BitcoinStream.Mempool, as: Mempool @derive Jason.Encoder defstruct [ @@ -24,7 +23,6 @@ defstruct [ :nonce, :txn_count, :txns, - :fees, :value, :id ] @@ -34,7 +32,7 @@ def decode(block_binary) do hex <- Base.encode16(block_binary, case: :lower), {:ok, raw_block} <- Bitcoinex.Block.decode(hex), id <- Bitcoinex.Block.block_id(block_binary), - {summarised_txns, total_value, total_fees} <- summarise_txns(raw_block.txns) + {summarised_txns, total_value} <- summarise_txns(raw_block.txns) do {:ok, %__MODULE__{ version: raw_block.version, @@ -45,7 +43,6 @@ def decode(block_binary) do bytes: bytes, txn_count: raw_block.txn_count, txns: summarised_txns, - fees: total_fees, value: total_value, id: id }} @@ -64,7 +61,7 @@ def parse(hex) do bytes <- byte_size(block_binary), {:ok, raw_block} <- Bitcoinex.Block.decode(hex), id <- Bitcoinex.Block.block_id(block_binary), - {summarised_txns, total_value, total_fees} <- summarise_txns(raw_block.txns) + {summarised_txns, total_value} <- summarise_txns(raw_block.txns) do {:ok, %__MODULE__{ version: raw_block.version, @@ -75,7 +72,6 @@ def parse(hex) do bytes: bytes, txn_count: raw_block.txn_count, txns: summarised_txns, - fees: total_fees, value: total_value, id: id }} @@ -92,8 +88,8 @@ end defp summarise_txns([coinbase | txns]) do # Mempool.is_done returns false while the mempool is still syncing with extended_coinbase <- BitcoinTx.extend(coinbase), - {summarised, total, fees} <- summarise_txns(txns, [], 0, 0, Mempool.is_done(:mempool)) do - {[extended_coinbase | summarised], total + extended_coinbase.value, fees} + {summarised, total} <- summarise_txns(txns, [], 0) do + {[extended_coinbase | summarised], total + extended_coinbase.value} else err -> Logger.error("Failed to inflate block"); @@ -102,29 +98,13 @@ defp summarise_txns([coinbase | txns]) do end end -defp summarise_txns([], summarised, total, fees, do_inflate) do - if do_inflate do - {Enum.reverse(summarised), total, fees} - else - {Enum.reverse(summarised), total, nil} - end +defp summarise_txns([], summarised, total) do + {Enum.reverse(summarised), total} end -defp summarise_txns([next | rest], summarised, total, fees, do_inflate) do - extended_txn = BitcoinTx.extend(next) - - # if the mempool is still syncing, inflating txs will take too long, so skip it - if do_inflate do - inflated_txn = BitcoinTx.inflate(extended_txn, false) - if (inflated_txn.inflated) do - # Logger.debug("Processing block tx #{length(summarised)}/#{length(summarised) + length(rest) + 1} | #{extended_txn.id}"); - summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, fees + inflated_txn.fee, true) - else - summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, nil, false) - end - else - summarise_txns(rest, [extended_txn | summarised], total + extended_txn.value, nil, false) - end +defp summarise_txns([next | rest], summarised, total) do + extended_txn = BitcoinTx.extend(next); + summarise_txns(rest, [extended_txn | summarised], total + extended_txn.value) end end diff --git a/server/lib/protocol/transaction.ex b/server/lib/protocol/transaction.ex index 5328afb..20b817e 100644 --- a/server/lib/protocol/transaction.ex +++ b/server/lib/protocol/transaction.ex @@ -183,27 +183,7 @@ defmodule BitcoinStream.Protocol.Transaction do # Retrieves cached inputs if available, # otherwise inflates inputs in batches of up to 100 def inflate_inputs(txid, inputs, fail_fast) do - case :ets.lookup(:mempool_cache, txid) do - # cache miss, actually inflate - [] -> - inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast) - - # cache hit, but processed inputs not available - [{_, nil, _}] -> - inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast) - - # cache hit, but inputs not inflated - [{_, {_inputs, _total, false}, _}] -> - inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast) - - # cache hit, just return the cached values - [{_, {inputs, total, true}, _}] -> - {:ok, inputs, total} - - other -> - Logger.error("unexpected mempool cache response while inflating inputs #{inspect(other)}"); - inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast) - end + inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast) end end diff --git a/server/lib/router.ex b/server/lib/router.ex index 492bea8..747e2e9 100644 --- a/server/lib/router.ex +++ b/server/lib/router.ex @@ -86,20 +86,14 @@ defmodule BitcoinStream.Router do end defp get_block(hash) do - last_id = BlockData.get_block_id(:block_data); - if hash == last_id do - payload = BlockData.get_json_block(:block_data); - {:ok, payload, true} + with {:ok, 200, block} <- RPC.request(:rpc, "getblock", [hash, 2]), + {:ok, cleaned} <- BlockData.clean_block(block), + {:ok, payload} <- Jason.encode(cleaned) do + {:ok, payload, false} else - with {:ok, 200, block} <- RPC.request(:rpc, "getblock", [hash, 2]), - {:ok, cleaned} <- BlockData.clean_block(block), - {:ok, payload} <- Jason.encode(cleaned) do - {:ok, payload, false} - else - err -> - IO.inspect(err); - :err - end + err -> + IO.inspect(err); + :err end end