1#!/usr/bin/env python3 2# Copyright (c) 2020 The Bitcoin Core developers 3# Distributed under the MIT software license, see the accompanying 4# file COPYING or http://www.opensource.org/licenses/mit-license.php. 5"""A limited-functionality wallet, which may replace a real wallet in tests""" 6 7from decimal import Decimal 8from enum import Enum 9from typing import Optional 10from test_framework.address import ADDRESS_BCRT1_P2WSH_OP_TRUE 11from test_framework.key import ECKey 12from test_framework.messages import ( 13 COIN, 14 COutPoint, 15 CTransaction, 16 CTxIn, 17 CTxInWitness, 18 CTxOut, 19) 20from test_framework.script import ( 21 CScript, 22 LegacySignatureHash, 23 OP_CHECKSIG, 24 OP_TRUE, 25 OP_NOP, 26 SIGHASH_ALL, 27) 28from test_framework.util import ( 29 assert_equal, 30 hex_str_to_bytes, 31 satoshi_round, 32) 33 34 35class MiniWalletMode(Enum): 36 """Determines the transaction type the MiniWallet is creating and spending. 37 38 For most purposes, the default mode ADDRESS_OP_TRUE should be sufficient; 39 it simply uses a fixed bech32 P2WSH address whose coins are spent with a 40 witness stack of OP_TRUE, i.e. following an anyone-can-spend policy. 41 However, if the transactions need to be modified by the user (e.g. prepending 42 scriptSig for testing opcodes that are activated by a soft-fork), or the txs 43 should contain an actual signature, the raw modes RAW_OP_TRUE and RAW_P2PK 44 can be useful. Summary of modes: 45 46 | output | | tx is | can modify | needs 47 mode | description | address | standard | scriptSig | signing 48 ----------------+-------------------+-----------+----------+------------+---------- 49 ADDRESS_OP_TRUE | anyone-can-spend | bech32 | yes | no | no 50 RAW_OP_TRUE | anyone-can-spend | - (raw) | no | yes | no 51 RAW_P2PK | pay-to-public-key | - (raw) | yes | yes | yes 52 """ 53 ADDRESS_OP_TRUE = 1 54 RAW_OP_TRUE = 2 55 RAW_P2PK = 3 56 57 58class MiniWallet: 59 def __init__(self, test_node, *, mode=MiniWalletMode.ADDRESS_OP_TRUE): 60 self._test_node = test_node 61 self._utxos = [] 62 self._priv_key = None 63 self._address = None 64 65 assert isinstance(mode, MiniWalletMode) 66 if mode == MiniWalletMode.RAW_OP_TRUE: 67 self._scriptPubKey = bytes(CScript([OP_TRUE])) 68 elif mode == MiniWalletMode.RAW_P2PK: 69 # use simple deterministic private key (k=1) 70 self._priv_key = ECKey() 71 self._priv_key.set((1).to_bytes(32, 'big'), True) 72 pub_key = self._priv_key.get_pubkey() 73 self._scriptPubKey = bytes(CScript([pub_key.get_bytes(), OP_CHECKSIG])) 74 elif mode == MiniWalletMode.ADDRESS_OP_TRUE: 75 self._address = ADDRESS_BCRT1_P2WSH_OP_TRUE 76 self._scriptPubKey = hex_str_to_bytes(self._test_node.validateaddress(self._address)['scriptPubKey']) 77 78 def scan_blocks(self, *, start=1, num): 79 """Scan the blocks for self._address outputs and add them to self._utxos""" 80 for i in range(start, start + num): 81 block = self._test_node.getblock(blockhash=self._test_node.getblockhash(i), verbosity=2) 82 for tx in block['tx']: 83 self.scan_tx(tx) 84 85 def scan_tx(self, tx): 86 """Scan the tx for self._scriptPubKey outputs and add them to self._utxos""" 87 for out in tx['vout']: 88 if out['scriptPubKey']['hex'] == self._scriptPubKey.hex(): 89 self._utxos.append({'txid': tx['txid'], 'vout': out['n'], 'value': out['value']}) 90 91 def sign_tx(self, tx, fixed_length=True): 92 """Sign tx that has been created by MiniWallet in P2PK mode""" 93 assert self._priv_key is not None 94 (sighash, err) = LegacySignatureHash(CScript(self._scriptPubKey), tx, 0, SIGHASH_ALL) 95 assert err is None 96 # for exact fee calculation, create only signatures with fixed size by default (>49.89% probability): 97 # 65 bytes: high-R val (33 bytes) + low-S val (32 bytes) 98 # with the DER header/skeleton data of 6 bytes added, this leads to a target size of 71 bytes 99 der_sig = b'' 100 while not len(der_sig) == 71: 101 der_sig = self._priv_key.sign_ecdsa(sighash) 102 if not fixed_length: 103 break 104 tx.vin[0].scriptSig = CScript([der_sig + bytes(bytearray([SIGHASH_ALL]))]) 105 106 def generate(self, num_blocks): 107 """Generate blocks with coinbase outputs to the internal address, and append the outputs to the internal list""" 108 blocks = self._test_node.generatetodescriptor(num_blocks, f'raw({self._scriptPubKey.hex()})') 109 for b in blocks: 110 cb_tx = self._test_node.getblock(blockhash=b, verbosity=2)['tx'][0] 111 self._utxos.append({'txid': cb_tx['txid'], 'vout': 0, 'value': cb_tx['vout'][0]['value']}) 112 return blocks 113 114 def get_address(self): 115 return self._address 116 117 def get_utxo(self, *, txid: Optional[str]='', mark_as_spent=True): 118 """ 119 Returns a utxo and marks it as spent (pops it from the internal list) 120 121 Args: 122 txid: get the first utxo we find from a specific transaction 123 124 Note: Can be used to get the change output immediately after a send_self_transfer 125 """ 126 index = -1 # by default the last utxo 127 if txid: 128 utxo = next(filter(lambda utxo: txid == utxo['txid'], self._utxos)) 129 index = self._utxos.index(utxo) 130 if mark_as_spent: 131 return self._utxos.pop(index) 132 else: 133 return self._utxos[index] 134 135 def send_self_transfer(self, **kwargs): 136 """Create and send a tx with the specified fee_rate. Fee may be exact or at most one satoshi higher than needed.""" 137 tx = self.create_self_transfer(**kwargs) 138 self.sendrawtransaction(from_node=kwargs['from_node'], tx_hex=tx['hex']) 139 return tx 140 141 def create_self_transfer(self, *, fee_rate=Decimal("0.003"), from_node, utxo_to_spend=None, mempool_valid=True, locktime=0, sequence=0): 142 """Create and return a tx with the specified fee_rate. Fee may be exact or at most one satoshi higher than needed.""" 143 self._utxos = sorted(self._utxos, key=lambda k: k['value']) 144 utxo_to_spend = utxo_to_spend or self._utxos.pop() # Pick the largest utxo (if none provided) and hope it covers the fee 145 if self._priv_key is None: 146 vsize = Decimal(96) # anyone-can-spend 147 else: 148 vsize = Decimal(168) # P2PK (73 bytes scriptSig + 35 bytes scriptPubKey + 60 bytes other) 149 send_value = satoshi_round(utxo_to_spend['value'] - fee_rate * (vsize / 1000)) 150 fee = utxo_to_spend['value'] - send_value 151 assert send_value > 0 152 153 tx = CTransaction() 154 tx.vin = [CTxIn(COutPoint(int(utxo_to_spend['txid'], 16), utxo_to_spend['vout']), nSequence=sequence)] 155 tx.vout = [CTxOut(int(send_value * COIN), self._scriptPubKey)] 156 tx.nLockTime = locktime 157 if not self._address: 158 # raw script 159 if self._priv_key is not None: 160 # P2PK, need to sign 161 self.sign_tx(tx) 162 else: 163 # anyone-can-spend 164 tx.vin[0].scriptSig = CScript([OP_NOP] * 35) # pad to identical size 165 else: 166 tx.wit.vtxinwit = [CTxInWitness()] 167 tx.wit.vtxinwit[0].scriptWitness.stack = [CScript([OP_TRUE])] 168 tx_hex = tx.serialize().hex() 169 170 tx_info = from_node.testmempoolaccept([tx_hex])[0] 171 assert_equal(mempool_valid, tx_info['allowed']) 172 if mempool_valid: 173 assert_equal(tx_info['vsize'], vsize) 174 assert_equal(tx_info['fees']['base'], fee) 175 return {'txid': tx_info['txid'], 'wtxid': tx_info['wtxid'], 'hex': tx_hex, 'tx': tx} 176 177 def sendrawtransaction(self, *, from_node, tx_hex): 178 from_node.sendrawtransaction(tx_hex) 179 self.scan_tx(from_node.decoderawtransaction(tx_hex)) 180