1#!/usr/bin/env python3
2# Copyright (c) 2014-2020 The Bitcoin Core developers
3# Distributed under the MIT software license, see the accompanying
4# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5"""Test the importprunedfunds and removeprunedfunds RPCs."""
6from decimal import Decimal
7
8from test_framework.blocktools import COINBASE_MATURITY
9from test_framework.address import key_to_p2wpkh
10from test_framework.key import ECKey
11from test_framework.test_framework import BitcoinTestFramework
12from test_framework.util import (
13    assert_equal,
14    assert_raises_rpc_error,
15)
16from test_framework.wallet_util import bytes_to_wif
17
class ImportPrunedFundsTest(BitcoinTestFramework):
    """Exercise the importprunedfunds and removeprunedfunds wallet RPCs.

    Node 0 funds three addresses and produces a raw transaction plus a
    txout proof for each payment. Node 1 then imports those transactions
    without rescanning and finally removes them again.
    """

    def set_test_params(self):
        """Configure a two-node test that starts from a clean chain."""
        self.num_nodes = 2
        self.setup_clean_chain = True

    def skip_test_if_missing_module(self):
        """Delegate to the framework's skip_if_no_wallet() check."""
        self.skip_if_no_wallet()

    def run_test(self):
        """Run the import/remove scenario end to end."""
        funder, importer = self.nodes[0], self.nodes[1]

        def listed_txids(wallet, **kwargs):
            # Collect the txid of every entry returned by listtransactions.
            return {entry['txid'] for entry in wallet.listtransactions(**kwargs)}

        self.log.info("Mining blocks...")
        funder.generate(COINBASE_MATURITY + 1)
        self.sync_all()

        # Destinations on the funding node: two fresh wallet addresses and a
        # third derived from a locally generated key that is then imported.
        address1 = funder.getnewaddress()
        address2 = funder.getnewaddress()
        key = ECKey()
        key.generate()
        address3_privkey = bytes_to_wif(key.get_bytes())
        address3 = key_to_p2wpkh(key.get_pubkey().get_bytes())
        funder.importprivkey(address3_privkey)

        # Only the first address is checked on the funding node.
        assert_equal(funder.getaddressinfo(address1)['ismine'], True)

        self.sync_all()

        # The importing node must have caught up with the mined chain.
        assert_equal(importer.getblockcount(), COINBASE_MATURITY + 1)

        # Before any import, the importing node knows none of the addresses.
        for unknown in (address1, address2, address3):
            info = importer.getaddressinfo(unknown)
            assert_equal(info['iswatchonly'], False)
            assert_equal(info['ismine'], False)

        # Pay each address in its own block, recording the raw transaction
        # and its txout proof right after the block is mined.
        funded = []
        for destination, amount in ((address1, 0.1), (address2, 0.05), (address3, 0.025)):
            txid = funder.sendtoaddress(destination, amount)
            funder.generate(1)
            raw_hex = funder.gettransaction(txid)['hex']
            proof = funder.gettxoutproof([txid])
            funded.append((txid, raw_hex, proof))
        (txnid1, rawtxn1, proof1), (txnid2, rawtxn2, proof2), (txnid3, rawtxn3, proof3) = funded

        self.sync_all()

        # Importing a transaction for an address the wallet does not know
        # must be rejected and leave the balance untouched.
        assert_raises_rpc_error(-5, "No addresses", importer.importprunedfunds, rawtxn1, proof1)
        assert_equal(importer.getbalance(), Decimal(0))

        # Watch-only import: address added without rescan, then the funds.
        importer.createwallet('wwatch', disable_private_keys=True)
        wwatch = importer.get_wallet_rpc('wwatch')
        wwatch.importaddress(address=address2, rescan=False)
        wwatch.importprunedfunds(rawtransaction=rawtxn2, txoutproof=proof2)
        assert txnid2 in listed_txids(wwatch, include_watchonly=True)

        # Private-key import: key added without rescan, then the funds.
        w1 = importer.get_wallet_rpc(self.default_wallet_name)
        w1.importprivkey(privkey=address3_privkey, rescan=False)
        w1.importprunedfunds(rawtxn3, proof3)
        assert txnid3 in listed_txids(w1)
        assert_equal(w1.getbalance(), Decimal('0.025'))

        # Address status after the imports.
        info = w1.getaddressinfo(address1)
        assert_equal(info['iswatchonly'], False)
        assert_equal(info['ismine'], False)

        # With self.options.descriptors set, the imported address is expected
        # to be reported as ismine; otherwise it is expected to be watch-only.
        expect_watchonly, expect_mine = (False, True) if self.options.descriptors else (True, False)
        info = wwatch.getaddressinfo(address2)
        assert_equal(info['iswatchonly'], expect_watchonly)
        assert_equal(info['ismine'], expect_mine)

        info = w1.getaddressinfo(address3)
        assert_equal(info['iswatchonly'], False)
        assert_equal(info['ismine'], True)

        # Removal: a transaction that was never imported is an error ...
        assert_raises_rpc_error(-8, "Transaction does not exist in wallet.", w1.removeprunedfunds, txnid1)
        assert txnid1 not in listed_txids(w1, include_watchonly=True)

        # ... while the two imported transactions disappear from their wallets.
        wwatch.removeprunedfunds(txnid2)
        assert txnid2 not in listed_txids(wwatch, include_watchonly=True)

        w1.removeprunedfunds(txnid3)
        assert txnid3 not in listed_txids(w1, include_watchonly=True)
128
if __name__ == '__main__':
    # Executed as a script: instantiate the test and run it via main().
    prunedfunds_test = ImportPrunedFundsTest()
    prunedfunds_test.main()
131