#!/usr/bin/env python3
# Copyright (c) 2018-2020 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the avoid_reuse and setwalletflag features."""
6
7from test_framework.test_framework import BitcoinTestFramework
8from test_framework.util import (
9    assert_approx,
10    assert_equal,
11    assert_raises_rpc_error,
12)
13
def reset_balance(node, discardaddr):
    '''Throw away all coins owned by the node so it gets a balance of (about) 0.

    The balance is queried with avoid_reuse=False and, when it is above 0.5,
    sent in full to discardaddr with the fee subtracted from the amount.
    A balance of 0.5 or less is left untouched.

    node: RPC handle of the wallet to drain.
    discardaddr: address that receives the swept coins.
    '''
    balance = node.getbalance(avoid_reuse=False)
    if balance > 0.5:
        # subtractfeefromamount lets the whole balance be sent without
        # having to leave anything behind to pay the fee.
        node.sendtoaddress(address=discardaddr, amount=balance, subtractfeefromamount=True, avoid_reuse=False)
19
def count_unspent(node):
    '''Count the unspent outputs for the given node and return various statistics.

    Queries node.listunspent(minconf=0) and returns a dict shaped like
        {"total":  {"count": ..., "sum": ...},
         "reused": {"count": ..., "sum": ..., "supported": ...}}
    where "total" covers every listed output and "reused" covers the outputs
    whose "reused" field is truthy.

    "supported" is True only if every listed output carried a "reused" field.
    As soon as one output lacks it, the flag drops to False and no further
    outputs are added to the "reused" statistics.
    '''
    stats = {
        "total": {
            "count": 0,
            "sum": 0,
        },
        "reused": {
            "count": 0,
            "sum": 0,
        },
    }
    supports_reused = True
    for utxo in node.listunspent(minconf=0):
        stats["total"]["count"] += 1
        stats["total"]["sum"] += utxo["amount"]
        if supports_reused and "reused" in utxo:
            if utxo["reused"]:
                stats["reused"]["count"] += 1
                stats["reused"]["sum"] += utxo["amount"]
        else:
            # Sticky: one output without the field disables reuse accounting
            # for the rest of the listing.
            supports_reused = False
    stats["reused"]["supported"] = supports_reused
    return stats
44
def assert_unspent(node, total_count=None, total_sum=None, reused_supported=None, reused_count=None, reused_sum=None, margin=0.001):
    '''Make assertions about a node's unspent output statistics.

    The statistics come from count_unspent(node). Each expectation that is
    left as None is skipped; the others are checked in this order:

    total_count: exact number of unspent outputs.
    total_sum: summed amount of all unspent outputs, within margin.
    reused_supported: expected value of the "supported" flag.
    reused_count: exact number of outputs flagged as reused.
    reused_sum: summed amount of the reused outputs, within margin.
    margin: tolerance passed to assert_approx for the two sums.
    '''
    stats = count_unspent(node)
    if total_count is not None:
        assert_equal(stats["total"]["count"], total_count)
    if total_sum is not None:
        assert_approx(stats["total"]["sum"], total_sum, margin)
    if reused_supported is not None:
        assert_equal(stats["reused"]["supported"], reused_supported)
    if reused_count is not None:
        assert_equal(stats["reused"]["count"], reused_count)
    if reused_sum is not None:
        assert_approx(stats["reused"]["sum"], reused_sum, margin)
58
def assert_balances(node, mine, margin=0.001):
    '''Make assertions about a node's getbalances output.

    node: RPC handle whose getbalances()["mine"] section is inspected.
    mine: mapping of balance category (e.g. "used", "trusted") to the
        expected amount; categories not listed are not checked.
    margin: tolerance passed to assert_approx for every comparison.
    '''
    got = node.getbalances()["mine"]
    for k, v in mine.items():
        assert_approx(got[k], v, margin)
64
class AvoidReuseTest(BitcoinTestFramework):
    '''Functional test of the avoid_reuse wallet flag and the setwalletflag RPC.

    Two nodes are used: nodes[0] mines and funds, nodes[1] has avoid_reuse
    enabled (set in test_persistence) and is the wallet under test.
    '''

    def set_test_params(self):
        '''Configure a two-node network with whitelisted peers.'''
        self.num_nodes = 2
        # This test isn't testing txn relay/timing, so set whitelist on the
        # peers for instant txn relay. This speeds up the test run time 2-3x.
        # A fresh list is built per node so the nodes never share one mutable
        # argument list.
        self.extra_args = [["-whitelist=noban@127.0.0.1"] for _ in range(self.num_nodes)]

    def skip_test_if_missing_module(self):
        '''Skip the test when the wallet is not available.'''
        self.skip_if_no_wallet()

    def run_test(self):
        '''Set up initial chain and run tests defined below'''

        self.test_persistence()
        self.test_immutable()

        self.nodes[0].generate(110)
        self.sync_all()
        self.test_change_remains_change(self.nodes[1])
        # Each remaining sub-test expects nodes[1] to start out empty, so its
        # coins are swept to a fresh nodes[0] address before every one of them.
        reset_balance(self.nodes[1], self.nodes[0].getnewaddress())
        self.test_sending_from_reused_address_without_avoid_reuse()
        reset_balance(self.nodes[1], self.nodes[0].getnewaddress())
        self.test_sending_from_reused_address_fails("legacy")
        reset_balance(self.nodes[1], self.nodes[0].getnewaddress())
        self.test_sending_from_reused_address_fails("p2sh-segwit")
        reset_balance(self.nodes[1], self.nodes[0].getnewaddress())
        self.test_sending_from_reused_address_fails("bech32")
        reset_balance(self.nodes[1], self.nodes[0].getnewaddress())
        self.test_getbalances_used()
        reset_balance(self.nodes[1], self.nodes[0].getnewaddress())
        self.test_full_destination_group_is_preferred()
        reset_balance(self.nodes[1], self.nodes[0].getnewaddress())
        self.test_all_destination_groups_are_used()

    def test_persistence(self):
        '''Test that wallet files persist the avoid_reuse flag.'''
        self.log.info("Test wallet files persist avoid_reuse flag")

        # Configure node 1 to use avoid_reuse
        self.nodes[1].setwalletflag('avoid_reuse')

        # Flags should be nodes[0].avoid_reuse=false, nodes[1].avoid_reuse=true
        assert_equal(self.nodes[0].getwalletinfo()["avoid_reuse"], False)
        assert_equal(self.nodes[1].getwalletinfo()["avoid_reuse"], True)

        self.restart_node(1)
        self.connect_nodes(0, 1)

        # Flags should still be nodes[0].avoid_reuse=false, nodes[1].avoid_reuse=true
        assert_equal(self.nodes[0].getwalletinfo()["avoid_reuse"], False)
        assert_equal(self.nodes[1].getwalletinfo()["avoid_reuse"], True)

        # Attempting to set flag to its current state should throw
        assert_raises_rpc_error(-8, "Wallet flag is already set to false", self.nodes[0].setwalletflag, 'avoid_reuse', False)
        assert_raises_rpc_error(-8, "Wallet flag is already set to true", self.nodes[1].setwalletflag, 'avoid_reuse', True)

    def test_immutable(self):
        '''Test immutable wallet flags'''
        self.log.info("Test immutable wallet flags")

        # Attempt to set the disable_private_keys flag; this should not work
        assert_raises_rpc_error(-8, "Wallet flag is immutable", self.nodes[1].setwalletflag, 'disable_private_keys')

        tempwallet = ".wallet_avoidreuse.py_test_immutable_wallet.dat"

        # Create a wallet with disable_private_keys set; this should work
        self.nodes[1].createwallet(wallet_name=tempwallet, disable_private_keys=True)
        w = self.nodes[1].get_wallet_rpc(tempwallet)

        # Attempt to unset the disable_private_keys flag; this should not work
        assert_raises_rpc_error(-8, "Wallet flag is immutable", w.setwalletflag, 'disable_private_keys', False)

        # Unload temp wallet
        self.nodes[1].unloadwallet(tempwallet)

    def test_change_remains_change(self, node):
        '''
        Test that a change address is still reported as change, and is still
        absent from listtransactions, after the coin sitting on it was spent.
        '''
        self.log.info("Test that change doesn't turn into non-change when spent")

        reset_balance(node, node.getnewaddress())
        addr = node.getnewaddress()
        txid = node.sendtoaddress(addr, 1)
        # The payment itself is only 1 BTC, so filtering on amounts >= 2 is
        # expected to leave just the change output of that transaction.
        out = node.listunspent(minconf=0, query_options={'minimumAmount': 2})
        assert_equal(len(out), 1)
        assert_equal(out[0]['txid'], txid)
        changeaddr = out[0]['address']

        # Make sure it's starting out as change as expected
        assert node.getaddressinfo(changeaddr)['ischange']
        for logical_tx in node.listtransactions():
            assert logical_tx.get('address') != changeaddr

        # Spend it
        reset_balance(node, node.getnewaddress())

        # It should still be change
        assert node.getaddressinfo(changeaddr)['ischange']
        for logical_tx in node.listtransactions():
            assert logical_tx.get('address') != changeaddr

    def test_sending_from_reused_address_without_avoid_reuse(self):
        '''
        Test the same as test_sending_from_reused_address_fails, except send the 10 BTC with
        the avoid_reuse flag set to false. This means the 10 BTC send should succeed,
        where it fails in test_sending_from_reused_address_fails.
        '''
        self.log.info("Test sending from reused address with avoid_reuse=false")

        fundaddr = self.nodes[1].getnewaddress()
        retaddr = self.nodes[0].getnewaddress()

        self.nodes[0].sendtoaddress(fundaddr, 10)
        self.nodes[0].generate(1)
        self.sync_all()

        # listunspent should show 1 single, unused 10 btc output
        assert_unspent(self.nodes[1], total_count=1, total_sum=10, reused_supported=True, reused_count=0)
        # getbalances should show no used, 10 btc trusted
        assert_balances(self.nodes[1], mine={"used": 0, "trusted": 10})
        # node 0 should not show a used entry, as it does not enable avoid_reuse
        assert "used" not in self.nodes[0].getbalances()["mine"]

        self.nodes[1].sendtoaddress(retaddr, 5)
        self.nodes[0].generate(1)
        self.sync_all()

        # listunspent should show 1 single, unused 5 btc output
        assert_unspent(self.nodes[1], total_count=1, total_sum=5, reused_supported=True, reused_count=0)
        # getbalances should show no used, 5 btc trusted
        assert_balances(self.nodes[1], mine={"used": 0, "trusted": 5})

        self.nodes[0].sendtoaddress(fundaddr, 10)
        self.nodes[0].generate(1)
        self.sync_all()

        # listunspent should show 2 total outputs (5, 10 btc), one unused (5), one reused (10)
        assert_unspent(self.nodes[1], total_count=2, total_sum=15, reused_count=1, reused_sum=10)
        # getbalances should show 10 used, 5 btc trusted
        assert_balances(self.nodes[1], mine={"used": 10, "trusted": 5})

        self.nodes[1].sendtoaddress(address=retaddr, amount=10, avoid_reuse=False)

        # listunspent should show 1 total outputs (5 btc), unused
        assert_unspent(self.nodes[1], total_count=1, total_sum=5, reused_count=0)
        # getbalances should show no used, 5 btc trusted
        assert_balances(self.nodes[1], mine={"used": 0, "trusted": 5})

        # node 1 should now have about 5 btc left (for both cases)
        assert_approx(self.nodes[1].getbalance(), 5, 0.001)
        assert_approx(self.nodes[1].getbalance(avoid_reuse=False), 5, 0.001)

    def test_sending_from_reused_address_fails(self, second_addr_type):
        '''
        Test the simple case where [1] generates a new address A, then
        [0] sends 10 BTC to A.
        [1] spends 5 BTC from A. (leaving roughly 5 BTC useable)
        [0] sends 10 BTC to A again.
        [1] tries to spend 10 BTC (fails; dirty).
        [1] tries to spend 4 BTC (succeeds; change address sufficient)

        second_addr_type selects the form of A used for the second send:
        "legacy" reuses A itself, while "p2sh-segwit" and "bech32" use the
        related address taken from decodescript's "segwit" section.
        Everything from the second send onwards is skipped when
        self.options.descriptors is set.
        '''
        self.log.info("Test sending from reused {} address fails".format(second_addr_type))

        fundaddr = self.nodes[1].getnewaddress(label="", address_type="legacy")
        retaddr = self.nodes[0].getnewaddress()

        self.nodes[0].sendtoaddress(fundaddr, 10)
        self.nodes[0].generate(1)
        self.sync_all()

        # listunspent should show 1 single, unused 10 btc output
        assert_unspent(self.nodes[1], total_count=1, total_sum=10, reused_supported=True, reused_count=0)
        # getbalances should show no used, 10 btc trusted
        assert_balances(self.nodes[1], mine={"used": 0, "trusted": 10})

        self.nodes[1].sendtoaddress(retaddr, 5)
        self.nodes[0].generate(1)
        self.sync_all()

        # listunspent should show 1 single, unused 5 btc output
        assert_unspent(self.nodes[1], total_count=1, total_sum=5, reused_supported=True, reused_count=0)
        # getbalances should show no used, 5 btc trusted
        assert_balances(self.nodes[1], mine={"used": 0, "trusted": 5})

        if not self.options.descriptors:
            # For the second send, we transmute it to a related single-key address
            # to make sure it's also detected as re-use
            fund_spk = self.nodes[0].getaddressinfo(fundaddr)["scriptPubKey"]
            fund_decoded = self.nodes[0].decodescript(fund_spk)
            if second_addr_type == "p2sh-segwit":
                new_fundaddr = fund_decoded["segwit"]["p2sh-segwit"]
            elif second_addr_type == "bech32":
                new_fundaddr = fund_decoded["segwit"]["address"]
            else:
                new_fundaddr = fundaddr
                assert_equal(second_addr_type, "legacy")

            self.nodes[0].sendtoaddress(new_fundaddr, 10)
            self.nodes[0].generate(1)
            self.sync_all()

            # listunspent should show 2 total outputs (5, 10 btc), one unused (5), one reused (10)
            assert_unspent(self.nodes[1], total_count=2, total_sum=15, reused_count=1, reused_sum=10)
            # getbalances should show 10 used, 5 btc trusted
            assert_balances(self.nodes[1], mine={"used": 10, "trusted": 5})

            # node 1 should now have a balance of 5 (no dirty) or 15 (including dirty)
            assert_approx(self.nodes[1].getbalance(), 5, 0.001)
            assert_approx(self.nodes[1].getbalance(avoid_reuse=False), 15, 0.001)

            assert_raises_rpc_error(-6, "Insufficient funds", self.nodes[1].sendtoaddress, retaddr, 10)

            self.nodes[1].sendtoaddress(retaddr, 4)

            # listunspent should show 2 total outputs (1, 10 btc), one unused (1), one reused (10)
            assert_unspent(self.nodes[1], total_count=2, total_sum=11, reused_count=1, reused_sum=10)
            # getbalances should show 10 used, 1 btc trusted
            assert_balances(self.nodes[1], mine={"used": 10, "trusted": 1})

            # node 1 should now have about 1 btc left (no dirty) and 11 (including dirty)
            assert_approx(self.nodes[1].getbalance(), 1, 0.001)
            assert_approx(self.nodes[1].getbalance(avoid_reuse=False), 11, 0.001)

    def test_getbalances_used(self):
        '''
        getbalances and listunspent should pick up on reused addresses
        immediately, even for address reusing outputs created before the first
        transaction was spending from that address
        '''
        self.log.info("Test getbalances used category")

        # node under test should be completely empty
        assert_equal(self.nodes[1].getbalance(avoid_reuse=False), 0)

        new_addr = self.nodes[1].getnewaddress()
        ret_addr = self.nodes[0].getnewaddress()

        # send multiple transactions, reusing one address
        for _ in range(101):
            self.nodes[0].sendtoaddress(new_addr, 1)

        self.nodes[0].generate(1)
        self.sync_all()

        # send transaction that should not use all the available outputs
        # per the current coin selection algorithm
        self.nodes[1].sendtoaddress(ret_addr, 5)

        # getbalances and listunspent should show the remaining outputs
        # in the reused address as used/reused
        assert_unspent(self.nodes[1], total_count=2, total_sum=96, reused_count=1, reused_sum=1, margin=0.01)
        assert_balances(self.nodes[1], mine={"used": 1, "trusted": 95}, margin=0.01)

    def test_full_destination_group_is_preferred(self):
        '''
        Test the case where [1] only has 101 outputs of 1 BTC in the same reused
        address and tries to send a small payment of 0.5 BTC. The wallet
        should use 100 outputs from the reused address as inputs and not a
        single 1 BTC input, in order to join several outputs from the reused
        address.
        '''
        self.log.info("Test that full destination groups are preferred in coin selection")

        # Node under test should be empty
        assert_equal(self.nodes[1].getbalance(avoid_reuse=False), 0)

        new_addr = self.nodes[1].getnewaddress()
        ret_addr = self.nodes[0].getnewaddress()

        # Send 101 outputs of 1 BTC to the same, reused address in the wallet
        for _ in range(101):
            self.nodes[0].sendtoaddress(new_addr, 1)

        self.nodes[0].generate(1)
        self.sync_all()

        # Sending a transaction that is smaller than each one of the
        # available outputs
        txid = self.nodes[1].sendtoaddress(address=ret_addr, amount=0.5)
        inputs = self.nodes[1].getrawtransaction(txid, 1)["vin"]

        # The transaction should use 100 inputs exactly
        assert_equal(len(inputs), 100)

    def test_all_destination_groups_are_used(self):
        '''
        Test the case where [1] only has 202 outputs of 1 BTC in the same reused
        address and tries to send a payment of 200.5 BTC. The wallet
        should use all 202 outputs from the reused address as inputs.
        '''
        self.log.info("Test that all destination groups are used")

        # Node under test should be empty
        assert_equal(self.nodes[1].getbalance(avoid_reuse=False), 0)

        new_addr = self.nodes[1].getnewaddress()
        ret_addr = self.nodes[0].getnewaddress()

        # Send 202 outputs of 1 BTC to the same, reused address in the wallet
        for _ in range(202):
            self.nodes[0].sendtoaddress(new_addr, 1)

        self.nodes[0].generate(1)
        self.sync_all()

        # Sending a transaction that needs to use the full groups
        # of 100 inputs but also the incomplete group of 2 inputs.
        txid = self.nodes[1].sendtoaddress(address=ret_addr, amount=200.5)
        inputs = self.nodes[1].getrawtransaction(txid, 1)["vin"]

        # The transaction should use 202 inputs exactly
        assert_equal(len(inputs), 202)
376
377
# Run the functional test when this file is executed as a script.
if __name__ == '__main__':
    AvoidReuseTest().main()
380