1#!/usr/bin/env python3
2# Copyright (c) 2016-2020 The Bitcoin Core developers
3# Distributed under the MIT software license, see the accompanying
4# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5"""Test Hierarchical Deterministic wallet function."""
6
7import os
8import shutil
9
10from test_framework.test_framework import BitcoinTestFramework
11from test_framework.util import (
12    assert_equal,
13    assert_raises_rpc_error,
14)
15
16
17class WalletHDTest(BitcoinTestFramework):
18    def set_test_params(self):
19        self.setup_clean_chain = True
20        self.num_nodes = 2
21        self.extra_args = [[], ['-keypool=0', '-addresstype=bech32']]
22        self.supports_cli = False
23
24    def skip_test_if_missing_module(self):
25        self.skip_if_no_wallet()
26
27    def run_test(self):
28        # Make sure we use hd, keep masterkeyid
29        hd_fingerprint = self.nodes[1].getaddressinfo(self.nodes[1].getnewaddress())['hdmasterfingerprint']
30        assert_equal(len(hd_fingerprint), 8)
31
32        # create an internal key
33        change_addr = self.nodes[1].getrawchangeaddress()
34        change_addrV = self.nodes[1].getaddressinfo(change_addr)
35        if self.options.descriptors:
36            assert_equal(change_addrV["hdkeypath"], "m/84'/1'/0'/1/0")
37        else:
38            assert_equal(change_addrV["hdkeypath"], "m/0'/1'/0'")  #first internal child key
39
40        # Import a non-HD private key in the HD wallet
41        non_hd_add = 'ncrt1qmevj8zfx0wdvp05cqwkmr6mxkfx60yezy25dj7'
42        non_hd_key = 'cS9umN9w6cDMuRVYdbkfE4c7YUFLJRoXMfhQ569uY4odiQbVN8Rt'
43        self.nodes[1].importprivkey(non_hd_key)
44
45        # This should be enough to keep the master key and the non-HD key
46        self.nodes[1].backupwallet(os.path.join(self.nodes[1].datadir, "hd.bak"))
47        #self.nodes[1].dumpwallet(os.path.join(self.nodes[1].datadir, "hd.dump"))
48
49        # Derive some HD addresses and remember the last
50        # Also send funds to each add
51        self.nodes[0].generate(101)
52        hd_add = None
53        NUM_HD_ADDS = 10
54        for i in range(1, NUM_HD_ADDS + 1):
55            hd_add = self.nodes[1].getnewaddress()
56            hd_info = self.nodes[1].getaddressinfo(hd_add)
57            if self.options.descriptors:
58                assert_equal(hd_info["hdkeypath"], "m/84'/1'/0'/0/" + str(i))
59            else:
60                assert_equal(hd_info["hdkeypath"], "m/0'/0'/" + str(i) + "'")
61            assert_equal(hd_info["hdmasterfingerprint"], hd_fingerprint)
62            self.nodes[0].sendtoaddress(hd_add, 1)
63            self.nodes[0].generate(1)
64        self.nodes[0].sendtoaddress(non_hd_add, 1)
65        self.nodes[0].generate(1)
66
67        # create an internal key (again)
68        change_addr = self.nodes[1].getrawchangeaddress()
69        change_addrV = self.nodes[1].getaddressinfo(change_addr)
70        if self.options.descriptors:
71            assert_equal(change_addrV["hdkeypath"], "m/84'/1'/0'/1/1")
72        else:
73            assert_equal(change_addrV["hdkeypath"], "m/0'/1'/1'")  #second internal child key
74
75        self.sync_all()
76        assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)
77
78        self.log.info("Restore backup ...")
79        self.stop_node(1)
80        # we need to delete the complete chain directory
81        # otherwise node1 would auto-recover all funds in flag the keypool keys as used
82        shutil.rmtree(os.path.join(self.nodes[1].datadir, self.chain, "blocks"))
83        shutil.rmtree(os.path.join(self.nodes[1].datadir, self.chain, "chainstate"))
84        shutil.copyfile(
85            os.path.join(self.nodes[1].datadir, "hd.bak"),
86            os.path.join(self.nodes[1].datadir, self.chain, 'wallets', self.default_wallet_name, self.wallet_data_filename),
87        )
88        self.start_node(1)
89
90        # Assert that derivation is deterministic
91        hd_add_2 = None
92        for i in range(1, NUM_HD_ADDS + 1):
93            hd_add_2 = self.nodes[1].getnewaddress()
94            hd_info_2 = self.nodes[1].getaddressinfo(hd_add_2)
95            if self.options.descriptors:
96                assert_equal(hd_info_2["hdkeypath"], "m/84'/1'/0'/0/" + str(i))
97            else:
98                assert_equal(hd_info_2["hdkeypath"], "m/0'/0'/" + str(i) + "'")
99            assert_equal(hd_info_2["hdmasterfingerprint"], hd_fingerprint)
100        assert_equal(hd_add, hd_add_2)
101        self.connect_nodes(0, 1)
102        self.sync_all()
103
104        # Needs rescan
105        self.restart_node(1, extra_args=self.extra_args[1] + ['-rescan'])
106        assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)
107
108        # Try a RPC based rescan
109        self.stop_node(1)
110        shutil.rmtree(os.path.join(self.nodes[1].datadir, self.chain, "blocks"))
111        shutil.rmtree(os.path.join(self.nodes[1].datadir, self.chain, "chainstate"))
112        shutil.copyfile(
113            os.path.join(self.nodes[1].datadir, "hd.bak"),
114            os.path.join(self.nodes[1].datadir, self.chain, "wallets", self.default_wallet_name, self.wallet_data_filename),
115        )
116        self.start_node(1, extra_args=self.extra_args[1])
117        self.connect_nodes(0, 1)
118        self.sync_all()
119        # Wallet automatically scans blocks older than key on startup
120        assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)
121        out = self.nodes[1].rescanblockchain(0, 1)
122        assert_equal(out['start_height'], 0)
123        assert_equal(out['stop_height'], 1)
124        out = self.nodes[1].rescanblockchain()
125        assert_equal(out['start_height'], 0)
126        assert_equal(out['stop_height'], self.nodes[1].getblockcount())
127        assert_equal(self.nodes[1].getbalance(), NUM_HD_ADDS + 1)
128
129        # send a tx and make sure its using the internal chain for the changeoutput
130        txid = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1)
131        outs = self.nodes[1].decoderawtransaction(self.nodes[1].gettransaction(txid)['hex'])['vout']
132        keypath = ""
133        for out in outs:
134            if out['value'] != 1:
135                keypath = self.nodes[1].getaddressinfo(out['scriptPubKey']['addresses'][0])['hdkeypath']
136
137        if self.options.descriptors:
138            assert_equal(keypath[0:14], "m/84'/1'/0'/1/")
139        else:
140            assert_equal(keypath[0:7], "m/0'/1'")
141
142        if not self.options.descriptors:
143            # Generate a new HD seed on node 1 and make sure it is set
144            orig_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid']
145            self.nodes[1].sethdseed()
146            new_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid']
147            assert orig_masterkeyid != new_masterkeyid
148            addr = self.nodes[1].getnewaddress()
149            # Make sure the new address is the first from the keypool
150            assert_equal(self.nodes[1].getaddressinfo(addr)['hdkeypath'], 'm/0\'/0\'/0\'')
151            self.nodes[1].keypoolrefill(1)  # Fill keypool with 1 key
152
153            # Set a new HD seed on node 1 without flushing the keypool
154            new_seed = self.nodes[0].dumpprivkey(self.nodes[0].getnewaddress())
155            orig_masterkeyid = new_masterkeyid
156            self.nodes[1].sethdseed(False, new_seed)
157            new_masterkeyid = self.nodes[1].getwalletinfo()['hdseedid']
158            assert orig_masterkeyid != new_masterkeyid
159            addr = self.nodes[1].getnewaddress()
160            assert_equal(orig_masterkeyid, self.nodes[1].getaddressinfo(addr)['hdseedid'])
161            # Make sure the new address continues previous keypool
162            assert_equal(self.nodes[1].getaddressinfo(addr)['hdkeypath'], 'm/0\'/0\'/1\'')
163
164            # Check that the next address is from the new seed
165            self.nodes[1].keypoolrefill(1)
166            next_addr = self.nodes[1].getnewaddress()
167            assert_equal(new_masterkeyid, self.nodes[1].getaddressinfo(next_addr)['hdseedid'])
168            # Make sure the new address is not from previous keypool
169            assert_equal(self.nodes[1].getaddressinfo(next_addr)['hdkeypath'], 'm/0\'/0\'/0\'')
170            assert next_addr != addr
171
172            # Sethdseed parameter validity
173            assert_raises_rpc_error(-1, 'sethdseed', self.nodes[0].sethdseed, False, new_seed, 0)
174            assert_raises_rpc_error(-5, "Invalid private key", self.nodes[1].sethdseed, False, "not_wif")
175            assert_raises_rpc_error(-1, "JSON value is not a boolean as expected", self.nodes[1].sethdseed, "Not_bool")
176            assert_raises_rpc_error(-1, "JSON value is not a string as expected", self.nodes[1].sethdseed, False, True)
177            assert_raises_rpc_error(-5, "Already have this key", self.nodes[1].sethdseed, False, new_seed)
178            assert_raises_rpc_error(-5, "Already have this key", self.nodes[1].sethdseed, False, self.nodes[1].dumpprivkey(self.nodes[1].getnewaddress()))
179
180            self.log.info('Test sethdseed restoring with keys outside of the initial keypool')
181            self.nodes[0].generate(10)
182            # Restart node 1 with keypool of 3 and a different wallet
183            self.nodes[1].createwallet(wallet_name='origin', blank=True)
184            self.restart_node(1, extra_args=['-keypool=3', '-wallet=origin'])
185            self.connect_nodes(0, 1)
186
187            # sethdseed restoring and seeing txs to addresses out of the keypool
188            origin_rpc = self.nodes[1].get_wallet_rpc('origin')
189            seed = self.nodes[0].dumpprivkey(self.nodes[0].getnewaddress())
190            origin_rpc.sethdseed(True, seed)
191
192            self.nodes[1].createwallet(wallet_name='restore', blank=True)
193            restore_rpc = self.nodes[1].get_wallet_rpc('restore')
194            restore_rpc.sethdseed(True, seed)  # Set to be the same seed as origin_rpc
195            restore_rpc.sethdseed(True)  # Rotate to a new seed, making original `seed` inactive
196
197            self.nodes[1].createwallet(wallet_name='restore2', blank=True)
198            restore2_rpc = self.nodes[1].get_wallet_rpc('restore2')
199            restore2_rpc.sethdseed(True, seed)  # Set to be the same seed as origin_rpc
200            restore2_rpc.sethdseed(True)  # Rotate to a new seed, making original `seed` inactive
201
202            # Check persistence of inactive seed by reloading restore. restore2 is still loaded to test the case where the wallet is not reloaded
203            restore_rpc.unloadwallet()
204            self.nodes[1].loadwallet('restore')
205            restore_rpc = self.nodes[1].get_wallet_rpc('restore')
206
207            # Empty origin keypool and get an address that is beyond the initial keypool
208            origin_rpc.getnewaddress()
209            origin_rpc.getnewaddress()
210            last_addr = origin_rpc.getnewaddress()  # Last address of initial keypool
211            addr = origin_rpc.getnewaddress()  # First address beyond initial keypool
212
213            # Check that the restored seed has last_addr but does not have addr
214            info = restore_rpc.getaddressinfo(last_addr)
215            assert_equal(info['ismine'], True)
216            info = restore_rpc.getaddressinfo(addr)
217            assert_equal(info['ismine'], False)
218            info = restore2_rpc.getaddressinfo(last_addr)
219            assert_equal(info['ismine'], True)
220            info = restore2_rpc.getaddressinfo(addr)
221            assert_equal(info['ismine'], False)
222            # Check that the origin seed has addr
223            info = origin_rpc.getaddressinfo(addr)
224            assert_equal(info['ismine'], True)
225
226            # Send a transaction to addr, which is out of the initial keypool.
227            # The wallet that has set a new seed (restore_rpc) should not detect this transaction.
228            txid = self.nodes[0].sendtoaddress(addr, 1)
229            origin_rpc.sendrawtransaction(self.nodes[0].gettransaction(txid)['hex'])
230            self.nodes[0].generate(1)
231            self.sync_blocks()
232            origin_rpc.gettransaction(txid)
233            assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', restore_rpc.gettransaction, txid)
234            out_of_kp_txid = txid
235
236            # Send a transaction to last_addr, which is in the initial keypool.
237            # The wallet that has set a new seed (restore_rpc) should detect this transaction and generate 3 new keys from the initial seed.
238            # The previous transaction (out_of_kp_txid) should still not be detected as a rescan is required.
239            txid = self.nodes[0].sendtoaddress(last_addr, 1)
240            origin_rpc.sendrawtransaction(self.nodes[0].gettransaction(txid)['hex'])
241            self.nodes[0].generate(1)
242            self.sync_blocks()
243            origin_rpc.gettransaction(txid)
244            restore_rpc.gettransaction(txid)
245            assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', restore_rpc.gettransaction, out_of_kp_txid)
246            restore2_rpc.gettransaction(txid)
247            assert_raises_rpc_error(-5, 'Invalid or non-wallet transaction id', restore2_rpc.gettransaction, out_of_kp_txid)
248
249            # After rescanning, restore_rpc should now see out_of_kp_txid and generate an additional key.
250            # addr should now be part of restore_rpc and be ismine
251            restore_rpc.rescanblockchain()
252            restore_rpc.gettransaction(out_of_kp_txid)
253            info = restore_rpc.getaddressinfo(addr)
254            assert_equal(info['ismine'], True)
255            restore2_rpc.rescanblockchain()
256            restore2_rpc.gettransaction(out_of_kp_txid)
257            info = restore2_rpc.getaddressinfo(addr)
258            assert_equal(info['ismine'], True)
259
260            # Check again that 3 keys were derived.
261            # Empty keypool and get an address that is beyond the initial keypool
262            origin_rpc.getnewaddress()
263            origin_rpc.getnewaddress()
264            last_addr = origin_rpc.getnewaddress()
265            addr = origin_rpc.getnewaddress()
266
267            # Check that the restored seed has last_addr but does not have addr
268            info = restore_rpc.getaddressinfo(last_addr)
269            assert_equal(info['ismine'], True)
270            info = restore_rpc.getaddressinfo(addr)
271            assert_equal(info['ismine'], False)
272            info = restore2_rpc.getaddressinfo(last_addr)
273            assert_equal(info['ismine'], True)
274            info = restore2_rpc.getaddressinfo(addr)
275            assert_equal(info['ismine'], False)
276
277
278if __name__ == '__main__':
279    WalletHDTest().main()
280