1#!/usr/bin/env python3
2# Copyright (c) 2016-2020 The Bitcoin Core developers
3# Distributed under the MIT software license, see the accompanying
4# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5"""Test label RPCs.
6
7RPCs tested are:
8    - getaddressesbylabel
9    - listaddressgroupings
10    - setlabel
11"""
12from collections import defaultdict
13
14from test_framework.blocktools import COINBASE_MATURITY
15from test_framework.test_framework import BitcoinTestFramework
16from test_framework.util import assert_equal, assert_raises_rpc_error
17from test_framework.wallet_util import test_address
18
19
class WalletLabelsTest(BitcoinTestFramework):
    """Functional test for the wallet label RPCs, run against a single node.

    The test checks address groupings before and after a spend that links
    two addresses, then creates labels and checks that getnewaddress,
    setlabel, addmultisigaddress (non-descriptor wallets) and importaddress
    (descriptor wallets) keep label/address associations consistent.
    """

    def set_test_params(self):
        """Configure the run: one node, starting from a clean chain."""
        self.setup_clean_chain = True
        self.num_nodes = 1

    def skip_test_if_missing_module(self):
        """Delegate to skip_if_no_wallet() to decide whether to skip."""
        self.skip_if_no_wallet()

    def run_test(self):
        """Run the label checks in order against node 0.

        The RPC calls are order dependent: balances, groupings and received
        amounts asserted below rely on the blocks and transactions created
        by the earlier steps.
        """
        # Check that there's no UTXO on the node
        node = self.nodes[0]
        assert_equal(len(node.listunspent()), 0)

        # Note each time we call generate, all generated coins go into
        # the same address, so we call twice to get two addresses w/50 each
        node.generatetoaddress(nblocks=1, address=node.getnewaddress(label='coinbase'))
        node.generatetoaddress(nblocks=COINBASE_MATURITY + 1, address=node.getnewaddress(label='coinbase'))
        assert_equal(node.getbalance(), 100)

        # there should be 2 address groups
        # each with 1 address with a balance of 50 Bitcoins
        address_groups = node.listaddressgroupings()
        assert_equal(len(address_groups), 2)
        # the addresses aren't linked now, but will be after we send to the
        # common address
        linked_addresses = set()
        for address_group in address_groups:
            assert_equal(len(address_group), 1)
            # Each entry is checked as [address, amount, label].
            assert_equal(len(address_group[0]), 3)
            assert_equal(address_group[0][1], 50)
            assert_equal(address_group[0][2], 'coinbase')
            linked_addresses.add(address_group[0][0])

        # send 50 from each address to a third address not in this wallet
        common_address = "msf4WtN1YQKXvNtvdFYt9JBnUD2FB41kjr"
        node.sendmany(
            amounts={common_address: 100},
            subtractfeefrom=[common_address],
            minconf=1,
        )
        # there should be 1 address group, with the previously
        # unlinked addresses now linked (they both have 0 balance)
        address_groups = node.listaddressgroupings()
        assert_equal(len(address_groups), 1)
        assert_equal(len(address_groups[0]), 2)
        assert_equal({entry[0] for entry in address_groups[0]}, linked_addresses)
        assert_equal([entry[1] for entry in address_groups[0]], [0, 0])

        node.generate(1)

        # we want to reset so that the "" label has what's expected.
        # otherwise we're off by exactly the fee amount as that's mined
        # and matures in the next 100 blocks
        amount_to_send = 1.0

        # Create labels and make sure subsequent label API calls
        # recognize the label/address associations.
        labels = [Label(name) for name in ("a", "b", "c", "d", "e")]
        for label in labels:
            address = node.getnewaddress(label.name)
            label.add_receive_address(address)
            label.verify(node)

        # Check all labels are returned by listlabels.
        assert_equal(node.listlabels(), sorted(['coinbase'] + [label.name for label in labels]))

        # Send a transaction to each label.
        for label in labels:
            node.sendtoaddress(label.addresses[0], amount_to_send)
            label.verify(node)

        # Check the amounts received.
        node.generate(1)
        for label in labels:
            assert_equal(
                node.getreceivedbyaddress(label.addresses[0]), amount_to_send)
            assert_equal(node.getreceivedbylabel(label.name), amount_to_send)

        # Send a second payment to every label, visiting the labels rotated
        # by one position (b, c, d, e, a) so the send order is unchanged.
        for to_label in labels[1:] + labels[:1]:
            node.sendtoaddress(to_label.addresses[0], amount_to_send)
        node.generate(1)
        for label in labels:
            address = node.getnewaddress(label.name)
            label.add_receive_address(address)
            label.verify(node)
            # Two payments of amount_to_send have been made to each label.
            assert_equal(node.getreceivedbylabel(label.name), 2)
            label.verify(node)
        node.generate(COINBASE_MATURITY + 1)

        # Check that setlabel can assign a label to a new unused address.
        for label in labels:
            address = node.getnewaddress()
            node.setlabel(address, label.name)
            label.add_address(address)
            label.verify(node)
            # After relabelling, no address should be left under the "" label.
            assert_raises_rpc_error(-11, "No addresses with label", node.getaddressesbylabel, "")

        # Check that addmultisigaddress can assign labels.
        if not self.options.descriptors:
            for label in labels:
                addresses = [node.getnewaddress() for _ in range(10)]
                multisig_address = node.addmultisigaddress(5, addresses, label.name)['address']
                label.add_address(multisig_address)
                # The multisig address is expected to be reported with
                # purpose "send" rather than the default "receive".
                label.purpose[multisig_address] = "send"
                label.verify(node)
            node.generate(COINBASE_MATURITY + 1)

        # Check that setlabel can change the label of an address from a
        # different label.
        change_label(node, labels[0].addresses[0], labels[0], labels[1])

        # Check that setlabel can set the label of an address already
        # in the label. This is a no-op.
        change_label(node, labels[2].addresses[0], labels[2], labels[2])

        if self.options.descriptors:
            # This is a descriptor wallet test because of segwit v1+ addresses
            self.log.info('Check watchonly labels')
            node.createwallet(wallet_name='watch_only', disable_private_keys=True)
            wallet_watch_only = node.get_wallet_rpc('watch_only')
            BECH32_VALID = {
                '✔️_VER15_PROG40': 'bcrt10qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqxkg7fn',
                '✔️_VER16_PROG03': 'bcrt1sqqqqq8uhdgr',
                '✔️_VER16_PROB02': 'bcrt1sqqqq4wstyw',
            }
            BECH32_INVALID = {
                '❌_VER15_PROG41': 'bcrt1sqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqajlxj8',
                '❌_VER16_PROB01': 'bcrt1sqq5r4036',
            }
            for label_name, bech32_address in BECH32_VALID.items():
                wallet_watch_only.importaddress(label=label_name, rescan=False, address=bech32_address)
                node.generatetoaddress(1, bech32_address)
                assert_equal(
                    wallet_watch_only.getaddressesbylabel(label=label_name),
                    {bech32_address: {'purpose': 'receive'}},
                )
                assert_equal(wallet_watch_only.getreceivedbylabel(label=label_name), 0)
            for label_name, bech32_address in BECH32_INVALID.items():
                # This branch only runs for descriptor wallets, so only the
                # descriptor-wallet error message can be returned here.
                # The lambda is invoked immediately by the helper, so closing
                # over the loop variables is safe.
                assert_raises_rpc_error(
                    -5,
                    "Address is not valid",
                    lambda: wallet_watch_only.importaddress(label=label_name, rescan=False, address=bech32_address),
                )
165
166
class Label:
    """Local bookkeeping for one wallet label, used to cross-check a node.

    The test records every address it assigns to the label here and then
    calls verify() to compare this record with what the node reports.
    """

    def __init__(self, name):
        """Create an empty record for the label called *name*."""
        # Label name
        self.name = name
        # Current receiving address associated with this label: the address
        # most recently passed to add_receive_address(), or None if none yet.
        self.receive_address = None
        # List of all addresses assigned with this label
        self.addresses = []
        # Map of address to address purpose; unknown addresses default to "receive"
        self.purpose = defaultdict(lambda: "receive")

    def add_address(self, address):
        """Record *address* under this label.

        Raises AssertionError if the address is already recorded, since that
        would indicate the test handed out the same address twice.
        """
        if address in self.addresses:
            raise AssertionError(
                "address {!r} is already recorded under label {!r}".format(address, self.name))
        self.addresses.append(address)

    def add_receive_address(self, address):
        """Record *address* and make it the label's current receiving address."""
        self.add_address(address)
        # Keep receive_address in sync so the check in verify() is meaningful.
        self.receive_address = address

    def verify(self, node):
        """Assert that *node* agrees with the addresses recorded for this label."""
        if self.receive_address is not None:
            assert self.receive_address in self.addresses
        for address in self.addresses:
            test_address(node, address, labels=[self.name])
        assert self.name in node.listlabels()
        # Looking up self.purpose[address] falls back to "receive" for any
        # address without an explicitly recorded purpose.
        assert_equal(
            node.getaddressesbylabel(self.name),
            {address: {"purpose": self.purpose[address]} for address in self.addresses})
194
def change_label(node, address, old_label, new_label):
    """Relabel *address* from old_label to new_label and verify both labels.

    The address must currently be recorded under old_label. The node is
    told about the new label through setlabel, the local Label records are
    updated to match, and both labels are then verified against the node.
    """
    currently_recorded = address in old_label.addresses
    assert_equal(currently_recorded, True)
    node.setlabel(address, new_label.name)

    # Mirror the change locally: remove first, then add, so that passing the
    # same label as both old and new does not trip the duplicate check.
    old_label.addresses.remove(address)
    new_label.add_address(address)

    for affected_label in (old_label, new_label):
        affected_label.verify(node)
204
# Run the test only when this file is executed as a script, not on import.
if __name__ == '__main__':
    WalletLabelsTest().main()
207