bitcoin/test/functional/wallet_keypool.py
Matthew Zipkin 67e6fd03de test: cover "ismine" and "isactive" field in rpc getaddressinfo
Github-Pull: #27216
Rebased-From: 67650211e36003ca3adddef34118ea45023c283f
2024-11-13 17:25:29 +00:00

305 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright (c) 2014-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the wallet keypool and interaction with wallet encryption/locking."""
import re
import time
from decimal import Decimal
from test_framework.descriptors import descsum_create
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_raises_rpc_error
from test_framework.wallet_util import WalletUnlock
TEST_KEYPOOL_SIZE = 10
TEST_NEW_KEYPOOL_SIZE = TEST_KEYPOOL_SIZE + 2
class KeyPoolTest(BitcoinTestFramework):
def add_options(self, parser):
self.add_wallet_options(parser)
def set_test_params(self):
self.num_nodes = 1
self.extra_args = [[f"-keypool={TEST_KEYPOOL_SIZE}"]]
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
nodes = self.nodes
# Derive addresses from the wallet without removing them from keypool
addrs = []
if not self.options.descriptors:
path = str(self.nodes[0].datadir_path / 'wallet.dump')
nodes[0].dumpwallet(path)
file = open(path, "r", encoding="utf8")
m = re.search(r"masterkey: (\w+)", file.read())
file.close()
xpriv = m.group(1)
desc = descsum_create(f"wpkh({xpriv}/0h/0h/*h)")
addrs = nodes[0].deriveaddresses(descriptor=desc, range=[0, 9])
else:
list_descriptors = nodes[0].listdescriptors()
for desc in list_descriptors["descriptors"]:
if desc['active'] and not desc["internal"] and desc["desc"][:4] == "wpkh":
addrs = nodes[0].deriveaddresses(descriptor=desc["desc"], range=[0, 9])
addr0 = addrs[0]
addr9 = addrs[9] # arbitrary future address index
# Address is mine and active before it is removed from keypool by getnewaddress
addr0_before_getting_data = nodes[0].getaddressinfo(addr0)
assert addr0_before_getting_data['ismine']
assert addr0_before_getting_data['isactive']
addr_before_encrypting = nodes[0].getnewaddress()
addr_before_encrypting_data = nodes[0].getaddressinfo(addr_before_encrypting)
assert addr0 == addr_before_encrypting
# Address is still mine and active even after being removed from keypool
assert addr_before_encrypting_data['ismine']
assert addr_before_encrypting_data['isactive']
wallet_info_old = nodes[0].getwalletinfo()
if not self.options.descriptors:
assert addr_before_encrypting_data['hdseedid'] == wallet_info_old['hdseedid']
# Address is mine and active before wallet is encrypted (resetting keypool)
addr9_before_encrypting_data = nodes[0].getaddressinfo(addr9)
assert addr9_before_encrypting_data['ismine']
assert addr9_before_encrypting_data['isactive']
# Imported things are never considered active, no need to rescan
# Imported public keys / addresses can't be mine because they are not spendable
if self.options.descriptors:
nodes[0].importdescriptors([{
"desc": "addr(bcrt1q95gp4zeaah3qcerh35yhw02qeptlzasdtst55v)",
"timestamp": "now"
}])
else:
nodes[0].importaddress("bcrt1q95gp4zeaah3qcerh35yhw02qeptlzasdtst55v", "label", rescan=False)
import_addr_data = nodes[0].getaddressinfo("bcrt1q95gp4zeaah3qcerh35yhw02qeptlzasdtst55v")
assert import_addr_data["iswatchonly"] is not self.options.descriptors
assert not import_addr_data["ismine"]
assert not import_addr_data["isactive"]
if self.options.descriptors:
nodes[0].importdescriptors([{
"desc": "pk(02f893ca95b0d55b4ce4e72ae94982eb679158cb2ebc120ff62c17fedfd1f0700e)",
"timestamp": "now"
}])
else:
nodes[0].importpubkey("02f893ca95b0d55b4ce4e72ae94982eb679158cb2ebc120ff62c17fedfd1f0700e", "label", rescan=False)
import_pub_data = nodes[0].getaddressinfo("bcrt1q4v7a8wn5vqd6fk4026s5gzzxyu7cfzz23n576h")
assert import_pub_data["iswatchonly"] is not self.options.descriptors
assert not import_pub_data["ismine"]
assert not import_pub_data["isactive"]
nodes[0].importprivkey("cPMX7v5CNV1zCphFSq2hnR5rCjzAhA1GsBfD1qrJGdj4QEfu38Qx", "label", rescan=False)
import_priv_data = nodes[0].getaddressinfo("bcrt1qa985v5d53qqtrfujmzq2zrw3r40j6zz4ns02kj")
assert not import_priv_data["iswatchonly"]
assert import_priv_data["ismine"]
assert not import_priv_data["isactive"]
# Encrypt wallet and wait to terminate
nodes[0].encryptwallet('test')
addr9_after_encrypting_data = nodes[0].getaddressinfo(addr9)
# Key is from unencrypted seed, no longer considered active
assert not addr9_after_encrypting_data['isactive']
# ...however it *IS* still mine since we can spend with this key
assert addr9_after_encrypting_data['ismine']
if self.options.descriptors:
# Import hardened derivation only descriptors
nodes[0].walletpassphrase('test', 10)
nodes[0].importdescriptors([
{
"desc": "wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/0h/*h)#y4dfsj7n",
"timestamp": "now",
"range": [0,0],
"active": True
},
{
"desc": "pkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/1h/*h)#a0nyvl0k",
"timestamp": "now",
"range": [0,0],
"active": True
},
{
"desc": "sh(wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/2h/*h))#lmeu2axg",
"timestamp": "now",
"range": [0,0],
"active": True
},
{
"desc": "wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/3h/*h)#jkl636gm",
"timestamp": "now",
"range": [0,0],
"active": True,
"internal": True
},
{
"desc": "pkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/4h/*h)#l3crwaus",
"timestamp": "now",
"range": [0,0],
"active": True,
"internal": True
},
{
"desc": "sh(wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/5h/*h))#qg8wa75f",
"timestamp": "now",
"range": [0,0],
"active": True,
"internal": True
}
])
nodes[0].walletlock()
# Keep creating keys until we run out
for _ in range(TEST_KEYPOOL_SIZE - 1):
nodes[0].getnewaddress()
addr = nodes[0].getnewaddress()
addr_data = nodes[0].getaddressinfo(addr)
wallet_info = nodes[0].getwalletinfo()
assert addr_before_encrypting_data['hdmasterfingerprint'] != addr_data['hdmasterfingerprint']
if not self.options.descriptors:
assert addr_data['hdseedid'] == wallet_info['hdseedid']
assert_raises_rpc_error(-12, "Error: Keypool ran out, please call keypoolrefill first", nodes[0].getnewaddress)
# put two new keys in the keypool
with WalletUnlock(nodes[0], 'test'):
nodes[0].keypoolrefill(TEST_NEW_KEYPOOL_SIZE)
wi = nodes[0].getwalletinfo()
if self.options.descriptors:
# Descriptors wallet: keypool size applies to both internal and external
# chains and there are four of each (legacy, nested, segwit, and taproot)
assert_equal(wi['keypoolsize_hd_internal'], TEST_NEW_KEYPOOL_SIZE * 4)
assert_equal(wi['keypoolsize'], TEST_NEW_KEYPOOL_SIZE * 4)
else:
# Legacy wallet: keypool size applies to both internal and external HD chains
assert_equal(wi['keypoolsize_hd_internal'], TEST_NEW_KEYPOOL_SIZE)
assert_equal(wi['keypoolsize'], TEST_NEW_KEYPOOL_SIZE)
# drain the internal keys
for _ in range(TEST_NEW_KEYPOOL_SIZE):
nodes[0].getrawchangeaddress()
# remember keypool sizes
wi = nodes[0].getwalletinfo()
kp_size_before = [wi['keypoolsize_hd_internal'], wi['keypoolsize']]
# the next one should fail
assert_raises_rpc_error(-12, "Keypool ran out", nodes[0].getrawchangeaddress)
# check that keypool sizes did not change
wi = nodes[0].getwalletinfo()
kp_size_after = [wi['keypoolsize_hd_internal'], wi['keypoolsize']]
assert_equal(kp_size_before, kp_size_after)
# drain the external keys
addr = set()
for _ in range(TEST_NEW_KEYPOOL_SIZE):
addr.add(nodes[0].getnewaddress(address_type="bech32"))
# remember keypool sizes
wi = nodes[0].getwalletinfo()
kp_size_before = [wi['keypoolsize_hd_internal'], wi['keypoolsize']]
# the next one should fail
assert_raises_rpc_error(-12, "Error: Keypool ran out, please call keypoolrefill first", nodes[0].getnewaddress)
# check that keypool sizes did not change
wi = nodes[0].getwalletinfo()
kp_size_after = [wi['keypoolsize_hd_internal'], wi['keypoolsize']]
assert_equal(kp_size_before, kp_size_after)
# refill keypool
nodes[0].walletpassphrase('test', 1)
# At this point the keypool has >45 keys in it
# calling keypoolrefill with anything smaller than that is a noop
nodes[0].keypoolrefill(50)
# test walletpassphrase timeout
time.sleep(1.1)
assert_equal(nodes[0].getwalletinfo()["unlocked_until"], 0)
# drain the keypool
for _ in range(50):
nodes[0].getnewaddress()
assert_raises_rpc_error(-12, "Keypool ran out", nodes[0].getnewaddress)
with WalletUnlock(nodes[0], 'test'):
nodes[0].keypoolrefill(100)
wi = nodes[0].getwalletinfo()
if self.options.descriptors:
assert_equal(wi['keypoolsize_hd_internal'], 400)
assert_equal(wi['keypoolsize'], 400)
else:
assert_equal(wi['keypoolsize_hd_internal'], 100)
assert_equal(wi['keypoolsize'], 100)
if not self.options.descriptors:
# Check that newkeypool entirely flushes the keypool
start_keypath = nodes[0].getaddressinfo(nodes[0].getnewaddress())['hdkeypath']
start_change_keypath = nodes[0].getaddressinfo(nodes[0].getrawchangeaddress())['hdkeypath']
# flush keypool and get new addresses
nodes[0].newkeypool()
end_keypath = nodes[0].getaddressinfo(nodes[0].getnewaddress())['hdkeypath']
end_change_keypath = nodes[0].getaddressinfo(nodes[0].getrawchangeaddress())['hdkeypath']
# The new keypath index should be 100 more than the old one
new_index = int(start_keypath.rsplit('/', 1)[1][:-1]) + 100
new_change_index = int(start_change_keypath.rsplit('/', 1)[1][:-1]) + 100
assert_equal(end_keypath, "m/0'/0'/" + str(new_index) + "'")
assert_equal(end_change_keypath, "m/0'/1'/" + str(new_change_index) + "'")
# create a blank wallet
nodes[0].createwallet(wallet_name='w2', blank=True, disable_private_keys=True)
w2 = nodes[0].get_wallet_rpc('w2')
# refer to initial wallet as w1
w1 = nodes[0].get_wallet_rpc(self.default_wallet_name)
# import private key and fund it
address = addr.pop()
desc = w1.getaddressinfo(address)['desc']
if self.options.descriptors:
res = w2.importdescriptors([{'desc': desc, 'timestamp': 'now'}])
else:
res = w2.importmulti([{'desc': desc, 'timestamp': 'now'}])
assert_equal(res[0]['success'], True)
with WalletUnlock(w1, 'test'):
res = w1.sendtoaddress(address=address, amount=0.00010000)
self.generate(nodes[0], 1)
destination = addr.pop()
# Using a fee rate (10 sat / byte) well above the minimum relay rate
# creating a 5,000 sat transaction with change should not be possible
assert_raises_rpc_error(-4, "Transaction needs a change address, but we can't generate it.", w2.walletcreatefundedpsbt, inputs=[], outputs=[{addr.pop(): 0.00005000}], subtractFeeFromOutputs=[0], feeRate=0.00010)
# creating a 10,000 sat transaction without change, with a manual input, should still be possible
res = w2.walletcreatefundedpsbt(inputs=w2.listunspent(), outputs=[{destination: 0.00010000}], subtractFeeFromOutputs=[0], feeRate=0.00010)
assert_equal("psbt" in res, True)
# creating a 10,000 sat transaction without change should still be possible
res = w2.walletcreatefundedpsbt(inputs=[], outputs=[{destination: 0.00010000}], subtractFeeFromOutputs=[0], feeRate=0.00010)
assert_equal("psbt" in res, True)
# should work without subtractFeeFromOutputs if the exact fee is subtracted from the amount
res = w2.walletcreatefundedpsbt(inputs=[], outputs=[{destination: 0.00008900}], feeRate=0.00010)
assert_equal("psbt" in res, True)
# dust change should be removed
res = w2.walletcreatefundedpsbt(inputs=[], outputs=[{destination: 0.00008800}], feeRate=0.00010)
assert_equal("psbt" in res, True)
# create a transaction without change at the maximum fee rate, such that the output is still spendable:
res = w2.walletcreatefundedpsbt(inputs=[], outputs=[{destination: 0.00010000}], subtractFeeFromOutputs=[0], feeRate=0.0008823)
assert_equal("psbt" in res, True)
assert_equal(res["fee"], Decimal("0.00009706"))
# creating a 10,000 sat transaction with a manual change address should be possible
res = w2.walletcreatefundedpsbt(inputs=[], outputs=[{destination: 0.00010000}], subtractFeeFromOutputs=[0], feeRate=0.00010, changeAddress=addr.pop())
assert_equal("psbt" in res, True)
if not self.options.descriptors:
msg = "Error: Private keys are disabled for this wallet"
assert_raises_rpc_error(-4, msg, w2.keypoolrefill, 100)
if __name__ == '__main__':
KeyPoolTest(__file__).main()