mirror of
https://github.com/Retropex/bitfeed.git
synced 2025-05-12 19:20:46 +02:00
Refactor spend index & api
This commit is contained in:
parent
a80a0c6bce
commit
25d2b6ea2c
@ -107,7 +107,6 @@ $: {
|
||||
|
||||
let inputs = []
|
||||
let outputs = []
|
||||
let expandedOutputs = []
|
||||
$: {
|
||||
if ($detailTx && $detailTx.inputs) {
|
||||
if ($detailTx.isCoinbase) {
|
||||
@ -121,41 +120,11 @@ $: {
|
||||
} else inputs = []
|
||||
if ($detailTx && $detailTx.outputs) {
|
||||
if ($detailTx.isCoinbase || !$detailTx.is_inflated || !$detailTx.fee) {
|
||||
expandedOutputs = expandAddresses($detailTx.outputs, truncate)
|
||||
outputs = expandAddresses($detailTx.outputs, truncate)
|
||||
} else {
|
||||
expandedOutputs = [{address: 'fee', value: $detailTx.fee, fee: true}, ...expandAddresses($detailTx.outputs, truncate)]
|
||||
outputs = [{address: 'fee', value: $detailTx.fee, fee: true}, ...expandAddresses($detailTx.outputs, truncate)]
|
||||
}
|
||||
} else expandedOutputs = []
|
||||
}
|
||||
$: {
|
||||
if (expandedOutputs) {
|
||||
if (spends) {
|
||||
outputs = expandedOutputs.map(output => {
|
||||
if (output.index != null && spends[output.index]) {
|
||||
const spendParts = spends[output.index].split(':')
|
||||
return {
|
||||
...output,
|
||||
spend: {
|
||||
txid: spendParts[0],
|
||||
vin: spendParts[1]
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return {
|
||||
...output,
|
||||
spend: false
|
||||
}
|
||||
}
|
||||
})
|
||||
} else outputs = expandedOutputs
|
||||
}
|
||||
}
|
||||
|
||||
let spends
|
||||
$: {
|
||||
if ($detailTx && $detailTx.id) {
|
||||
fetchSpends($detailTx, truncate)
|
||||
} else spends = null
|
||||
} else outputs = []
|
||||
}
|
||||
|
||||
let highlight = {}
|
||||
@ -172,9 +141,9 @@ $: {
|
||||
let sankeyLines
|
||||
let sankeyHeight
|
||||
$: {
|
||||
if ($detailTx && inputs && expandedOutputs) {
|
||||
sankeyHeight = Math.max(inputs.length, expandedOutputs.length) * rowHeight
|
||||
sankeyLines = calcSankeyLines(inputs, expandedOutputs, $detailTx.fee || null, $detailTx.value, sankeyHeight, svgWidth, flowWeight)
|
||||
if ($detailTx && inputs && outputs) {
|
||||
sankeyHeight = Math.max(inputs.length, outputs.length) * rowHeight
|
||||
sankeyLines = calcSankeyLines(inputs, outputs, $detailTx.fee || null, $detailTx.value, sankeyHeight, svgWidth, flowWeight)
|
||||
}
|
||||
}
|
||||
|
||||
@ -286,17 +255,6 @@ function getMiterOffset (weight, dy, dx) {
|
||||
} else return 0
|
||||
}
|
||||
|
||||
async function fetchSpends (tx, truncated) {
|
||||
const response = await fetch(`${api.uri}/api/spends/${tx.id}/0/${truncated ? Math.min(tx.outputs.length, 100) : tx.outputs.length}`, {
|
||||
method: 'GET'
|
||||
})
|
||||
if (!response) throw new Error('null response')
|
||||
if (response.status == 200) {
|
||||
const result = await response.json()
|
||||
spends = result
|
||||
}
|
||||
}
|
||||
|
||||
async function clickItem (item) {
|
||||
if (item.rest) {
|
||||
truncate = false
|
||||
@ -680,10 +638,10 @@ async function goToOutput(e, output) {
|
||||
{#each inputs as input, index}
|
||||
<div class="entry" class:clickable={input.rest} class:highlight={highlight.in != null && highlight.in === index} on:click={() => clickItem(input)}>
|
||||
{#if input.prev_txid }
|
||||
<a href="/tx/{input.prev_txid}:{input.prev_vout}" on:click={(e) => goToInput(e, input)} class="put-link">
|
||||
<svg class="chevron left" height="1.2em" width="1.2em" viewBox="0 0 512 512">
|
||||
<path d="M 107.628,257.54 327.095,38.078 404,114.989 261.506,257.483 404,399.978 327.086,476.89 Z" class="outline" />
|
||||
</svg>
|
||||
<a href="/tx/{input.prev_txid}:{input.prev_vout}" on:click={(e) => goToInput(e, input)} class="put-link" transition:fade|local={{ duration: 200 }}>
|
||||
<svg class="chevron left" height="1.2em" width="1.2em" viewBox="0 0 512 512">
|
||||
<path d="M 107.628,257.54 327.095,38.078 404,114.989 261.506,257.483 404,399.978 327.086,476.89 Z" class="outline" />
|
||||
</svg>
|
||||
</a>
|
||||
{/if}
|
||||
<p class="address" title={input.title || input.address}><span class="truncatable">{input.address.slice(0,-6)}</span><span class="suffix">{input.address.slice(-6)}</span></p>
|
||||
@ -723,7 +681,7 @@ async function goToOutput(e, output) {
|
||||
{#each outputs as output}
|
||||
<div class="entry" class:clickable={output.rest} class:highlight={highlight.out != null && highlight.out === output.index} on:click={() => clickItem(output)}>
|
||||
{#if output.spend}
|
||||
<a href="/tx/{output.spend.vin}:{output.spend.txid}" on:click={(e) => goToOutput(e, output)} class="put-link">
|
||||
<a href="/tx/{output.spend.vin}:{output.spend.txid}" on:click={(e) => goToOutput(e, output)} class="put-link" transition:fade|local={{ duration: 200 }}>
|
||||
<svg class="chevron right" height="1.2em" width="1.2em" viewBox="0 0 512 512">
|
||||
<path d="M 107.628,257.54 327.095,38.078 404,114.989 261.506,257.483 404,399.978 327.086,476.89 Z" class="outline" />
|
||||
</svg>
|
||||
|
@ -5,7 +5,7 @@ import BitcoinTx from '../models/BitcoinTx.js'
|
||||
import BitcoinBlock from '../models/BitcoinBlock.js'
|
||||
import TxSprite from '../models/TxSprite.js'
|
||||
import { FastVertexArray } from '../utils/memory.js'
|
||||
import { searchTx } from '../utils/search.js'
|
||||
import { searchTx, fetchSpends, addSpends } from '../utils/search.js'
|
||||
import { overlay, txCount, mempoolCount, mempoolScreenHeight, blockVisible, currentBlock, selectedTx, detailTx, blockAreaSize, highlight, colorMode, blocksEnabled, latestBlockHeight, explorerBlockData, blockTransitionDirection, loading } from '../stores.js'
|
||||
import config from "../config.js"
|
||||
|
||||
@ -48,14 +48,12 @@ export default class TxController {
|
||||
this.setColorMode(mode)
|
||||
})
|
||||
explorerBlockData.subscribe(blockData => {
|
||||
console.log('explorerBlock changed: ', blockData)
|
||||
if (blockData) {
|
||||
this.exploreBlock(blockData)
|
||||
} else {
|
||||
this.resumeLatest()
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
getVertexData () {
|
||||
@ -390,19 +388,21 @@ export default class TxController {
|
||||
if (selected) selected.hoverOn()
|
||||
}
|
||||
this.selectedTx = selected
|
||||
selectedTx.set(this.selectedTx)
|
||||
if (sameTx && this.selectedTx) {
|
||||
if (!this.selectedTx.is_inflated) {
|
||||
selectedTx.set(selected)
|
||||
if (sameTx && selected) {
|
||||
if (!selected.is_inflated) {
|
||||
loading.increment()
|
||||
await searchTx(this.selectedTx.id)
|
||||
await searchTx(selected.id)
|
||||
loading.decrement()
|
||||
} else {
|
||||
detailTx.set(this.selectedTx)
|
||||
const spendResult = await fetchSpends(selected.id)
|
||||
if (spendResult) selected = addSpends(selected, spendResult)
|
||||
detailTx.set(selected)
|
||||
overlay.set('tx')
|
||||
}
|
||||
console.log(selected)
|
||||
}
|
||||
console.log(this.selectedTx)
|
||||
this.selectionLocked = !!this.selectedTx && !(this.selectionLocked && sameTx)
|
||||
this.selectionLocked = !!selected && !(this.selectionLocked && sameTx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -29,8 +29,6 @@ export default class TxPoolScene {
|
||||
|
||||
this.scrollRateLimitTimer = null
|
||||
this.initialised = true
|
||||
|
||||
if (config.dev) console.log('pool', this)
|
||||
}
|
||||
|
||||
resize ({ width = this.width, height = this.height }) {
|
||||
|
@ -168,10 +168,41 @@ async function fetchBlockByHeight (height) {
|
||||
}
|
||||
}
|
||||
|
||||
async function fetchSpends (txid) {
|
||||
if (txid == null) return
|
||||
const response = await fetch(`${api.uri}/api/spends/${txid}`, {
|
||||
method: 'GET'
|
||||
})
|
||||
if (!response) throw new Error('null response')
|
||||
if (response.status == 200) {
|
||||
return response.json()
|
||||
} else {
|
||||
return null
|
||||
}
|
||||
}
|
||||
export {fetchSpends as fetchSpends}
|
||||
|
||||
function addSpends(tx, spends) {
|
||||
tx.outputs.forEach((output, index) => {
|
||||
if (spends[index]) {
|
||||
output.spend = {
|
||||
txid: spends[index][0],
|
||||
vin: spends[index][1],
|
||||
}
|
||||
} else {
|
||||
output.spend = null
|
||||
}
|
||||
})
|
||||
return tx
|
||||
}
|
||||
export {addSpends as addSpends}
|
||||
|
||||
export async function searchTx (txid, input, output) {
|
||||
try {
|
||||
const searchResult = await fetchTx(txid)
|
||||
let searchResult = await fetchTx(txid)
|
||||
const spendResult = await fetchSpends(txid)
|
||||
if (searchResult) {
|
||||
if (spendResult) searchResult = addSpends(searchResult, spendResult)
|
||||
selectedTx.set(searchResult)
|
||||
detailTx.set(searchResult)
|
||||
overlay.set('tx')
|
||||
|
@ -13,6 +13,7 @@ defmodule BitcoinStream.Bridge.Block do
|
||||
alias BitcoinStream.Mempool, as: Mempool
|
||||
alias BitcoinStream.RPC, as: RPC
|
||||
alias BitcoinStream.BlockData, as: BlockData
|
||||
alias BitcoinStream.Index.Spend, as: SpendIndex
|
||||
|
||||
def child_spec(host: host, port: port) do
|
||||
%{
|
||||
|
@ -9,6 +9,7 @@ defmodule BitcoinStream.Bridge.Sequence do
|
||||
"""
|
||||
use GenServer
|
||||
|
||||
alias BitcoinStream.Index.Spend, as: SpendIndex
|
||||
alias BitcoinStream.Mempool, as: Mempool
|
||||
alias BitcoinStream.RPC, as: RPC
|
||||
|
||||
@ -92,13 +93,14 @@ defmodule BitcoinStream.Bridge.Sequence do
|
||||
|
||||
defp loop(socket) do
|
||||
with {:ok, message} <- :chumak.recv_multipart(socket),
|
||||
[_topic, <<hash::binary-size(32), type::binary-size(1), seq::little-size(64)>>, <<_sequence::little-size(32)>>] <- message,
|
||||
txid <- Base.encode16(hash, case: :lower),
|
||||
[_topic, <<id::binary-size(32), type::binary-size(1), rest::binary>>, <<_sequence::little-size(32)>>] <- message,
|
||||
hash <- Base.encode16(id, case: :lower),
|
||||
event <- to_charlist(type) do
|
||||
case event do
|
||||
# Transaction added to mempool
|
||||
'A' ->
|
||||
case Mempool.register(:mempool, txid, seq, true) do
|
||||
<<seq::little-size(64)>> = rest;
|
||||
case Mempool.register(:mempool, hash, seq, true) do
|
||||
false -> false
|
||||
|
||||
{ txn, count } ->
|
||||
@ -107,16 +109,26 @@ defmodule BitcoinStream.Bridge.Sequence do
|
||||
|
||||
# Transaction removed from mempool for non block inclusion reason
|
||||
'R' ->
|
||||
case Mempool.drop(:mempool, txid) do
|
||||
<<seq::little-size(64)>> = rest;
|
||||
case Mempool.drop(:mempool, hash) do
|
||||
count when is_integer(count) ->
|
||||
send_drop_tx(txid, count);
|
||||
send_drop_tx(hash, count);
|
||||
|
||||
_ ->
|
||||
true
|
||||
end
|
||||
|
||||
'D' ->
|
||||
SpendIndex.notify_block_disconnect(:spends, hash);
|
||||
true
|
||||
|
||||
'C' ->
|
||||
SpendIndex.notify_block(:spends, hash);
|
||||
true
|
||||
|
||||
# Don't care about other events
|
||||
_ -> true
|
||||
other ->
|
||||
true
|
||||
end
|
||||
else
|
||||
_ -> false
|
||||
|
@ -59,6 +59,36 @@ def decode(block_binary) do
|
||||
end
|
||||
end
|
||||
|
||||
def parse(hex) do
|
||||
with block_binary <- Base.decode16!(hex, [case: :lower]),
|
||||
bytes <- byte_size(block_binary),
|
||||
{:ok, raw_block} <- Bitcoinex.Block.decode(hex),
|
||||
id <- Bitcoinex.Block.block_id(block_binary),
|
||||
{summarised_txns, total_value, total_fees} <- summarise_txns(raw_block.txns)
|
||||
do
|
||||
{:ok, %__MODULE__{
|
||||
version: raw_block.version,
|
||||
prev_block: raw_block.prev_block,
|
||||
merkle_root: raw_block.merkle_root,
|
||||
timestamp: raw_block.timestamp,
|
||||
bits: raw_block.bits,
|
||||
bytes: bytes,
|
||||
txn_count: raw_block.txn_count,
|
||||
txns: summarised_txns,
|
||||
fees: total_fees,
|
||||
value: total_value,
|
||||
id: id
|
||||
}}
|
||||
else
|
||||
{:error, reason} ->
|
||||
Logger.error("Error decoding data for BitcoinBlock: #{reason}")
|
||||
:error
|
||||
_ ->
|
||||
Logger.error("Error decoding data for BitcoinBlock: (unknown reason)")
|
||||
:error
|
||||
end
|
||||
end
|
||||
|
||||
defp summarise_txns([coinbase | txns]) do
|
||||
# Mempool.is_done returns false while the mempool is still syncing
|
||||
with extended_coinbase <- BitcoinTx.extend(coinbase),
|
||||
@ -87,7 +117,7 @@ defp summarise_txns([next | rest], summarised, total, fees, do_inflate) do
|
||||
if do_inflate do
|
||||
inflated_txn = BitcoinTx.inflate(extended_txn, false)
|
||||
if (inflated_txn.inflated) do
|
||||
Logger.debug("Processing block tx #{length(summarised)}/#{length(summarised) + length(rest) + 1} | #{extended_txn.id}");
|
||||
# Logger.debug("Processing block tx #{length(summarised)}/#{length(summarised) + length(rest) + 1} | #{extended_txn.id}");
|
||||
summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, fees + inflated_txn.fee, true)
|
||||
else
|
||||
summarise_txns(rest, [inflated_txn | summarised], total + inflated_txn.value, nil, false)
|
||||
|
@ -58,10 +58,10 @@ defmodule BitcoinStream.Router do
|
||||
end
|
||||
end
|
||||
|
||||
match "/api/spends/:txid/:from/:to" do
|
||||
case get_tx_spends(txid, from, to) do
|
||||
match "/api/spends/:txid" do
|
||||
case get_tx_spends(txid) do
|
||||
{:ok, spends} ->
|
||||
put_resp_header(conn, "cache-control", "public, max-age=60, immutable")
|
||||
put_resp_header(conn, "cache-control", "public, max-age=10, immutable")
|
||||
|> send_resp(200, spends)
|
||||
_ ->
|
||||
Logger.debug("Error getting tx spends");
|
||||
@ -119,11 +119,8 @@ defmodule BitcoinStream.Router do
|
||||
end
|
||||
end
|
||||
|
||||
defp get_tx_spends(txid, from_str, to_str) do
|
||||
with {fromIndex, _} <- Integer.parse(from_str),
|
||||
{toIndex, _} <- Integer.parse(to_str),
|
||||
true <- ((toIndex - fromIndex) < 100),
|
||||
{:ok, spends} <- SpendIndex.get_tx_spends(:spends, {txid, fromIndex, toIndex}),
|
||||
defp get_tx_spends(txid) do
|
||||
with {:ok, spends} <- SpendIndex.get_tx_spends(:spends, txid),
|
||||
{:ok, payload} <- Jason.encode(spends) do
|
||||
{:ok, payload}
|
||||
else
|
||||
|
@ -4,6 +4,7 @@ defmodule BitcoinStream.Index.Spend do
|
||||
|
||||
use GenServer
|
||||
|
||||
alias BitcoinStream.Protocol.Block, as: BitcoinBlock
|
||||
alias BitcoinStream.RPC, as: RPC
|
||||
|
||||
def start_link(opts) do
|
||||
@ -17,25 +18,32 @@ defmodule BitcoinStream.Index.Spend do
|
||||
if (indexed != nil) do
|
||||
{:ok, dbref} = :rocksdb.open(String.to_charlist("data/index/spend"), [create_if_missing: true]);
|
||||
Process.send_after(self(), :sync, 2000);
|
||||
{:ok, [dbref, indexed]}
|
||||
{:ok, [dbref, indexed, false]}
|
||||
else
|
||||
{:ok, nil}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def terminate(_reason, [dbref, indexed]) do
|
||||
def terminate(_reason, [dbref, indexed, _done]) do
|
||||
if (indexed != nil) do
|
||||
:rocksdb.close(dbref)
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:sync, [dbref, indexed]) do
|
||||
def handle_info(:sync, [dbref, indexed, done]) do
|
||||
if (indexed != nil) do
|
||||
sync(dbref);
|
||||
case sync(dbref) do
|
||||
true ->
|
||||
{:noreply, [dbref, indexed, true]}
|
||||
|
||||
_ ->
|
||||
{:noreply, [dbref, indexed, false]}
|
||||
end
|
||||
else
|
||||
{:noreply, [dbref, indexed, done]}
|
||||
end
|
||||
{:noreply, [dbref, indexed]}
|
||||
end
|
||||
|
||||
@impl true
|
||||
@ -46,20 +54,43 @@ defmodule BitcoinStream.Index.Spend do
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:get_tx_spends, {txid, from, to}}, _from, [dbref, indexed]) do
|
||||
case get_transaction_spends(dbref, txid, from, to) do
|
||||
def handle_call({:get_tx_spends, txid}, _from, [dbref, indexed, done]) do
|
||||
case get_transaction_spends(dbref, txid) do
|
||||
{:ok, spends} ->
|
||||
{:reply, {:ok, spends}, [dbref, indexed]}
|
||||
{:reply, {:ok, spends}, [dbref, indexed, done]}
|
||||
|
||||
err ->
|
||||
Logger.error("failed to fetch tx spends");
|
||||
{:reply, err, [dbref, indexed]}
|
||||
{:reply, err, [dbref, indexed, done]}
|
||||
end
|
||||
end
|
||||
|
||||
def get_tx_spends(pid, {txid, from, to}) do
|
||||
Logger.info("GETTING TX OUTSPENDS FOR #{pid} #{txid} #{from} #{to}");
|
||||
GenServer.call(pid, {:get_tx_spends, {txid, from, to}}, 60000)
|
||||
@impl true
|
||||
def handle_cast(:new_block, [dbref, indexed, done]) do
|
||||
if (indexed != nil and done) do
|
||||
case sync(dbref) do
|
||||
true ->
|
||||
{:noreply, [dbref, indexed, true]}
|
||||
|
||||
_ ->
|
||||
{:noreply, [dbref, indexed, false]}
|
||||
end
|
||||
else
|
||||
Logger.info("Already building spend index");
|
||||
{:noreply, [dbref, indexed, false]}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:block_disconnected, hash}, [dbref, indexed, done]) do
|
||||
if (indexed != nil and done) do
|
||||
block_disconnected(dbref, hash)
|
||||
end
|
||||
{:noreply, [dbref, indexed, done]}
|
||||
end
|
||||
|
||||
def get_tx_spends(pid, txid) do
|
||||
GenServer.call(pid, {:get_tx_spends, txid}, 60000)
|
||||
catch
|
||||
:exit, reason ->
|
||||
case reason do
|
||||
@ -71,6 +102,14 @@ defmodule BitcoinStream.Index.Spend do
|
||||
error -> {:error, error}
|
||||
end
|
||||
|
||||
def notify_block(pid, _hash) do
|
||||
GenServer.cast(pid, :new_block)
|
||||
end
|
||||
|
||||
def notify_block_disconnect(pid, hash) do
|
||||
GenServer.cast(pid, {:block_disconnected, hash})
|
||||
end
|
||||
|
||||
defp wait_for_ibd() do
|
||||
case RPC.get_node_status(:rpc) do
|
||||
:ok -> true
|
||||
@ -89,73 +128,149 @@ defmodule BitcoinStream.Index.Spend do
|
||||
:not_found ->
|
||||
-1
|
||||
|
||||
err ->
|
||||
_ ->
|
||||
Logger.error("unexpected leveldb response")
|
||||
end
|
||||
end
|
||||
|
||||
defp get_chain_height() do
|
||||
case RPC.request(:rpc, "getblockchaininfo", []) do
|
||||
{:ok, 200, %{"blocks" => height}} ->
|
||||
case RPC.request(:rpc, "getblockcount", []) do
|
||||
{:ok, 200, height} ->
|
||||
height
|
||||
|
||||
err ->
|
||||
_ ->
|
||||
Logger.error("unexpected RPC response");
|
||||
:err
|
||||
end
|
||||
end
|
||||
|
||||
defp get_block_data(height) do
|
||||
defp get_block(height) do
|
||||
with {:ok, 200, blockhash} <- RPC.request(:rpc, "getblockhash", [height]),
|
||||
{:ok, 200, blockdata} <- RPC.request(:rpc, "getblock", [blockhash, 2]) do
|
||||
blockdata
|
||||
{:ok, 200, blockdata} <- RPC.request(:rpc, "getblock", [blockhash, 0]),
|
||||
{:ok, block} <- BitcoinBlock.parse(blockdata) do
|
||||
block
|
||||
end
|
||||
end
|
||||
|
||||
defp index_input(batch, spendkey, vin) do
|
||||
case vin do
|
||||
%{"txid" => txid, "vout" => vout} ->
|
||||
:rocksdb.batch_put(batch, "#{txid}:#{vout}", spendkey)
|
||||
|
||||
_ -> # coinbase input
|
||||
:ok
|
||||
defp get_block_by_hash(hash) do
|
||||
with {:ok, 200, blockdata} <- RPC.request(:rpc, "getblock", [hash, 0]),
|
||||
{:ok, block} <- BitcoinBlock.parse(blockdata) do
|
||||
block
|
||||
end
|
||||
end
|
||||
|
||||
defp index_inputs(_batch, _txid, [], _vout) do
|
||||
:ok
|
||||
end
|
||||
defp index_inputs(batch, txid, [vin | rest], vout) do
|
||||
case index_input(batch, "#{txid}:#{vout}", vin) do
|
||||
:ok -> index_inputs(batch, txid, rest, vout+1)
|
||||
_ -> :err
|
||||
defp index_input(spendkey, input, spends) do
|
||||
case input do
|
||||
# coinbase (skip)
|
||||
%{prev_txid: "0000000000000000000000000000000000000000000000000000000000000000"} ->
|
||||
spends
|
||||
|
||||
%{prev_txid: txid, prev_vout: vout} ->
|
||||
binid = Base.decode16!(txid, [case: :lower])
|
||||
case spends[binid] do
|
||||
nil ->
|
||||
Map.put(spends, binid, [[vout, spendkey]])
|
||||
|
||||
a ->
|
||||
Map.put(spends, binid, [[vout, spendkey] | a])
|
||||
end
|
||||
|
||||
# unexpected input format (should never happen)
|
||||
_ ->
|
||||
spends
|
||||
end
|
||||
end
|
||||
|
||||
defp index_tx(batch, %{"txid" => txid, "vin" => inputs}) do
|
||||
index_inputs(batch, txid, inputs, 0)
|
||||
defp index_inputs(_binid, [], _vout, spends) do
|
||||
spends
|
||||
end
|
||||
defp index_inputs(binid, [vin | rest], vout, spends) do
|
||||
spends = index_input(binid <> <<vout::integer-size(24)>>, vin, spends);
|
||||
index_inputs(binid, rest, vout+1, spends)
|
||||
end
|
||||
|
||||
defp index_txs(_batch, []) do
|
||||
:ok
|
||||
end
|
||||
defp index_txs(batch, [tx | rest]) do
|
||||
case index_tx(batch, tx) do
|
||||
:ok -> index_txs(batch, rest)
|
||||
|
||||
_ -> :err
|
||||
end
|
||||
defp index_tx(%{id: txid, inputs: inputs}, spends) do
|
||||
binid = Base.decode16!(txid, [case: :lower]);
|
||||
index_inputs(binid, inputs, 0, spends)
|
||||
end
|
||||
|
||||
defp index_block(batch, height) do
|
||||
with %{"tx" => txs} <- get_block_data(height),
|
||||
:ok <- index_txs(batch, txs) do
|
||||
defp index_txs([], spends) do
|
||||
spends
|
||||
end
|
||||
defp index_txs([tx | rest], spends) do
|
||||
spends = index_tx(tx, spends);
|
||||
index_txs(rest, spends)
|
||||
end
|
||||
|
||||
defp index_block_inputs(dbref, batch, txns) do
|
||||
spends = index_txs(txns, %{});
|
||||
Enum.each(spends, fn {binid, outputs} ->
|
||||
case get_spends(dbref, binid) do
|
||||
false ->
|
||||
Logger.error("uninitialised tx in input index: #{Base.encode16(binid, [case: :lower])}")
|
||||
:ok
|
||||
|
||||
prev ->
|
||||
:rocksdb.batch_put(batch, binid, fillBinarySpends(prev, outputs))
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp init_block_txs(batch, txns) do
|
||||
Enum.each(txns, fn tx ->
|
||||
size = length(tx.outputs) * 35 * 8;
|
||||
binary_txid = Base.decode16!(tx.id, [case: :lower]);
|
||||
:rocksdb.batch_put(batch, binary_txid, <<0::integer-size(size)>>)
|
||||
end)
|
||||
end
|
||||
|
||||
defp index_block(dbref, height) do
|
||||
with block <- get_block(height),
|
||||
{:ok, batch} <- :rocksdb.batch(),
|
||||
:ok <- init_block_txs(batch, block.txns),
|
||||
:ok <- :rocksdb.write_batch(dbref, batch, []),
|
||||
{:ok, batch} <- :rocksdb.batch(),
|
||||
:ok <- index_block_inputs(dbref, batch, block.txns),
|
||||
:ok <- :rocksdb.write_batch(dbref, batch, []) do
|
||||
:ok
|
||||
else
|
||||
_ -> :err
|
||||
err ->
|
||||
Logger.error("error indexing block");
|
||||
IO.inspect(err);
|
||||
:err
|
||||
end
|
||||
end
|
||||
|
||||
# insert a 35-byte spend key into a binary spend index
|
||||
# (not sure how efficient this method is?)
|
||||
defp fillBinarySpend(bin, index, spendkey) do
|
||||
a_size = 35 * index;
|
||||
<<a::binary-size(a_size), _b::binary-size(35), c::binary>> = bin;
|
||||
<<a::binary, spendkey::binary, c::binary>>
|
||||
end
|
||||
defp fillBinarySpends(bin, []) do
|
||||
bin
|
||||
end
|
||||
defp fillBinarySpends(bin, [[index, spendkey] | rest]) do
|
||||
bin = fillBinarySpend(bin, index, spendkey);
|
||||
fillBinarySpends(bin, rest)
|
||||
end
|
||||
|
||||
# "erase" a spend by zeroing out the spend key
|
||||
defp clearBinarySpend(bin, index, _spendkey) do
|
||||
a_size = 35 * index;
|
||||
b_size = 35 * 8;
|
||||
<<a::binary-size(a_size), _b::binary-size(35), c::binary>> = bin;
|
||||
<<a::binary, <<0::integer-size(b_size)>>, c::binary>>
|
||||
end
|
||||
defp clearBinarySpends(bin, []) do
|
||||
bin
|
||||
end
|
||||
defp clearBinarySpends(bin, [[index, spendkey] | rest]) do
|
||||
bin = clearBinarySpend(bin, index, spendkey);
|
||||
clearBinarySpends(bin, rest)
|
||||
end
|
||||
|
||||
# On start up, check index height (load from leveldb) vs latest block height (load via rpc)
|
||||
# Until index height = block height, process next block
|
||||
defp sync(dbref) do
|
||||
@ -163,47 +278,118 @@ defmodule BitcoinStream.Index.Spend do
|
||||
with index_height <- get_index_height(dbref),
|
||||
chain_height <- get_chain_height() do
|
||||
if index_height < chain_height do
|
||||
# Logger.info("Building spend index for block #{index_height + 1}");
|
||||
with {:ok, batch} <- :rocksdb.batch(),
|
||||
:ok <- index_block(batch, index_height + 1),
|
||||
:ok <- :rocksdb.write_batch(dbref, batch, []),
|
||||
with :ok <- index_block(dbref, index_height + 1),
|
||||
:ok <- :rocksdb.put(dbref, "height", <<(index_height + 1)::integer-size(32)>>, []) do
|
||||
Logger.info("Built spend index for block #{index_height + 1}");
|
||||
Process.send_after(self(), :sync, 0);
|
||||
else
|
||||
err ->
|
||||
_ ->
|
||||
Logger.error("Failed to build spend index");
|
||||
:err
|
||||
false
|
||||
end
|
||||
else
|
||||
Logger.info("Spend index fully synced")
|
||||
:ok
|
||||
Logger.info("Spend index fully synced to height #{index_height}");
|
||||
true
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp get_spend(dbref, txid, vout) do
|
||||
case :rocksdb.get(dbref, "#{txid}:#{vout}", []) do
|
||||
{:ok, spend} ->
|
||||
spend
|
||||
defp get_spends(dbref, binary_txid) do
|
||||
case :rocksdb.get(dbref, binary_txid, []) do
|
||||
{:ok, spends} ->
|
||||
spends
|
||||
|
||||
:not_found ->
|
||||
false
|
||||
|
||||
err ->
|
||||
Logger.error("unexpected leveldb response")
|
||||
_ ->
|
||||
Logger.error("unexpected leveldb response");
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
defp get_transaction_spends(dbref, txid, limit, limit, spends) do
|
||||
spends
|
||||
defp unpack_spends(<<>>, spend_array) do
|
||||
Enum.reverse(spend_array)
|
||||
end
|
||||
# unspent outputs are zeroed out
|
||||
defp unpack_spends(<<0::integer-size(280), rest::binary>>, spend_array) do
|
||||
unpack_spends(rest, [false | spend_array])
|
||||
end
|
||||
defp unpack_spends(<<binary_txid::binary-size(32), index::integer-size(24), rest::binary>>, spend_array) do
|
||||
txid = Base.encode16(binary_txid, [case: :lower]);
|
||||
unpack_spends(rest, [[txid, index] | spend_array])
|
||||
end
|
||||
defp unpack_spends(bin) do
|
||||
unpack_spends(bin, [])
|
||||
end
|
||||
|
||||
defp get_transaction_spends(dbref, txid, index, limit, spends) do
|
||||
[get_spend(dbref, txid, index) | get_transaction_spends(dbref, txid, index + 1, limit, spends)]
|
||||
defp get_transaction_spends(dbref, txid) do
|
||||
binary_txid = Base.decode16!(txid, [case: :lower]);
|
||||
case get_spends(dbref, binary_txid) do
|
||||
false ->
|
||||
{:ok, nil}
|
||||
|
||||
spends ->
|
||||
spend_array = unpack_spends(spends);
|
||||
{:ok, spend_array}
|
||||
end
|
||||
end
|
||||
|
||||
defp get_transaction_spends(dbref, txid, index, limit) do
|
||||
{:ok, get_transaction_spends(dbref, txid, index, limit, [])}
|
||||
defp stack_dropped_blocks(dbref, hash, undo_stack, min_height) do
|
||||
# while we're below the latest processed height
|
||||
# keep adding blocks to the undo stack until we find an ancestor in the main chain
|
||||
with {:ok, 200, blockdata} <- RPC.request(:rpc, "getblock", [hash, 1]),
|
||||
index_height <- get_index_height(dbref),
|
||||
true <- (blockdata["height"] <= index_height),
|
||||
true <- (blockdata["confirmations"] < 0) do
|
||||
stack_dropped_blocks(dbref, blockdata["previousblockhash"], [hash | undo_stack], blockdata["height"])
|
||||
else
|
||||
_ -> [undo_stack, min_height]
|
||||
end
|
||||
end
|
||||
|
||||
defp drop_block_inputs(dbref, batch, txns) do
|
||||
spends = index_txs(txns, %{});
|
||||
Enum.each(spends, fn {binid, outputs} ->
|
||||
case get_spends(dbref, binid) do
|
||||
false ->
|
||||
Logger.error("uninitialised tx in input index: #{Base.encode16(binid, [case: :lower])}")
|
||||
:ok
|
||||
|
||||
prev ->
|
||||
:rocksdb.batch_put(batch, binid, clearBinarySpends(prev, outputs))
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp drop_block(dbref, hash) do
|
||||
with block <- get_block_by_hash(hash),
|
||||
{:ok, batch} <- :rocksdb.batch(),
|
||||
:ok <- drop_block_inputs(dbref, batch, block.txns),
|
||||
:ok <- :rocksdb.write_batch(dbref, batch, []) do
|
||||
:ok
|
||||
else
|
||||
err ->
|
||||
Logger.error("error indexing block");
|
||||
IO.inspect(err);
|
||||
:err
|
||||
end
|
||||
end
|
||||
|
||||
defp drop_blocks(_dbref, []) do
|
||||
:ok
|
||||
end
|
||||
defp drop_blocks(dbref, [hash | rest]) do
|
||||
drop_block(dbref, hash);
|
||||
drop_blocks(dbref, rest)
|
||||
end
|
||||
defp block_disconnected(dbref, hash) do
|
||||
[undo_stack, min_height] = stack_dropped_blocks(dbref, hash, [], nil);
|
||||
drop_blocks(dbref, undo_stack);
|
||||
if (min_height != nil) do
|
||||
:rocksdb.put(dbref, "height", <<(min_height - 1)::integer-size(32)>>, [])
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user