1#!/usr/bin/env python3 2# Copyright (c) 2016-2020 The Bitcoin Core developers 3# Distributed under the MIT software license, see the accompanying 4# file COPYING or http://www.opensource.org/licenses/mit-license.php. 5"""Test the dumpwallet RPC.""" 6import datetime 7import os 8import time 9 10from test_framework.test_framework import BitcoinTestFramework 11from test_framework.util import ( 12 assert_equal, 13 assert_raises_rpc_error, 14) 15 16 17def read_dump(file_name, addrs, script_addrs, hd_master_addr_old): 18 """ 19 Read the given dump, count the addrs that match, count change and reserve. 20 Also check that the old hd_master is inactive 21 """ 22 with open(file_name, encoding='utf8') as inputfile: 23 found_comments = [] 24 found_legacy_addr = 0 25 found_p2sh_segwit_addr = 0 26 found_bech32_addr = 0 27 found_script_addr = 0 28 found_addr_chg = 0 29 found_addr_rsv = 0 30 hd_master_addr_ret = None 31 for line in inputfile: 32 line = line.strip() 33 if not line: 34 continue 35 if line[0] == '#': 36 found_comments.append(line) 37 else: 38 # split out some data 39 key_date_label, comment = line.split("#") 40 key_date_label = key_date_label.split(" ") 41 # key = key_date_label[0] 42 date = key_date_label[1] 43 keytype = key_date_label[2] 44 45 imported_key = date == '1970-01-01T00:00:01Z' 46 if imported_key: 47 # Imported keys have multiple addresses, no label (keypath) and timestamp 48 # Skip them 49 continue 50 51 addr_keypath = comment.split(" addr=")[1] 52 addr = addr_keypath.split(" ")[0] 53 keypath = None 54 if keytype == "inactivehdseed=1": 55 # ensure the old master is still available 56 assert hd_master_addr_old == addr 57 elif keytype == "hdseed=1": 58 # ensure we have generated a new hd master key 59 assert hd_master_addr_old != addr 60 hd_master_addr_ret = addr 61 elif keytype == "script=1": 62 # scripts don't have keypaths 63 keypath = None 64 else: 65 keypath = addr_keypath.rstrip().split("hdkeypath=")[1] 66 67 # count key types 68 for addrObj in addrs: 69 if addrObj['address'] == addr.split(",")[0] and addrObj['hdkeypath'] == keypath and keytype == "label=": 70 if addr.startswith('ncrt1'): 71 found_bech32_addr += 1 72 elif addr.startswith('m') or addr.startswith('n'): 73 # P2PKH address 74 found_legacy_addr += 1 75 elif addr.startswith('2'): 76 # P2SH-segwit address 77 found_p2sh_segwit_addr += 1 78 break 79 elif keytype == "change=1": 80 found_addr_chg += 1 81 break 82 elif keytype == "reserve=1": 83 found_addr_rsv += 1 84 break 85 86 # count scripts 87 for script_addr in script_addrs: 88 if script_addr == addr.rstrip() and keytype == "script=1": 89 found_script_addr += 1 90 break 91 92 return found_comments, found_legacy_addr, found_p2sh_segwit_addr, found_bech32_addr, found_script_addr, found_addr_chg, found_addr_rsv, hd_master_addr_ret 93 94 95class WalletDumpTest(BitcoinTestFramework): 96 def set_test_params(self): 97 self.num_nodes = 1 98 self.extra_args = [["-keypool=90", "-addresstype=legacy"]] 99 self.rpc_timeout = 120 100 101 def skip_test_if_missing_module(self): 102 self.skip_if_no_wallet() 103 104 def setup_network(self): 105 self.add_nodes(self.num_nodes, extra_args=self.extra_args) 106 self.start_nodes() 107 108 def run_test(self): 109 self.nodes[0].createwallet("dump") 110 111 wallet_unenc_dump = os.path.join(self.nodes[0].datadir, "wallet.unencrypted.dump") 112 wallet_enc_dump = os.path.join(self.nodes[0].datadir, "wallet.encrypted.dump") 113 114 # generate 30 addresses to compare against the dump 115 # - 10 legacy P2PKH 116 # - 10 P2SH-segwit 117 # - 10 bech32 118 test_addr_count = 10 119 addrs = [] 120 for address_type in ['legacy', 'p2sh-segwit', 'bech32']: 121 for _ in range(test_addr_count): 122 addr = self.nodes[0].getnewaddress(address_type=address_type) 123 vaddr = self.nodes[0].getaddressinfo(addr) # required to get hd keypath 124 addrs.append(vaddr) 125 126 # Test scripts dump by adding a 1-of-1 multisig address 127 multisig_addr = self.nodes[0].addmultisigaddress(1, [addrs[1]["address"]])["address"] 128 129 # Refill the keypool. getnewaddress() refills the keypool *before* taking a key from 130 # the keypool, so the final call to getnewaddress leaves the keypool with one key below 131 # its capacity 132 self.nodes[0].keypoolrefill() 133 134 self.log.info('Mine a block one second before the wallet is dumped') 135 dump_time = int(time.time()) 136 self.nodes[0].setmocktime(dump_time - 1) 137 self.nodes[0].generate(1) 138 self.nodes[0].setmocktime(dump_time) 139 dump_time_str = '# * Created on {}Z'.format( 140 datetime.datetime.fromtimestamp( 141 dump_time, 142 tz=datetime.timezone.utc, 143 ).replace(tzinfo=None).isoformat()) 144 dump_best_block_1 = '# * Best block at time of backup was {} ({}),'.format( 145 self.nodes[0].getblockcount(), 146 self.nodes[0].getbestblockhash(), 147 ) 148 dump_best_block_2 = '# mined on {}Z'.format( 149 datetime.datetime.fromtimestamp( 150 dump_time - 1, 151 tz=datetime.timezone.utc, 152 ).replace(tzinfo=None).isoformat()) 153 154 self.log.info('Dump unencrypted wallet') 155 result = self.nodes[0].dumpwallet(wallet_unenc_dump) 156 assert_equal(result['filename'], wallet_unenc_dump) 157 158 found_comments, found_legacy_addr, found_p2sh_segwit_addr, found_bech32_addr, found_script_addr, found_addr_chg, found_addr_rsv, hd_master_addr_unenc = \ 159 read_dump(wallet_unenc_dump, addrs, [multisig_addr], None) 160 assert '# End of dump' in found_comments # Check that file is not corrupt 161 assert_equal(dump_time_str, next(c for c in found_comments if c.startswith('# * Created on'))) 162 assert_equal(dump_best_block_1, next(c for c in found_comments if c.startswith('# * Best block'))) 163 assert_equal(dump_best_block_2, next(c for c in found_comments if c.startswith('# mined on'))) 164 assert_equal(found_legacy_addr, test_addr_count) # all keys must be in the dump 165 assert_equal(found_p2sh_segwit_addr, test_addr_count) # all keys must be in the dump 166 assert_equal(found_bech32_addr, test_addr_count) # all keys must be in the dump 167 assert_equal(found_script_addr, 1) # all scripts must be in the dump 168 assert_equal(found_addr_chg, 0) # 0 blocks where mined 169 assert_equal(found_addr_rsv, 90 * 2) # 90 keys plus 100% internal keys 170 171 # encrypt wallet, restart, unlock and dump 172 self.nodes[0].encryptwallet('test') 173 self.nodes[0].walletpassphrase('test', 100) 174 # Should be a no-op: 175 self.nodes[0].keypoolrefill() 176 self.nodes[0].dumpwallet(wallet_enc_dump) 177 178 found_comments, found_legacy_addr, found_p2sh_segwit_addr, found_bech32_addr, found_script_addr, found_addr_chg, found_addr_rsv, _ = \ 179 read_dump(wallet_enc_dump, addrs, [multisig_addr], hd_master_addr_unenc) 180 assert '# End of dump' in found_comments # Check that file is not corrupt 181 assert_equal(dump_time_str, next(c for c in found_comments if c.startswith('# * Created on'))) 182 assert_equal(dump_best_block_1, next(c for c in found_comments if c.startswith('# * Best block'))) 183 assert_equal(dump_best_block_2, next(c for c in found_comments if c.startswith('# mined on'))) 184 assert_equal(found_legacy_addr, test_addr_count) # all keys must be in the dump 185 assert_equal(found_p2sh_segwit_addr, test_addr_count) # all keys must be in the dump 186 assert_equal(found_bech32_addr, test_addr_count) # all keys must be in the dump 187 assert_equal(found_script_addr, 1) 188 assert_equal(found_addr_chg, 90 * 2) # old reserve keys are marked as change now 189 assert_equal(found_addr_rsv, 90 * 2) 190 191 # Overwriting should fail 192 assert_raises_rpc_error(-8, "already exists", lambda: self.nodes[0].dumpwallet(wallet_enc_dump)) 193 194 # Restart node with new wallet, and test importwallet 195 self.restart_node(0) 196 self.nodes[0].createwallet("w2") 197 198 # Make sure the address is not IsMine before import 199 result = self.nodes[0].getaddressinfo(multisig_addr) 200 assert not result['ismine'] 201 202 self.nodes[0].importwallet(wallet_unenc_dump) 203 204 # Now check IsMine is true 205 result = self.nodes[0].getaddressinfo(multisig_addr) 206 assert result['ismine'] 207 208 self.log.info('Check that wallet is flushed') 209 with self.nodes[0].assert_debug_log(['Flushing wallet.dat'], timeout=20): 210 self.nodes[0].getnewaddress() 211 212 213if __name__ == '__main__': 214 WalletDumpTest().main() 215