Skip prevout expansion in mempool sync

This commit is contained in:
Mononaut 2022-05-17 19:36:55 +00:00
parent 0a35207dd7
commit b689b3dd55
10 changed files with 125 additions and 162 deletions

View File

@ -120,7 +120,7 @@ $: {
} }
} else inputs = [] } else inputs = []
if ($detailTx && $detailTx.outputs) { if ($detailTx && $detailTx.outputs) {
if ($detailTx.isCoinbase || !$detailTx.is_inflated || !$detailTx.fee) { if ($detailTx.isCoinbase || $detailTx.fee == null) {
outputs = expandAddresses($detailTx.outputs, truncate) outputs = expandAddresses($detailTx.outputs, truncate)
} else { } else {
outputs = [{address: 'fee', value: $detailTx.fee, fee: true}, ...expandAddresses($detailTx.outputs, truncate)] outputs = [{address: 'fee', value: $detailTx.fee, fee: true}, ...expandAddresses($detailTx.outputs, truncate)]
@ -660,7 +660,7 @@ async function goToBlock(e) {
</div> </div>
</div> </div>
{:else} {:else}
{#if $detailTx.is_inflated && $detailTx.fee != null && $detailTx.feerate != null} {#if $detailTx.fee != null && $detailTx.feerate != null}
<div class="pane fields"> <div class="pane fields">
<div class="field"> <div class="field">
<span class="label">fee</span> <span class="label">fee</span>

View File

@ -214,7 +214,7 @@ export default class TxController {
for (let i = 0; i < block.txns.length; i++) { for (let i = 0; i < block.txns.length; i++) {
if (this.poolScene.remove(block.txns[i].id)) { if (this.poolScene.remove(block.txns[i].id)) {
knownCount++ knownCount++
this.txs[block.txns[i].id].setData(block.txns[i]) this.txs[block.txns[i].id].mergeData(block.txns[i])
this.txs[block.txns[i].id].setBlock(block) this.txs[block.txns[i].id].setBlock(block)
this.blockScene.insert(this.txs[block.txns[i].id], 0, false) this.blockScene.insert(this.txs[block.txns[i].id], 0, false)
} else { } else {

View File

@ -1,6 +1,7 @@
import { serverConnected, serverDelay, lastBlockId } from '../stores.js' import { serverConnected, serverDelay, lastBlockId } from '../stores.js'
import config from '../config.js' import config from '../config.js'
import api from '../utils/api.js' import api from '../utils/api.js'
import { fetchBlockByHash } from '../utils/search.js'
let mempoolTimer let mempoolTimer
let lastBlockSeen let lastBlockSeen
@ -121,17 +122,8 @@ class TxStream {
async fetchBlock (id, calledOnLoad) { async fetchBlock (id, calledOnLoad) {
if (!id) return if (!id) return
if (id !== lastBlockSeen) { if (id !== lastBlockSeen) {
try { const blockData = await fetchBlockByHash(id)
console.log('downloading block', id) window.dispatchEvent(new CustomEvent('bitcoin_block', { detail: { block: blockData, realtime: !calledOnLoad} }))
const response = await fetch(`${api.uri}/api/block/${id}`, {
method: 'GET'
})
let blockData = await response.json()
console.log('downloaded block', id)
window.dispatchEvent(new CustomEvent('bitcoin_block', { detail: { block: blockData, realtime: !calledOnLoad} }))
} catch (err) {
console.log("failed to download block ", id)
}
} else { } else {
console.log('already seen block ', lastBlockSeen) console.log('already seen block ', lastBlockSeen)
} }

View File

@ -49,6 +49,23 @@ export default class BitcoinTx {
this.view = new TxView(this) this.view = new TxView(this)
} }
mergeData ({ version, inflated, preview, id, value, fee, vbytes, numInputs, inputs, outputs, time, block }, isCoinbase=false) {
this.setData({
version,
inflated: this.is_inflated || inflated,
preview: this.is_preview && preview,
id,
value,
fee: this.fee || fee,
vbytes,
numInputs: this.numInputs || numInputs,
inputs: this.inputs,
outputs: this.outputs,
time,
block
})
}
setData ({ version, inflated, preview, id, value, fee, vbytes, numInputs, inputs, outputs, time, block }, isCoinbase=false) { setData ({ version, inflated, preview, id, value, fee, vbytes, numInputs, inputs, outputs, time, block }, isCoinbase=false) {
this.version = version this.version = version
this.is_inflated = !!inflated this.is_inflated = !!inflated

View File

@ -140,19 +140,31 @@ async function fetchTx (txid) {
async function fetchBlockByHash (hash) { async function fetchBlockByHash (hash) {
if (!hash || (currentBlockVal && hash === currentBlockVal.id)) return true if (!hash || (currentBlockVal && hash === currentBlockVal.id)) return true
// try to fetch static block // try to fetch static block
console.log('downloading block', hash)
let response = await fetch(`${api.uri}/api/block/${hash}`, { let response = await fetch(`${api.uri}/api/block/${hash}`, {
method: 'GET' method: 'GET'
}) })
if (!response) throw new Error('null response') if (!response) {
console.log('failed to download block', hash)
throw new Error('null response')
}
if (response && response.status == 200) { if (response && response.status == 200) {
const blockData = await response.json() const blockData = await response.json()
let block
if (blockData) { if (blockData) {
if (blockData.id) { if (blockData.id) {
return new BitcoinBlock(blockData) block = new BitcoinBlock(blockData)
} else return BitcoinBlock.decompress(blockData) } else block = BitcoinBlock.decompress(blockData)
} }
if (block && block.id) {
console.log('downloaded block', block.id)
} else {
console.log('failed to download block', block.id)
}
return block
} }
} }
export {fetchBlockByHash as fetchBlockByHash}
async function fetchBlockByHeight (height) { async function fetchBlockByHeight (height) {
if (height == null) return if (height == null) return

View File

@ -366,97 +366,88 @@ defmodule BitcoinStream.Mempool do
end end
defp sync_mempool(pid, txns) do defp sync_mempool(pid, txns) do
sync_mempool_txns(pid, txns, 0) sync_mempool_txns(pid, txns)
end end
#
# defp sync_mempool_txn(pid, txid) do
# case :ets.lookup(:mempool_cache, txid) do
# [] ->
# with {:ok, 200, hextx} <- RPC.request(:rpc, "getrawtransaction", [txid]),
# rawtx <- Base.decode16!(hextx, case: :lower),
# {:ok, txn } <- BitcoinTx.decode(rawtx) do
# register(pid, txid, nil, false);
# insert(pid, txid, txn)
# else
# _ -> Logger.debug("sync_mempool_txn failed #{txid}")
# end
#
# [_] -> true
# end
# end
#
# defp sync_mempool_txns(_, [], count) do
# count
# end
#
# defp sync_mempool_txns(pid, [head | tail], count) do
# Logger.debug("Syncing mempool tx #{count}/#{count + length(tail) + 1} | #{head}");
# sync_mempool_txn(pid, head);
# sync_mempool_txns(pid, tail, count + 1)
# end
defp sync_mempool_txn(pid, txid) do
case :ets.lookup(:mempool_cache, txid) do defp sync_batch(pid, batch) do
[] -> with batch_params <- Enum.map(batch, fn txid -> [txid, txid] end),
with {:ok, 200, hextx} <- RPC.request(:rpc, "getrawtransaction", [txid]), {:ok, 200, txs} <- RPC.batch_request(:rpc, "getrawtransaction", batch_params, true),
rawtx <- Base.decode16!(hextx, case: :lower), failures <- Enum.filter(txs, fn %{"error" => error} -> error != nil end),
{:ok, txn } <- BitcoinTx.decode(rawtx), successes <- Enum.filter(txs, fn %{"error" => error} -> error == nil end) do
inflated_txn <- BitcoinTx.inflate(txn, false) do Enum.each(successes, fn tx ->
register(pid, txid, nil, false); with %{"error" => nil, "id" => _txid, "result" => hextx} <- tx,
insert(pid, txid, inflated_txn) rawtx <- Base.decode16!(hextx, case: :lower),
else {:ok, txn } <- BitcoinTx.decode(rawtx) do
_ -> Logger.debug("sync_mempool_txn failed #{txid}") register(pid, txn.id, nil, false);
insert(pid, txn.id, txn)
end
end);
case length(failures) do
count when count > 0 ->
IO.puts("failures: #{length(failures)}")
_ -> false
end end
{:ok, length(successes)}
[_] -> true
end
end
defp sync_mempool_txns(_, [], count) do
count
end
defp sync_mempool_txns(pid, [head | tail], count) do
Logger.debug("Syncing mempool tx #{count}/#{count + length(tail) + 1} | #{head}");
sync_mempool_txn(pid, head);
sync_mempool_txns(pid, tail, count + 1)
end
# when transaction inflation fails, we fall back to storing deflated inputs in the cache
# the repair function scans the mempool cache for deflated inputs, and attempts to reinflate
def repair(_pid) do
Logger.debug("Checking mempool integrity");
repaired = :ets.foldl(&(repair_mempool_txn/2), 0, :mempool_cache);
if repaired > 0 do
Logger.info("MEMPOOL CHECK COMPLETE #{repaired} REPAIRED");
else else
Logger.debug("MEMPOOL REPAIR NOT REQUIRED"); _ ->
:error
end end
:ok
catch catch
err -> err ->
Logger.error("Failed to repair mempool: #{inspect(err)}"); Logger.error("unexpected error syncing batch");
IO.inspect(err);
:error :error
end end
defp repair_mempool_txn(entry, repaired) do defp sync_mempool_txns(_pid, [], count) do
case entry do {:ok, count}
# unprocessed
{_, nil, _} ->
repaired
# valid entry, already inflated
{_txid, {_inputs, _total, true}, _} ->
repaired
# valid entry, not inflated
# repair
{txid, {_inputs, _total, false}, status} ->
Logger.debug("repairing #{txid}");
with {:ok, 200, hextx} <- RPC.request(:rpc, "getrawtransaction", [txid]),
rawtx <- Base.decode16!(hextx, case: :lower),
{:ok, txn } <- BitcoinTx.decode(rawtx),
inflated_txn <- BitcoinTx.inflate(txn, false) do
if inflated_txn.inflated do
:ets.insert(:mempool_cache, {txid, { inflated_txn.inputs, inflated_txn.value + inflated_txn.fee, inflated_txn.inflated }, status});
cache_spends(txid, inflated_txn.inputs);
Logger.debug("repaired #{repaired} mempool txns #{txid}");
repaired + 1
else
Logger.debug("failed to inflate transaction for repair #{txid}");
repaired
end
else
_ -> Logger.debug("failed to fetch transaction for repair #{txid}");
repaired
end
# catch all
other ->
Logger.error("unexpected cache entry: #{inspect(other)}");
repaired
end
catch
err ->
Logger.debug("unexpected error repairing transaction: #{inspect(err)}");
repaired
end end
defp sync_mempool_txns(pid, [next_chunk | rest], count) do
case sync_batch(pid, next_chunk) do
{:ok, batch_count} ->
IO.puts("synced #{batch_count + count} mempool transactions");
sync_mempool_txns(pid, rest, batch_count + count)
_ ->
:failed
end
end
def sync_mempool_txns(pid, txns) do
sync_mempool_txns(pid, Enum.chunk_every(txns, 100), 0)
end
defp cache_sync_ids(pid, txns) do defp cache_sync_ids(pid, txns) do
:ets.delete_all_objects(:sync_cache); :ets.delete_all_objects(:sync_cache);
cache_sync_ids(pid, txns, 0) cache_sync_ids(pid, txns, 0)

View File

@ -83,9 +83,6 @@ defmodule BitcoinStream.Mempool.Sync do
Logger.debug("updated to #{newcount}"); Logger.debug("updated to #{newcount}");
end end
# repair transactions with deflated inputs
Mempool.repair(:mempool);
# next check in 1 minute # next check in 1 minute
Process.send_after(self(), :resync, 60 * 1000) Process.send_after(self(), :resync, 60 * 1000)
else else

View File

@ -11,7 +11,6 @@ defmodule BitcoinStream.Protocol.Block do
""" """
alias BitcoinStream.Protocol.Transaction, as: BitcoinTx alias BitcoinStream.Protocol.Transaction, as: BitcoinTx
alias BitcoinStream.Mempool, as: Mempool
@derive Jason.Encoder @derive Jason.Encoder
defstruct [ defstruct [
@ -24,7 +23,6 @@ defstruct [
:nonce, :nonce,
:txn_count, :txn_count,
:txns, :txns,
:fees,
:value, :value,
:id :id
] ]
@ -34,7 +32,7 @@ def decode(block_binary) do
hex <- Base.encode16(block_binary, case: :lower), hex <- Base.encode16(block_binary, case: :lower),
{:ok, raw_block} <- Bitcoinex.Block.decode(hex), {:ok, raw_block} <- Bitcoinex.Block.decode(hex),
id <- Bitcoinex.Block.block_id(block_binary), id <- Bitcoinex.Block.block_id(block_binary),
{summarised_txns, total_value, total_fees} <- summarise_txns(raw_block.txns) {summarised_txns, total_value} <- summarise_txns(raw_block.txns)
do do
{:ok, %__MODULE__{ {:ok, %__MODULE__{
version: raw_block.version, version: raw_block.version,
@ -45,7 +43,6 @@ def decode(block_binary) do
bytes: bytes, bytes: bytes,
txn_count: raw_block.txn_count, txn_count: raw_block.txn_count,
txns: summarised_txns, txns: summarised_txns,
fees: total_fees,
value: total_value, value: total_value,
id: id id: id
}} }}
@ -64,7 +61,7 @@ def parse(hex) do
bytes <- byte_size(block_binary), bytes <- byte_size(block_binary),
{:ok, raw_block} <- Bitcoinex.Block.decode(hex), {:ok, raw_block} <- Bitcoinex.Block.decode(hex),
id <- Bitcoinex.Block.block_id(block_binary), id <- Bitcoinex.Block.block_id(block_binary),
{summarised_txns, total_value, total_fees} <- summarise_txns(raw_block.txns) {summarised_txns, total_value} <- summarise_txns(raw_block.txns)
do do
{:ok, %__MODULE__{ {:ok, %__MODULE__{
version: raw_block.version, version: raw_block.version,
@ -75,7 +72,6 @@ def parse(hex) do
bytes: bytes, bytes: bytes,
txn_count: raw_block.txn_count, txn_count: raw_block.txn_count,
txns: summarised_txns, txns: summarised_txns,
fees: total_fees,
value: total_value, value: total_value,
id: id id: id
}} }}
@ -92,8 +88,8 @@ end
defp summarise_txns([coinbase | txns]) do defp summarise_txns([coinbase | txns]) do
# Mempool.is_done returns false while the mempool is still syncing # Mempool.is_done returns false while the mempool is still syncing
with extended_coinbase <- BitcoinTx.extend(coinbase), with extended_coinbase <- BitcoinTx.extend(coinbase),
{summarised, total, fees} <- summarise_txns(txns, [], 0, 0, Mempool.is_done(:mempool)) do {summarised, total} <- summarise_txns(txns, [], 0) do
{[extended_coinbase | summarised], total + extended_coinbase.value, fees} {[extended_coinbase | summarised], total + extended_coinbase.value}
else else
err -> err ->
Logger.error("Failed to inflate block"); Logger.error("Failed to inflate block");
@ -102,29 +98,13 @@ defp summarise_txns([coinbase | txns]) do
end end
end end
defp summarise_txns([], summarised, total, fees, do_inflate) do defp summarise_txns([], summarised, total) do
if do_inflate do {Enum.reverse(summarised), total}
{Enum.reverse(summarised), total, fees}
else
{Enum.reverse(summarised), total, nil}
end
end end
defp summarise_txns([next | rest], summarised, total, fees, do_inflate) do defp summarise_txns([next | rest], summarised, total) do
extended_txn = BitcoinTx.extend(next) extended_txn = BitcoinTx.extend(next);
summarise_txns(rest, [extended_txn | summarised], total + extended_txn.value)
# if the mempool is still syncing, inflating txs will take too long, so skip it
if do_inflate do
inflated_txn = BitcoinTx.inflate(extended_txn, false)
if (inflated_txn.inflated) do
# Logger.debug("Processing block tx #{length(summarised)}/#{length(summarised) + length(rest) + 1} | #{extended_txn.id}");
summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, fees + inflated_txn.fee, true)
else
summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, nil, false)
end
else
summarise_txns(rest, [extended_txn | summarised], total + extended_txn.value, nil, false)
end
end end
end end

View File

@ -183,27 +183,7 @@ defmodule BitcoinStream.Protocol.Transaction do
# Retrieves cached inputs if available, # Retrieves cached inputs if available,
# otherwise inflates inputs in batches of up to 100 # otherwise inflates inputs in batches of up to 100
def inflate_inputs(txid, inputs, fail_fast) do def inflate_inputs(txid, inputs, fail_fast) do
case :ets.lookup(:mempool_cache, txid) do inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast)
# cache miss, actually inflate
[] ->
inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast)
# cache hit, but processed inputs not available
[{_, nil, _}] ->
inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast)
# cache hit, but inputs not inflated
[{_, {_inputs, _total, false}, _}] ->
inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast)
# cache hit, just return the cached values
[{_, {inputs, total, true}, _}] ->
{:ok, inputs, total}
other ->
Logger.error("unexpected mempool cache response while inflating inputs #{inspect(other)}");
inflate_inputs(Enum.chunk_every(inputs, 100), [], 0, fail_fast)
end
end end
end end

View File

@ -86,20 +86,14 @@ defmodule BitcoinStream.Router do
end end
defp get_block(hash) do defp get_block(hash) do
last_id = BlockData.get_block_id(:block_data); with {:ok, 200, block} <- RPC.request(:rpc, "getblock", [hash, 2]),
if hash == last_id do {:ok, cleaned} <- BlockData.clean_block(block),
payload = BlockData.get_json_block(:block_data); {:ok, payload} <- Jason.encode(cleaned) do
{:ok, payload, true} {:ok, payload, false}
else else
with {:ok, 200, block} <- RPC.request(:rpc, "getblock", [hash, 2]), err ->
{:ok, cleaned} <- BlockData.clean_block(block), IO.inspect(err);
{:ok, payload} <- Jason.encode(cleaned) do :err
{:ok, payload, false}
else
err ->
IO.inspect(err);
:err
end
end end
end end