#!/usr/bin/env python3
# Copyright (c) 2016-2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test Hierarchical Deterministic wallet function."""

import os
import shutil

from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
    assert_equal,
    assert_raises_rpc_error,
)


class WalletHDTest(BitcoinTestFramework):
    """Functional test of HD key derivation, backup restore and sethdseed.

    Node 0 mines and funds; node 1 holds the wallet under test and is started
    with an empty keypool and bech32 addresses.
    """

    def set_test_params(self):
        """Configure a clean two-node chain; node 1 runs with '-keypool=0'."""
        self.setup_clean_chain = True
        self.num_nodes = 2
        self.extra_args = [[], ['-keypool=0', '-addresstype=bech32']]
        self.supports_cli = False

    def skip_test_if_missing_module(self):
        """Skip the whole test when the wallet is not available."""
        self.skip_if_no_wallet()

    def run_test(self):
        """Exercise derivation, restore-from-backup, rescans and sethdseed."""
        # Make sure we use hd, keep masterkeyid
        hd_fingerprint = self.nodes[1].getaddressinfo(self.nodes[1].getnewaddress())['hdmasterfingerprint']
        assert_equal(len(hd_fingerprint), 8)

        # create an internal key (first internal child key)
        self._check_next_change_keypath(0)

        # Import a non-HD private key in the HD wallet
        non_hd_add = 'ncrt1qmevj8zfx0wdvp05cqwkmr6mxkfx60yezy25dj7'
        non_hd_key = 'cS9umN9w6cDMuRVYdbkfE4c7YUFLJRoXMfhQ569uY4odiQbVN8Rt'
        self.nodes[1].importprivkey(non_hd_key)

        # This should be enough to keep the master key and the non-HD key
        self.nodes[1].backupwallet(os.path.join(self.nodes[1].datadir, "hd.bak"))

        # Derive some HD addresses and remember the last
        # Also send funds to each add
        self.nodes[0].generate(101)
        hd_add = None
        NUM_HD_ADDS = 10
        for i in range(1, NUM_HD_ADDS + 1):
            hd_add = self._new_checked_address(i, hd_fingerprint)
            self.nodes[0].sendtoaddress(hd_add, 1)
            self.nodes[0].generate(1)
        self.nodes[0].sendtoaddress(non_hd_add, 1)
        self.nodes[0].generate(1)

        # create an internal key (again; second internal child key)
        self._check_next_change_keypath(1)

        self.sync_all()
        assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)

        self.log.info("Restore backup ...")
        self._reset_node1_to_backup()
        self.start_node(1)

        # Assert that derivation is deterministic
        hd_add_2 = None
        for i in range(1, NUM_HD_ADDS + 1):
            hd_add_2 = self._new_checked_address(i, hd_fingerprint)
        assert_equal(hd_add, hd_add_2)
        self.connect_nodes(0, 1)
        self.sync_all()

        # Needs rescan
        self.restart_node(1, extra_args=self.extra_args[1] + ['-rescan'])
        assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)

        # Try a RPC based rescan
        self._reset_node1_to_backup()
        self.start_node(1, extra_args=self.extra_args[1])
        self.connect_nodes(0, 1)
        self.sync_all()
        # Wallet automatically scans blocks older than key on startup
        assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)
        rescan = self.nodes[1].rescanblockchain(0, 1)
        assert_equal(rescan['start_height'], 0)
        assert_equal(rescan['stop_height'], 1)
        rescan = self.nodes[1].rescanblockchain()
        assert_equal(rescan['start_height'], 0)
        assert_equal(rescan['stop_height'], self.nodes[1].getblockcount())
        assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)

        # send a tx and make sure its using the internal chain for the changeoutput
        txid = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1)
        vouts = self.nodes[1].decoderawtransaction(self.nodes[1].gettransaction(txid)['hex'])['vout']
        keypath = ""
        for vout in vouts:
            # The payment output is worth exactly 1; any other output is change.
            if vout['value'] != 1:
                keypath = self.nodes[1].getaddressinfo(vout['scriptPubKey']['addresses'][0])['hdkeypath']

        if self.options.descriptors:
            assert_equal(keypath[0:14], "m/84'/1'/0'/1/")
        else:
            assert_equal(keypath[0:7], "m/0'/1'")

        # The sethdseed scenarios only run for non-descriptor wallets.
        if not self.options.descriptors:
            self._check_sethdseed()
            self._check_sethdseed_restore_outside_keypool()

    def _check_next_change_keypath(self, index):
        """Fetch a raw change address from node 1 and assert its hdkeypath uses child ``index``."""
        node = self.nodes[1]
        change_addr = node.getrawchangeaddress()
        change_info = node.getaddressinfo(change_addr)
        if self.options.descriptors:
            expected = "m/84'/1'/0'/1/" + str(index)
        else:
            expected = "m/0'/1'/" + str(index) + "'"
        assert_equal(change_info["hdkeypath"], expected)

    def _new_checked_address(self, index, hd_fingerprint):
        """Get a new node 1 address, assert its keypath/fingerprint, and return it.

        ``index`` is the child number expected in the address's hdkeypath and
        ``hd_fingerprint`` the expected 'hdmasterfingerprint'.
        """
        node = self.nodes[1]
        address = node.getnewaddress()
        info = node.getaddressinfo(address)
        if self.options.descriptors:
            expected = "m/84'/1'/0'/0/" + str(index)
        else:
            expected = "m/0'/0'/" + str(index) + "'"
        assert_equal(info["hdkeypath"], expected)
        assert_equal(info["hdmasterfingerprint"], hd_fingerprint)
        return address

    def _reset_node1_to_backup(self):
        """Stop node 1, delete its chain data and copy hd.bak over its wallet file.

        The node is left stopped so the caller can pick the start arguments.
        """
        self.stop_node(1)
        datadir = self.nodes[1].datadir
        # we need to delete the complete chain directory
        # otherwise node1 would auto-recover all funds and flag the keypool keys as used
        shutil.rmtree(os.path.join(datadir, self.chain, "blocks"))
        shutil.rmtree(os.path.join(datadir, self.chain, "chainstate"))
        shutil.copyfile(
            os.path.join(datadir, "hd.bak"),
            os.path.join(datadir, self.chain, 'wallets', self.default_wallet_name, self.wallet_data_filename),
        )

    def _check_sethdseed(self):
        """Check seed rotation on node 1 and sethdseed argument validation."""
        # Generate a new HD seed on node 1 and make sure it is set
        orig_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid']
        self.nodes[1].sethdseed()
        new_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid']
        assert orig_masterkeyid != new_masterkeyid
        addr = self.nodes[1].getnewaddress()
        # Make sure the new address is the first from the keypool
        assert_equal(self.nodes[1].getaddressinfo(addr)['hdkeypath'], "m/0'/0'/0'")
        self.nodes[1].keypoolrefill(1)  # Fill keypool with 1 key

        # Set a new HD seed on node 1 without flushing the keypool
        new_seed = self.nodes[0].dumpprivkey(self.nodes[0].getnewaddress())
        orig_masterkeyid = new_masterkeyid
        self.nodes[1].sethdseed(False, new_seed)
        new_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid']
        assert orig_masterkeyid != new_masterkeyid
        addr = self.nodes[1].getnewaddress()
        assert_equal(orig_masterkeyid, self.nodes[1].getaddressinfo(addr)['hdseedid'])
        # Make sure the new address continues previous keypool
        assert_equal(self.nodes[1].getaddressinfo(addr)['hdkeypath'], "m/0'/0'/1'")

        # Check that the next address is from the new seed
        self.nodes[1].keypoolrefill(1)
        next_addr = self.nodes[1].getnewaddress()
        assert_equal(new_masterkeyid, self.nodes[1].getaddressinfo(next_addr)['hdseedid'])
        # Make sure the new address is not from previous keypool
        assert_equal(self.nodes[1].getaddressinfo(next_addr)['hdkeypath'], "m/0'/0'/0'")
        assert next_addr != addr

        # Sethdseed parameter validity
        assert_raises_rpc_error(-1, 'sethdseed', self.nodes[0].sethdseed, False, new_seed, 0)
        assert_raises_rpc_error(-5, "Invalid private key", self.nodes[1].sethdseed, False, "not_wif")
        assert_raises_rpc_error(-1, "JSON value is not a boolean as expected", self.nodes[1].sethdseed, "Not_bool")
        assert_raises_rpc_error(-1, "JSON value is not a string as expected", self.nodes[1].sethdseed, False, True)
        assert_raises_rpc_error(-5, "Already have this key", self.nodes[1].sethdseed, False, new_seed)
        assert_raises_rpc_error(-5, "Already have this key", self.nodes[1].sethdseed, False, self.nodes[1].dumpprivkey(self.nodes[1].getnewaddress()))

    def _check_sethdseed_restore_outside_keypool(self):
        """Check that wallets with a rotated-away seed still track that seed's keys."""
        self.log.info('Test sethdseed restoring with keys outside of the initial keypool')
        self.nodes[0].generate(10)
        # Restart node 1 with keypool of 3 and a different wallet
        self.nodes[1].createwallet(wallet_name='origin', blank=True)
        self.restart_node(1, extra_args=['-keypool=3', '-wallet=origin'])
        self.connect_nodes(0, 1)

        # sethdseed restoring and seeing txs to addresses out of the keypool
        origin_rpc = self.nodes[1].get_wallet_rpc('origin')
        seed = self.nodes[0].dumpprivkey(self.nodes[0].getnewaddress())
        origin_rpc.sethdseed(True, seed)

        self.nodes[1].createwallet(wallet_name='restore', blank=True)
        restore_rpc = self.nodes[1].get_wallet_rpc('restore')
        restore_rpc.sethdseed(True, seed)  # Set to be the same seed as origin_rpc
        restore_rpc.sethdseed(True)  # Rotate to a new seed, making original `seed` inactive

        self.nodes[1].createwallet(wallet_name='restore2', blank=True)
        restore2_rpc = self.nodes[1].get_wallet_rpc('restore2')
        restore2_rpc.sethdseed(True, seed)  # Set to be the same seed as origin_rpc
        restore2_rpc.sethdseed(True)  # Rotate to a new seed, making original `seed` inactive

        # Check persistence of inactive seed by reloading restore. restore2 is still loaded to test the case where the wallet is not reloaded
        restore_rpc.unloadwallet()
        self.nodes[1].loadwallet('restore')
        restore_rpc = self.nodes[1].get_wallet_rpc('restore')
        restored_wallets = (restore_rpc, restore2_rpc)

        # Empty origin keypool and get an address that is beyond the initial keypool
        origin_rpc.getnewaddress()
        origin_rpc.getnewaddress()
        last_addr = origin_rpc.getnewaddress()  # Last address of initial keypool
        addr = origin_rpc.getnewaddress()  # First address beyond initial keypool

        # Check that the restored seed has last_addr but does not have addr
        self._check_keypool_boundary(restored_wallets, last_addr, addr)
        # Check that the origin seed has addr
        info = origin_rpc.getaddressinfo(addr)
        assert_equal(info['ismine'], True)

        # Send a transaction to addr, which is out of the initial keypool.
        # The wallet that has set a new seed (restore_rpc) should not detect this transaction.
        txid = self.nodes[0].sendtoaddress(addr, 1)
        origin_rpc.sendrawtransaction(self.nodes[0].gettransaction(txid)['hex'])
        self.nodes[0].generate(1)
        self.sync_blocks()
        origin_rpc.gettransaction(txid)
        assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', restore_rpc.gettransaction, txid)
        out_of_kp_txid = txid

        # Send a transaction to last_addr, which is in the initial keypool.
        # The wallet that has set a new seed (restore_rpc) should detect this transaction and generate 3 new keys from the initial seed.
        # The previous transaction (out_of_kp_txid) should still not be detected as a rescan is required.
        txid = self.nodes[0].sendtoaddress(last_addr, 1)
        origin_rpc.sendrawtransaction(self.nodes[0].gettransaction(txid)['hex'])
        self.nodes[0].generate(1)
        self.sync_blocks()
        origin_rpc.gettransaction(txid)
        for wallet_rpc in restored_wallets:
            wallet_rpc.gettransaction(txid)
            assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', wallet_rpc.gettransaction, out_of_kp_txid)

        # After rescanning, restore_rpc should now see out_of_kp_txid and generate an additional key.
        # addr should now be part of restore_rpc and be ismine
        for wallet_rpc in restored_wallets:
            wallet_rpc.rescanblockchain()
            wallet_rpc.gettransaction(out_of_kp_txid)
            info = wallet_rpc.getaddressinfo(addr)
            assert_equal(info['ismine'], True)

        # Check again that 3 keys were derived.
        # Empty keypool and get an address that is beyond the initial keypool
        origin_rpc.getnewaddress()
        origin_rpc.getnewaddress()
        last_addr = origin_rpc.getnewaddress()
        addr = origin_rpc.getnewaddress()

        # Check that the restored seed has last_addr but does not have addr
        self._check_keypool_boundary(restored_wallets, last_addr, addr)

    @staticmethod
    def _check_keypool_boundary(wallet_rpcs, last_addr, addr):
        """Assert each wallet reports ``last_addr`` as ismine and ``addr`` as not ismine."""
        for wallet_rpc in wallet_rpcs:
            info = wallet_rpc.getaddressinfo(last_addr)
            assert_equal(info['ismine'], True)
            info = wallet_rpc.getaddressinfo(addr)
            assert_equal(info['ismine'], False)


if __name__ == '__main__':
    WalletHDTest().main()