diff --git a/test/functional/wallet_keypool.py b/test/functional/wallet_keypool.py index 6ed8572347..d50f40a85c 100755 --- a/test/functional/wallet_keypool.py +++ b/test/functional/wallet_keypool.py @@ -4,33 +4,113 @@ # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the wallet keypool and interaction with wallet encryption/locking.""" +import re import time from decimal import Decimal +from test_framework.descriptors import descsum_create from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal, assert_raises_rpc_error from test_framework.wallet_util import WalletUnlock +TEST_KEYPOOL_SIZE = 10 +TEST_NEW_KEYPOOL_SIZE = TEST_KEYPOOL_SIZE + 2 + class KeyPoolTest(BitcoinTestFramework): def add_options(self, parser): self.add_wallet_options(parser) def set_test_params(self): self.num_nodes = 1 + self.extra_args = [[f"-keypool={TEST_KEYPOOL_SIZE}"]] def skip_test_if_missing_module(self): self.skip_if_no_wallet() def run_test(self): nodes = self.nodes + + # Derive addresses from the wallet without removing them from keypool + addrs = [] + if not self.options.descriptors: + path = str(self.nodes[0].datadir_path / 'wallet.dump') + nodes[0].dumpwallet(path) + file = open(path, "r", encoding="utf8") + m = re.search(r"masterkey: (\w+)", file.read()) + file.close() + xpriv = m.group(1) + desc = descsum_create(f"wpkh({xpriv}/0h/0h/*h)") + addrs = nodes[0].deriveaddresses(descriptor=desc, range=[0, 9]) + else: + list_descriptors = nodes[0].listdescriptors() + for desc in list_descriptors["descriptors"]: + if desc['active'] and not desc["internal"] and desc["desc"][:4] == "wpkh": + addrs = nodes[0].deriveaddresses(descriptor=desc["desc"], range=[0, 9]) + + addr0 = addrs[0] + addr9 = addrs[9] # arbitrary future address index + + # Address is mine and active before it is removed from keypool by getnewaddress + addr0_before_getting_data = nodes[0].getaddressinfo(addr0) + assert addr0_before_getting_data['ismine'] + assert addr0_before_getting_data['isactive'] + addr_before_encrypting = nodes[0].getnewaddress() addr_before_encrypting_data = nodes[0].getaddressinfo(addr_before_encrypting) + assert addr0 == addr_before_encrypting + # Address is still mine and active even after being removed from keypool + assert addr_before_encrypting_data['ismine'] + assert addr_before_encrypting_data['isactive'] + wallet_info_old = nodes[0].getwalletinfo() if not self.options.descriptors: assert addr_before_encrypting_data['hdseedid'] == wallet_info_old['hdseedid'] + # Address is mine and active before wallet is encrypted (resetting keypool) + addr9_before_encrypting_data = nodes[0].getaddressinfo(addr9) + assert addr9_before_encrypting_data['ismine'] + assert addr9_before_encrypting_data['isactive'] + + # Imported things are never considered active, no need to rescan + # Imported public keys / addresses can't be mine because they are not spendable + if self.options.descriptors: + nodes[0].importdescriptors([{ + "desc": "addr(bcrt1q95gp4zeaah3qcerh35yhw02qeptlzasdtst55v)", + "timestamp": "now" + }]) + else: + nodes[0].importaddress("bcrt1q95gp4zeaah3qcerh35yhw02qeptlzasdtst55v", "label", rescan=False) + import_addr_data = nodes[0].getaddressinfo("bcrt1q95gp4zeaah3qcerh35yhw02qeptlzasdtst55v") + assert import_addr_data["iswatchonly"] is not self.options.descriptors + assert not import_addr_data["ismine"] + assert not import_addr_data["isactive"] + + if self.options.descriptors: + nodes[0].importdescriptors([{ + "desc": "pk(02f893ca95b0d55b4ce4e72ae94982eb679158cb2ebc120ff62c17fedfd1f0700e)", + "timestamp": "now" + }]) + else: + nodes[0].importpubkey("02f893ca95b0d55b4ce4e72ae94982eb679158cb2ebc120ff62c17fedfd1f0700e", "label", rescan=False) + import_pub_data = nodes[0].getaddressinfo("bcrt1q4v7a8wn5vqd6fk4026s5gzzxyu7cfzz23n576h") + assert import_pub_data["iswatchonly"] is not self.options.descriptors + assert not import_pub_data["ismine"] + assert not import_pub_data["isactive"] + + nodes[0].importprivkey("cPMX7v5CNV1zCphFSq2hnR5rCjzAhA1GsBfD1qrJGdj4QEfu38Qx", "label", rescan=False) + import_priv_data = nodes[0].getaddressinfo("bcrt1qa985v5d53qqtrfujmzq2zrw3r40j6zz4ns02kj") + assert not import_priv_data["iswatchonly"] + assert import_priv_data["ismine"] + assert not import_priv_data["isactive"] + # Encrypt wallet and wait to terminate nodes[0].encryptwallet('test') + addr9_after_encrypting_data = nodes[0].getaddressinfo(addr9) + # Key is from unencrypted seed, no longer considered active + assert not addr9_after_encrypting_data['isactive'] + # ...however it *IS* still mine since we can spend with this key + assert addr9_after_encrypting_data['ismine'] + if self.options.descriptors: # Import hardened derivation only descriptors nodes[0].walletpassphrase('test', 10) @@ -76,7 +156,9 @@ class KeyPoolTest(BitcoinTestFramework): } ]) nodes[0].walletlock() - # Keep creating keys + # Keep creating keys until we run out + for _ in range(TEST_KEYPOOL_SIZE - 1): + nodes[0].getnewaddress() addr = nodes[0].getnewaddress() addr_data = nodes[0].getaddressinfo(addr) wallet_info = nodes[0].getwalletinfo() @@ -85,24 +167,23 @@ class KeyPoolTest(BitcoinTestFramework): assert addr_data['hdseedid'] == wallet_info['hdseedid'] assert_raises_rpc_error(-12, "Error: Keypool ran out, please call keypoolrefill first", nodes[0].getnewaddress) - # put six (plus 2) new keys in the keypool (100% external-, +100% internal-keys, 1 in min) + # put two new keys in the keypool with WalletUnlock(nodes[0], 'test'): - nodes[0].keypoolrefill(6) + nodes[0].keypoolrefill(TEST_NEW_KEYPOOL_SIZE) wi = nodes[0].getwalletinfo() if self.options.descriptors: - assert_equal(wi['keypoolsize_hd_internal'], 24) - assert_equal(wi['keypoolsize'], 24) + # Descriptors wallet: keypool size applies to both internal and external + # chains and there are four of each (legacy, nested, segwit, and taproot) + assert_equal(wi['keypoolsize_hd_internal'], TEST_NEW_KEYPOOL_SIZE * 4) + assert_equal(wi['keypoolsize'], TEST_NEW_KEYPOOL_SIZE * 4) else: - assert_equal(wi['keypoolsize_hd_internal'], 6) - assert_equal(wi['keypoolsize'], 6) + # Legacy wallet: keypool size applies to both internal and external HD chains + assert_equal(wi['keypoolsize_hd_internal'], TEST_NEW_KEYPOOL_SIZE) + assert_equal(wi['keypoolsize'], TEST_NEW_KEYPOOL_SIZE) # drain the internal keys - nodes[0].getrawchangeaddress() - nodes[0].getrawchangeaddress() - nodes[0].getrawchangeaddress() - nodes[0].getrawchangeaddress() - nodes[0].getrawchangeaddress() - nodes[0].getrawchangeaddress() + for _ in range(TEST_NEW_KEYPOOL_SIZE): + nodes[0].getrawchangeaddress() # remember keypool sizes wi = nodes[0].getwalletinfo() kp_size_before = [wi['keypoolsize_hd_internal'], wi['keypoolsize']] @@ -115,13 +196,8 @@ class KeyPoolTest(BitcoinTestFramework): # drain the external keys addr = set() - addr.add(nodes[0].getnewaddress(address_type="bech32")) - addr.add(nodes[0].getnewaddress(address_type="bech32")) - addr.add(nodes[0].getnewaddress(address_type="bech32")) - addr.add(nodes[0].getnewaddress(address_type="bech32")) - addr.add(nodes[0].getnewaddress(address_type="bech32")) - addr.add(nodes[0].getnewaddress(address_type="bech32")) - assert len(addr) == 6 + for _ in range(TEST_NEW_KEYPOOL_SIZE): + addr.add(nodes[0].getnewaddress(address_type="bech32")) # remember keypool sizes wi = nodes[0].getwalletinfo() kp_size_before = [wi['keypoolsize_hd_internal'], wi['keypoolsize']] @@ -132,16 +208,18 @@ class KeyPoolTest(BitcoinTestFramework): kp_size_after = [wi['keypoolsize_hd_internal'], wi['keypoolsize']] assert_equal(kp_size_before, kp_size_after) - # refill keypool with three new addresses + # refill keypool nodes[0].walletpassphrase('test', 1) - nodes[0].keypoolrefill(3) + # At this point the keypool has >45 keys in it + # calling keypoolrefill with anything smaller than that is a noop + nodes[0].keypoolrefill(50) # test walletpassphrase timeout time.sleep(1.1) assert_equal(nodes[0].getwalletinfo()["unlocked_until"], 0) # drain the keypool - for _ in range(3): + for _ in range(50): nodes[0].getnewaddress() assert_raises_rpc_error(-12, "Keypool ran out", nodes[0].getnewaddress)