1#!/usr/bin/env python3
2# Copyright (c) 2020 The Bitcoin Core developers
3# Distributed under the MIT software license, see the accompanying
4# file COPYING or http://www.opensource.org/licenses/mit-license.php.
5"""A limited-functionality wallet, which may replace a real wallet in tests"""
7from decimal import Decimal
8from enum import Enum
9from typing import Optional
10from test_framework.address import ADDRESS_BCRT1_P2WSH_OP_TRUE
11from test_framework.key import ECKey
12from test_framework.messages import (
13    COIN,
14    COutPoint,
15    CTransaction,
16    CTxIn,
17    CTxInWitness,
18    CTxOut,
20from test_framework.script import (
21    CScript,
22    LegacySignatureHash,
24    OP_TRUE,
25    OP_NOP,
28from test_framework.util import (
29    assert_equal,
30    hex_str_to_bytes,
31    satoshi_round,
35class MiniWalletMode(Enum):
36    """Determines the transaction type the MiniWallet is creating and spending.
38    For most purposes, the default mode ADDRESS_OP_TRUE should be sufficient;
39    it simply uses a fixed bech32 P2WSH address whose coins are spent with a
40    witness stack of OP_TRUE, i.e. following an anyone-can-spend policy.
41    However, if the transactions need to be modified by the user (e.g. prepending
42    scriptSig for testing opcodes that are activated by a soft-fork), or the txs
43    should contain an actual signature, the raw modes RAW_OP_TRUE and RAW_P2PK
44    can be useful. Summary of modes:
46                    |      output       |           |  tx is   | can modify |  needs
47         mode       |    description    |  address  | standard | scriptSig  | signing
48    ----------------+-------------------+-----------+----------+------------+----------
49    ADDRESS_OP_TRUE | anyone-can-spend  |  bech32   |   yes    |    no      |   no
50    RAW_OP_TRUE     | anyone-can-spend  |  - (raw)  |   no     |    yes     |   no
51    RAW_P2PK        | pay-to-public-key |  - (raw)  |   yes    |    yes     |   yes
52    """
54    RAW_OP_TRUE = 2
55    RAW_P2PK = 3
58class MiniWallet:
59    def __init__(self, test_node, *, mode=MiniWalletMode.ADDRESS_OP_TRUE):
60        self._test_node = test_node
61        self._utxos = []
62        self._priv_key = None
63        self._address = None
65        assert isinstance(mode, MiniWalletMode)
66        if mode == MiniWalletMode.RAW_OP_TRUE:
67            self._scriptPubKey = bytes(CScript([OP_TRUE]))
68        elif mode == MiniWalletMode.RAW_P2PK:
69            # use simple deterministic private key (k=1)
70            self._priv_key = ECKey()
71            self._priv_key.set((1).to_bytes(32, 'big'), True)
72            pub_key = self._priv_key.get_pubkey()
73            self._scriptPubKey = bytes(CScript([pub_key.get_bytes(), OP_CHECKSIG]))
74        elif mode == MiniWalletMode.ADDRESS_OP_TRUE:
75            self._address = ADDRESS_BCRT1_P2WSH_OP_TRUE
76            self._scriptPubKey = hex_str_to_bytes(self._test_node.validateaddress(self._address)['scriptPubKey'])
78    def scan_blocks(self, *, start=1, num):
79        """Scan the blocks for self._address outputs and add them to self._utxos"""
80        for i in range(start, start + num):
81            block = self._test_node.getblock(blockhash=self._test_node.getblockhash(i), verbosity=2)
82            for tx in block['tx']:
83                self.scan_tx(tx)
85    def scan_tx(self, tx):
86        """Scan the tx for self._scriptPubKey outputs and add them to self._utxos"""
87        for out in tx['vout']:
88            if out['scriptPubKey']['hex'] == self._scriptPubKey.hex():
89                self._utxos.append({'txid': tx['txid'], 'vout': out['n'], 'value': out['value']})
91    def sign_tx(self, tx, fixed_length=True):
92        """Sign tx that has been created by MiniWallet in P2PK mode"""
93        assert self._priv_key is not None
94        (sighash, err) = LegacySignatureHash(CScript(self._scriptPubKey), tx, 0, SIGHASH_ALL)
95        assert err is None
96        # for exact fee calculation, create only signatures with fixed size by default (>49.89% probability):
97        # 65 bytes: high-R val (33 bytes) + low-S val (32 bytes)
98        # with the DER header/skeleton data of 6 bytes added, this leads to a target size of 71 bytes
99        der_sig = b''
100        while not len(der_sig) == 71:
101            der_sig = self._priv_key.sign_ecdsa(sighash)
102            if not fixed_length:
103                break
104        tx.vin[0].scriptSig = CScript([der_sig + bytes(bytearray([SIGHASH_ALL]))])
106    def generate(self, num_blocks):
107        """Generate blocks with coinbase outputs to the internal address, and append the outputs to the internal list"""
108        blocks = self._test_node.generatetodescriptor(num_blocks, f'raw({self._scriptPubKey.hex()})')
109        for b in blocks:
110            cb_tx = self._test_node.getblock(blockhash=b, verbosity=2)['tx'][0]
111            self._utxos.append({'txid': cb_tx['txid'], 'vout': 0, 'value': cb_tx['vout'][0]['value']})
112        return blocks
114    def get_address(self):
115        return self._address
117    def get_utxo(self, *, txid: Optional[str]='', mark_as_spent=True):
118        """
119        Returns a utxo and marks it as spent (pops it from the internal list)
121        Args:
122        txid: get the first utxo we find from a specific transaction
124        Note: Can be used to get the change output immediately after a send_self_transfer
125        """
126        index = -1  # by default the last utxo
127        if txid:
128            utxo = next(filter(lambda utxo: txid == utxo['txid'], self._utxos))
129            index = self._utxos.index(utxo)
130        if mark_as_spent:
131            return self._utxos.pop(index)
132        else:
133            return self._utxos[index]
135    def send_self_transfer(self, **kwargs):
136        """Create and send a tx with the specified fee_rate. Fee may be exact or at most one satoshi higher than needed."""
137        tx = self.create_self_transfer(**kwargs)
138        self.sendrawtransaction(from_node=kwargs['from_node'], tx_hex=tx['hex'])
139        return tx
141    def create_self_transfer(self, *, fee_rate=Decimal("0.003"), from_node, utxo_to_spend=None, mempool_valid=True, locktime=0, sequence=0):
142        """Create and return a tx with the specified fee_rate. Fee may be exact or at most one satoshi higher than needed."""
143        self._utxos = sorted(self._utxos, key=lambda k: k['value'])
144        utxo_to_spend = utxo_to_spend or self._utxos.pop()  # Pick the largest utxo (if none provided) and hope it covers the fee
145        if self._priv_key is None:
146            vsize = Decimal(96)  # anyone-can-spend
147        else:
148            vsize = Decimal(168)  # P2PK (73 bytes scriptSig + 35 bytes scriptPubKey + 60 bytes other)
149        send_value = satoshi_round(utxo_to_spend['value'] - fee_rate * (vsize / 1000))
150        fee = utxo_to_spend['value'] - send_value
151        assert send_value > 0
153        tx = CTransaction()
154        tx.vin = [CTxIn(COutPoint(int(utxo_to_spend['txid'], 16), utxo_to_spend['vout']), nSequence=sequence)]
155        tx.vout = [CTxOut(int(send_value * COIN), self._scriptPubKey)]
156        tx.nLockTime = locktime
157        if not self._address:
158            # raw script
159            if self._priv_key is not None:
160                # P2PK, need to sign
161                self.sign_tx(tx)
162            else:
163                # anyone-can-spend
164                tx.vin[0].scriptSig = CScript([OP_NOP] * 35)  # pad to identical size
165        else:
166            tx.wit.vtxinwit = [CTxInWitness()]
167            tx.wit.vtxinwit[0].scriptWitness.stack = [CScript([OP_TRUE])]
168        tx_hex = tx.serialize().hex()
170        tx_info = from_node.testmempoolaccept([tx_hex])[0]
171        assert_equal(mempool_valid, tx_info['allowed'])
172        if mempool_valid:
173            assert_equal(tx_info['vsize'], vsize)
174            assert_equal(tx_info['fees']['base'], fee)
175        return {'txid': tx_info['txid'], 'wtxid': tx_info['wtxid'], 'hex': tx_hex, 'tx': tx}
177    def sendrawtransaction(self, *, from_node, tx_hex):
178        from_node.sendrawtransaction(tx_hex)
179        self.scan_tx(from_node.decoderawtransaction(tx_hex))