bitcoin/test/functional/rpc_getblocklocations.py
Roman Zeyde 2ee1991c7d RPC: Add getblocklocations call
This RPC allows the client to retrieve the file system locations
of the confirmed blocks and their undo data, to allow building
efficient indexes outside of Bitcoin Core.

An example usage is described here:
https://github.com/romanz/electrs/issues/308

By using the new RPC, it is possible to build an address-based
index taking ~24GB and a txindex taking ~6GB (as of Dec. 2020).
2024-03-04 17:39:06 +00:00

79 lines
3.2 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright (c) 2019-2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the getblocklocations rpc call."""
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (assert_equal, assert_raises_rpc_error)
from test_framework.messages import ser_vector
class GetblocklocationsTest(BitcoinTestFramework):
    """Functional test for the `getblocklocations` RPC.

    Fetches block locations with several batch sizes, cross-checks the
    reported offsets against the raw `blk`/`rev` files on disk, and then
    exercises the error paths (unknown block hash, node in prune mode).
    """

    def set_test_params(self):
        """Run a single node starting from a clean chain."""
        self.setup_clean_chain = True
        self.num_nodes = 1

    def run_test(self):
        """Test a trivial usage of the getblocklocations RPC command."""
        node = self.nodes[0]
        test_block_count = 7
        self.generate(node, test_block_count)

        # The walk below stops once a location's 'prev' equals this value.
        NULL_HASH = '0000000000000000000000000000000000000000000000000000000000000000'

        # Hashes for heights 0 .. test_block_count - 1, reordered so that the
        # highest queried height comes first and height 0 comes last.
        # NOTE(review): if generate() leaves the chain tip at height
        # test_block_count, that block is not covered here -- confirm intended.
        block_hashes = [node.getblockhash(height) for height in range(test_block_count)]
        block_hashes.reverse()

        # Maps block hash -> location entry as first reported by the RPC.
        block_locations = {}

        def check_consistency(start_hash, entries):
            """Record each entry under its block hash, or compare it with the
            entry already recorded for that hash.

            `entries` is walked in order starting at `start_hash`; the hash of
            each following entry is taken from the previous entry's 'prev'.
            """
            current_hash = start_hash
            for entry in entries:
                if current_hash in block_locations:
                    assert_equal(block_locations[current_hash], entry)
                else:
                    block_locations[current_hash] = entry
                current_hash = entry['prev']

        # Get blocks' locations using several batch sizes
        last_locations = None
        for batch_size in range(1, 10):
            locations = []
            tip = block_hashes[0]
            while tip != NULL_HASH:
                locations.extend(node.getblocklocations(tip, batch_size))
                # Re-validate everything accumulated so far, from the start.
                check_consistency(block_hashes[0], locations)
                tip = locations[-1]['prev']
            # Every batch size must yield the same overall list. Compare
            # against None explicitly so the check does not depend on the
            # truthiness of the previous result.
            if last_locations is not None:
                assert_equal(last_locations, locations)
            last_locations = locations

        # Read blocks' data from the file system
        blocks_dir = node.chain_path / 'blocks'
        with (blocks_dir / 'blk00000.dat').open('rb') as blkfile:
            for block_hash in block_hashes:
                location = block_locations[block_hash]
                # Verbosity 0 returns the block as a hex string.
                block_bytes = bytes.fromhex(node.getblock(block_hash, 0))
                assert_file_contains(blkfile, location['data'], block_bytes)

        empty_undo = ser_vector([])  # empty blocks = no transactions to undo
        with (blocks_dir / 'rev00000.dat').open('rb') as revfile:
            for block_hash in block_hashes[:-1]:  # skip genesis block (has no undo)
                location = block_locations[block_hash]
                assert_file_contains(revfile, location['undo'], empty_undo)

        # Fail getting unknown block
        unknown_block_hash = '0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef'
        assert_raises_rpc_error(-5, 'Block not found', node.getblocklocations, unknown_block_hash, 3)

        # Fail in pruned mode
        self.restart_node(0, ['-prune=1'])
        tip = block_hashes[0]
        assert_raises_rpc_error(-1, 'Block locations are not available in prune mode', node.getblocklocations, tip, 3)
def assert_file_contains(fileobj, offset, data):
    """Assert that `fileobj` holds exactly `data` starting at `offset`.

    Seeks `fileobj` to `offset`, reads `len(data)` bytes and compares them
    with `data` via `assert_equal`. The file position is left after the
    bytes that were read.
    """
    expected_length = len(data)
    fileobj.seek(offset)
    found = fileobj.read(expected_length)
    assert_equal(found, data)
# Script entry point: run the test only when this file is executed directly.
if __name__ == '__main__':
    GetblocklocationsTest().main()