1#!/usr/bin/env python3
2# Copyright (c) 2016-2018 The Bitcoin Core developers
3# Distributed under the MIT software license, see the accompanying
4# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5"""Test label RPCs.
6
RPCs tested are:
    - getaddressesbylabel
    - getreceivedbylabel
    - listaddressgroupings
    - listlabels
    - setlabel
    - addmultisigaddress (label argument)
11"""
12from collections import defaultdict
13
14from test_framework.test_framework import BitcoinTestFramework
15from test_framework.util import assert_equal, assert_raises_rpc_error
16
class WalletLabelsTest(BitcoinTestFramework):
    """Functional test of the wallet label RPCs against a single node."""

    def set_test_params(self):
        """Configure the test: one node, starting from a clean chain."""
        self.setup_clean_chain = True
        self.num_nodes = 1

    def skip_test_if_missing_module(self):
        """Delegate to the framework's skip_if_no_wallet() check."""
        self.skip_if_no_wallet()

    def run_test(self):
        """Exercise address groupings, label creation, lookup and relabeling.

        The steps are order-dependent: each one relies on the wallet state
        left behind by the previous RPC calls.
        """
        # Check that there's no UTXO on the node
        node = self.nodes[0]
        assert_equal(len(node.listunspent()), 0)

        # Note each time we call generate, all generated coins go into
        # the same address, so we call twice to get two addresses w/50 each
        node.generatetoaddress(nblocks=1, address=node.getnewaddress(label='coinbase'))
        node.generatetoaddress(nblocks=101, address=node.getnewaddress(label='coinbase'))
        assert_equal(node.getbalance(), 100)

        # there should be 2 address groups
        # each with 1 address with a balance of 50 Bitcoins
        address_groups = node.listaddressgroupings()
        assert_equal(len(address_groups), 2)
        # the addresses aren't linked now, but will be after we send to the
        # common address
        linked_addresses = set()
        for address_group in address_groups:
            assert_equal(len(address_group), 1)
            # Each entry is checked as [address, balance, label].
            entry = address_group[0]
            assert_equal(len(entry), 3)
            assert_equal(entry[1], 50)
            assert_equal(entry[2], 'coinbase')
            linked_addresses.add(entry[0])

        # send 50 from each address to a third address not in this wallet
        common_address = "msf4WtN1YQKXvNtvdFYt9JBnUD2FB41kjr"
        node.sendmany(
            amounts={common_address: 100},
            subtractfeefrom=[common_address],
            minconf=1,
        )
        # there should be 1 address group, with the previously
        # unlinked addresses now linked (they both have 0 balance)
        address_groups = node.listaddressgroupings()
        assert_equal(len(address_groups), 1)
        assert_equal(len(address_groups[0]), 2)
        assert_equal({a[0] for a in address_groups[0]}, linked_addresses)
        assert_equal([a[1] for a in address_groups[0]], [0, 0])

        node.generate(1)

        # Amount sent to a label address in each of the transfers below.
        amount_to_send = 1.0

        # Create labels and make sure subsequent label API calls
        # recognize the label/address associations.
        labels = [Label(name) for name in ("a", "b", "c", "d", "e")]
        for label in labels:
            address = node.getnewaddress(label.name)
            label.add_receive_address(address)
            label.verify(node)

        # Check all labels are returned by listlabels.
        assert_equal(node.listlabels(), sorted(['coinbase'] + [label.name for label in labels]))

        # Send a transaction to each label.
        for label in labels:
            node.sendtoaddress(label.addresses[0], amount_to_send)
            label.verify(node)

        # Check the amounts received.
        node.generate(1)
        for label in labels:
            assert_equal(
                node.getreceivedbyaddress(label.addresses[0]), amount_to_send)
            assert_equal(node.getreceivedbylabel(label.name), amount_to_send)

        # Second round: one more transfer to the first address of every
        # label, visiting the labels in rotated order (b, c, d, e, a).
        for to_label in labels[1:] + labels[:1]:
            node.sendtoaddress(to_label.addresses[0], amount_to_send)
        node.generate(1)
        for label in labels:
            address = node.getnewaddress(label.name)
            label.add_receive_address(address)
            label.verify(node)
            # Each label has now received two transfers of amount_to_send.
            assert_equal(node.getreceivedbylabel(label.name), 2 * amount_to_send)
        node.generate(101)

        # Check that setlabel can assign a label to a new unused address.
        for label in labels:
            address = node.getnewaddress()
            node.setlabel(address, label.name)
            label.add_address(address)
            label.verify(node)
            # After relabeling, no address may remain under the "" label.
            assert_raises_rpc_error(-11, "No addresses with label", node.getaddressesbylabel, "")

        # Check that addmultisigaddress can assign labels.
        for label in labels:
            addresses = [node.getnewaddress() for _ in range(10)]
            multisig_address = node.addmultisigaddress(5, addresses, label.name)['address']
            label.add_address(multisig_address)
            # The multisig address is expected to be reported with purpose "send".
            label.purpose[multisig_address] = "send"
            label.verify(node)
        node.generate(101)

        # Check that setlabel can change the label of an address from a
        # different label.
        change_label(node, labels[0].addresses[0], labels[0], labels[1])

        # Check that setlabel can set the label of an address already
        # in the label. This is a no-op.
        change_label(node, labels[2].addresses[0], labels[2], labels[2])
133
class Label:
    """Local bookkeeping for one wallet label, used to cross-check the node.

    The test records every address it assigns to the label, together with
    the purpose it expects for that address; verify() then compares this
    record with what the node's RPCs report.
    """

    def __init__(self, name):
        # Label name
        self.name = name
        # Most recent address registered through add_receive_address(),
        # or None if there has been none yet.
        self.receive_address = None
        # List of all addresses assigned with this label
        self.addresses = []
        # Map of address to address purpose ("receive" unless overridden)
        self.purpose = defaultdict(lambda: "receive")

    def add_address(self, address):
        """Record `address` under this label.

        Raises AssertionError if the address is already recorded.
        """
        assert address not in self.addresses, \
            "address %s is already recorded for label %r" % (address, self.name)
        self.addresses.append(address)

    def add_receive_address(self, address):
        """Record `address` and remember it as the current receive address."""
        self.add_address(address)
        self.receive_address = address

    def verify(self, node):
        """Check that `node` reports this label exactly as recorded here.

        Raises AssertionError on any mismatch.
        """
        # The remembered receive address must still belong to the label.
        if self.receive_address is not None:
            assert self.receive_address in self.addresses

        for address in self.addresses:
            # One RPC round-trip per address; both checks use the same reply.
            info = node.getaddressinfo(address)
            assert_equal(
                info['labels'][0],
                {"name": self.name,
                 "purpose": self.purpose[address]})
            assert_equal(info['label'], self.name)

        # The node's address list for the label must match the local record.
        assert_equal(
            node.getaddressesbylabel(self.name),
            {address: {"purpose": self.purpose[address]} for address in self.addresses})
166
def change_label(node, address, old_label, new_label):
    """Relabel `address` from `old_label` to `new_label` and re-verify both.

    The address must currently be recorded in `old_label`. The relabeling is
    done on the node with setlabel and then mirrored in the local Label
    records before both labels are checked against the node.
    """
    currently_tracked = address in old_label.addresses
    assert_equal(currently_tracked, True)

    node.setlabel(address, new_label.name)

    # Mirror the node-side change in the local bookkeeping.
    old_label.addresses.remove(address)
    new_label.add_address(address)

    # Old label first, then new label.
    for tracked_label in (old_label, new_label):
        tracked_label.verify(node)
176
if __name__ == '__main__':
    # Run the test when this file is executed directly.
    label_test = WalletLabelsTest()
    label_test.main()
179