1#!/usr/local/bin/python3.8 2# 3# Electrum - lightweight Bitcoin client 4# Copyright (C) 2011 Thomas Voegtlin 5# 6# Permission is hereby granted, free of charge, to any person 7# obtaining a copy of this software and associated documentation files 8# (the "Software"), to deal in the Software without restriction, 9# including without limitation the rights to use, copy, modify, merge, 10# publish, distribute, sublicense, and/or sell copies of the Software, 11# and to permit persons to whom the Software is furnished to do so, 12# subject to the following conditions: 13# 14# The above copyright notice and this permission notice shall be 15# included in all copies or substantial portions of the Software. 16# 17# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 18# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 19# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 20# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 21# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 22# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 23# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 24# SOFTWARE. 25 26 27 28# Note: The deserialization code originally comes from ABE. 29 30import struct 31import traceback 32import sys 33import io 34import base64 35from typing import (Sequence, Union, NamedTuple, Tuple, Optional, Iterable, 36 Callable, List, Dict, Set, TYPE_CHECKING) 37from collections import defaultdict 38from enum import IntEnum 39import itertools 40import binascii 41import copy 42 43from . import ecc, bitcoin, constants, segwit_addr, bip32 44from .bip32 import BIP32Node 45from .util import profiler, to_bytes, bh2u, bfh, chunks, is_hex_str 46from .bitcoin import (TYPE_ADDRESS, TYPE_SCRIPT, hash_160, 47 hash160_to_p2sh, hash160_to_p2pkh, hash_to_segwit_addr, 48 var_int, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC, COIN, 49 int_to_hex, push_script, b58_address_to_hash160, 50 opcodes, add_number_to_script, base_decode, is_segwit_script_type, 51 base_encode, construct_witness, construct_script) 52from .crypto import sha256d 53from .logging import get_logger 54 55if TYPE_CHECKING: 56 from .wallet import Abstract_Wallet 57 58 59_logger = get_logger(__name__) 60DEBUG_PSBT_PARSING = False 61 62 63class SerializationError(Exception): 64 """ Thrown when there's a problem deserializing or serializing """ 65 66 67class UnknownTxinType(Exception): 68 pass 69 70 71class BadHeaderMagic(SerializationError): 72 pass 73 74 75class UnexpectedEndOfStream(SerializationError): 76 pass 77 78 79class PSBTInputConsistencyFailure(SerializationError): 80 pass 81 82 83class MalformedBitcoinScript(Exception): 84 pass 85 86 87class MissingTxInputAmount(Exception): 88 pass 89 90 91SIGHASH_ALL = 1 92 93 94class TxOutput: 95 scriptpubkey: bytes 96 value: Union[int, str] 97 98 def __init__(self, *, scriptpubkey: bytes, value: Union[int, str]): 99 self.scriptpubkey = scriptpubkey 100 self.value = value # str when the output is set to max: '!' # in satoshis 101 102 @classmethod 103 def from_address_and_value(cls, address: str, value: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']: 104 return cls(scriptpubkey=bfh(bitcoin.address_to_script(address)), 105 value=value) 106 107 def serialize_to_network(self) -> bytes: 108 buf = int.to_bytes(self.value, 8, byteorder="little", signed=False) 109 script = self.scriptpubkey 110 buf += bfh(var_int(len(script.hex()) // 2)) 111 buf += script 112 return buf 113 114 @classmethod 115 def from_network_bytes(cls, raw: bytes) -> 'TxOutput': 116 vds = BCDataStream() 117 vds.write(raw) 118 txout = parse_output(vds) 119 if vds.can_read_more(): 120 raise SerializationError('extra junk at the end of TxOutput bytes') 121 return txout 122 123 def to_legacy_tuple(self) -> Tuple[int, str, Union[int, str]]: 124 if self.address: 125 return TYPE_ADDRESS, self.address, self.value 126 return TYPE_SCRIPT, self.scriptpubkey.hex(), self.value 127 128 @classmethod 129 def from_legacy_tuple(cls, _type: int, addr: str, val: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']: 130 if _type == TYPE_ADDRESS: 131 return cls.from_address_and_value(addr, val) 132 if _type == TYPE_SCRIPT: 133 return cls(scriptpubkey=bfh(addr), value=val) 134 raise Exception(f"unexptected legacy address type: {_type}") 135 136 @property 137 def address(self) -> Optional[str]: 138 return get_address_from_output_script(self.scriptpubkey) # TODO cache this? 139 140 def get_ui_address_str(self) -> str: 141 addr = self.address 142 if addr is not None: 143 return addr 144 return f"SCRIPT {self.scriptpubkey.hex()}" 145 146 def __repr__(self): 147 return f"<TxOutput script={self.scriptpubkey.hex()} address={self.address} value={self.value}>" 148 149 def __eq__(self, other): 150 if not isinstance(other, TxOutput): 151 return False 152 return self.scriptpubkey == other.scriptpubkey and self.value == other.value 153 154 def __ne__(self, other): 155 return not (self == other) 156 157 def to_json(self): 158 d = { 159 'scriptpubkey': self.scriptpubkey.hex(), 160 'address': self.address, 161 'value_sats': self.value, 162 } 163 return d 164 165 166class BIP143SharedTxDigestFields(NamedTuple): 167 hashPrevouts: str 168 hashSequence: str 169 hashOutputs: str 170 171 172class TxOutpoint(NamedTuple): 173 txid: bytes # endianness same as hex string displayed; reverse of tx serialization order 174 out_idx: int 175 176 @classmethod 177 def from_str(cls, s: str) -> 'TxOutpoint': 178 hash_str, idx_str = s.split(':') 179 assert len(hash_str) == 64, f"{hash_str} should be a sha256 hash" 180 return TxOutpoint(txid=bfh(hash_str), 181 out_idx=int(idx_str)) 182 183 def to_str(self) -> str: 184 return f"{self.txid.hex()}:{self.out_idx}" 185 186 def to_json(self): 187 return [self.txid.hex(), self.out_idx] 188 189 def serialize_to_network(self) -> bytes: 190 return self.txid[::-1] + bfh(int_to_hex(self.out_idx, 4)) 191 192 def is_coinbase(self) -> bool: 193 return self.txid == bytes(32) 194 195 196class TxInput: 197 prevout: TxOutpoint 198 script_sig: Optional[bytes] 199 nsequence: int 200 witness: Optional[bytes] 201 _is_coinbase_output: bool 202 203 def __init__(self, *, 204 prevout: TxOutpoint, 205 script_sig: bytes = None, 206 nsequence: int = 0xffffffff - 1, 207 witness: bytes = None, 208 is_coinbase_output: bool = False): 209 self.prevout = prevout 210 self.script_sig = script_sig 211 self.nsequence = nsequence 212 self.witness = witness 213 self._is_coinbase_output = is_coinbase_output 214 215 def is_coinbase_input(self) -> bool: 216 """Whether this is the input of a coinbase tx.""" 217 return self.prevout.is_coinbase() 218 219 def is_coinbase_output(self) -> bool: 220 """Whether the coin being spent is an output of a coinbase tx. 221 This matters for coin maturity. 222 """ 223 return self._is_coinbase_output 224 225 def value_sats(self) -> Optional[int]: 226 return None 227 228 def to_json(self): 229 d = { 230 'prevout_hash': self.prevout.txid.hex(), 231 'prevout_n': self.prevout.out_idx, 232 'coinbase': self.is_coinbase_output(), 233 'nsequence': self.nsequence, 234 } 235 if self.script_sig is not None: 236 d['scriptSig'] = self.script_sig.hex() 237 if self.witness is not None: 238 d['witness'] = self.witness.hex() 239 return d 240 241 def witness_elements(self)-> Sequence[bytes]: 242 vds = BCDataStream() 243 vds.write(self.witness) 244 n = vds.read_compact_size() 245 return list(vds.read_bytes(vds.read_compact_size()) for i in range(n)) 246 247 def is_segwit(self, *, guess_for_address=False) -> bool: 248 if self.witness not in (b'\x00', b'', None): 249 return True 250 return False 251 252 253class BCDataStream(object): 254 """Workalike python implementation of Bitcoin's CDataStream class.""" 255 256 def __init__(self): 257 self.input = None # type: Optional[bytearray] 258 self.read_cursor = 0 259 260 def clear(self): 261 self.input = None 262 self.read_cursor = 0 263 264 def write(self, _bytes: Union[bytes, bytearray]): # Initialize with string of _bytes 265 assert isinstance(_bytes, (bytes, bytearray)) 266 if self.input is None: 267 self.input = bytearray(_bytes) 268 else: 269 self.input += bytearray(_bytes) 270 271 def read_string(self, encoding='ascii'): 272 # Strings are encoded depending on length: 273 # 0 to 252 : 1-byte-length followed by bytes (if any) 274 # 253 to 65,535 : byte'253' 2-byte-length followed by bytes 275 # 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes 276 # ... and the Bitcoin client is coded to understand: 277 # greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string 278 # ... but I don't think it actually handles any strings that big. 279 if self.input is None: 280 raise SerializationError("call write(bytes) before trying to deserialize") 281 282 length = self.read_compact_size() 283 284 return self.read_bytes(length).decode(encoding) 285 286 def write_string(self, string, encoding='ascii'): 287 string = to_bytes(string, encoding) 288 # Length-encoded as with read-string 289 self.write_compact_size(len(string)) 290 self.write(string) 291 292 def read_bytes(self, length: int) -> bytes: 293 if self.input is None: 294 raise SerializationError("call write(bytes) before trying to deserialize") 295 assert length >= 0 296 input_len = len(self.input) 297 read_begin = self.read_cursor 298 read_end = read_begin + length 299 if 0 <= read_begin <= read_end <= input_len: 300 result = self.input[read_begin:read_end] # type: bytearray 301 self.read_cursor += length 302 return bytes(result) 303 else: 304 raise SerializationError('attempt to read past end of buffer') 305 306 def write_bytes(self, _bytes: Union[bytes, bytearray], length: int): 307 assert len(_bytes) == length, len(_bytes) 308 self.write(_bytes) 309 310 def can_read_more(self) -> bool: 311 if not self.input: 312 return False 313 return self.read_cursor < len(self.input) 314 315 def read_boolean(self) -> bool: return self.read_bytes(1) != b'\x00' 316 def read_int16(self): return self._read_num('<h') 317 def read_uint16(self): return self._read_num('<H') 318 def read_int32(self): return self._read_num('<i') 319 def read_uint32(self): return self._read_num('<I') 320 def read_int64(self): return self._read_num('<q') 321 def read_uint64(self): return self._read_num('<Q') 322 323 def write_boolean(self, val): return self.write(b'\x01' if val else b'\x00') 324 def write_int16(self, val): return self._write_num('<h', val) 325 def write_uint16(self, val): return self._write_num('<H', val) 326 def write_int32(self, val): return self._write_num('<i', val) 327 def write_uint32(self, val): return self._write_num('<I', val) 328 def write_int64(self, val): return self._write_num('<q', val) 329 def write_uint64(self, val): return self._write_num('<Q', val) 330 331 def read_compact_size(self): 332 try: 333 size = self.input[self.read_cursor] 334 self.read_cursor += 1 335 if size == 253: 336 size = self._read_num('<H') 337 elif size == 254: 338 size = self._read_num('<I') 339 elif size == 255: 340 size = self._read_num('<Q') 341 return size 342 except IndexError as e: 343 raise SerializationError("attempt to read past end of buffer") from e 344 345 def write_compact_size(self, size): 346 if size < 0: 347 raise SerializationError("attempt to write size < 0") 348 elif size < 253: 349 self.write(bytes([size])) 350 elif size < 2**16: 351 self.write(b'\xfd') 352 self._write_num('<H', size) 353 elif size < 2**32: 354 self.write(b'\xfe') 355 self._write_num('<I', size) 356 elif size < 2**64: 357 self.write(b'\xff') 358 self._write_num('<Q', size) 359 else: 360 raise Exception(f"size {size} too large for compact_size") 361 362 def _read_num(self, format): 363 try: 364 (i,) = struct.unpack_from(format, self.input, self.read_cursor) 365 self.read_cursor += struct.calcsize(format) 366 except Exception as e: 367 raise SerializationError(e) from e 368 return i 369 370 def _write_num(self, format, num): 371 s = struct.pack(format, num) 372 self.write(s) 373 374 375def script_GetOp(_bytes : bytes): 376 i = 0 377 while i < len(_bytes): 378 vch = None 379 opcode = _bytes[i] 380 i += 1 381 382 if opcode <= opcodes.OP_PUSHDATA4: 383 nSize = opcode 384 if opcode == opcodes.OP_PUSHDATA1: 385 try: nSize = _bytes[i] 386 except IndexError: raise MalformedBitcoinScript() 387 i += 1 388 elif opcode == opcodes.OP_PUSHDATA2: 389 try: (nSize,) = struct.unpack_from('<H', _bytes, i) 390 except struct.error: raise MalformedBitcoinScript() 391 i += 2 392 elif opcode == opcodes.OP_PUSHDATA4: 393 try: (nSize,) = struct.unpack_from('<I', _bytes, i) 394 except struct.error: raise MalformedBitcoinScript() 395 i += 4 396 vch = _bytes[i:i + nSize] 397 i += nSize 398 399 yield opcode, vch, i 400 401 402class OPPushDataGeneric: 403 def __init__(self, pushlen: Callable=None): 404 if pushlen is not None: 405 self.check_data_len = pushlen 406 407 @classmethod 408 def check_data_len(cls, datalen: int) -> bool: 409 # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent. 410 return opcodes.OP_PUSHDATA4 >= datalen >= 0 411 412 @classmethod 413 def is_instance(cls, item): 414 # accept objects that are instances of this class 415 # or other classes that are subclasses 416 return isinstance(item, cls) \ 417 or (isinstance(item, type) and issubclass(item, cls)) 418 419 420OPPushDataPubkey = OPPushDataGeneric(lambda x: x in (33, 65)) 421 422SCRIPTPUBKEY_TEMPLATE_P2PKH = [opcodes.OP_DUP, opcodes.OP_HASH160, 423 OPPushDataGeneric(lambda x: x == 20), 424 opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG] 425SCRIPTPUBKEY_TEMPLATE_P2SH = [opcodes.OP_HASH160, OPPushDataGeneric(lambda x: x == 20), opcodes.OP_EQUAL] 426SCRIPTPUBKEY_TEMPLATE_WITNESS_V0 = [opcodes.OP_0, OPPushDataGeneric(lambda x: x in (20, 32))] 427SCRIPTPUBKEY_TEMPLATE_P2WPKH = [opcodes.OP_0, OPPushDataGeneric(lambda x: x == 20)] 428SCRIPTPUBKEY_TEMPLATE_P2WSH = [opcodes.OP_0, OPPushDataGeneric(lambda x: x == 32)] 429 430 431def match_script_against_template(script, template) -> bool: 432 """Returns whether 'script' matches 'template'.""" 433 if script is None: 434 return False 435 # optionally decode script now: 436 if isinstance(script, (bytes, bytearray)): 437 try: 438 script = [x for x in script_GetOp(script)] 439 except MalformedBitcoinScript: 440 return False 441 if len(script) != len(template): 442 return False 443 for i in range(len(script)): 444 template_item = template[i] 445 script_item = script[i] 446 if OPPushDataGeneric.is_instance(template_item) and template_item.check_data_len(script_item[0]): 447 continue 448 if template_item != script_item[0]: 449 return False 450 return True 451 452def get_script_type_from_output_script(_bytes: bytes) -> Optional[str]: 453 if _bytes is None: 454 return None 455 try: 456 decoded = [x for x in script_GetOp(_bytes)] 457 except MalformedBitcoinScript: 458 return None 459 if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2PKH): 460 return 'p2pkh' 461 if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2SH): 462 return 'p2sh' 463 if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2WPKH): 464 return 'p2wpkh' 465 if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2WSH): 466 return 'p2wsh' 467 return None 468 469def get_address_from_output_script(_bytes: bytes, *, net=None) -> Optional[str]: 470 try: 471 decoded = [x for x in script_GetOp(_bytes)] 472 except MalformedBitcoinScript: 473 return None 474 475 # p2pkh 476 if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2PKH): 477 return hash160_to_p2pkh(decoded[2][1], net=net) 478 479 # p2sh 480 if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2SH): 481 return hash160_to_p2sh(decoded[1][1], net=net) 482 483 # segwit address (version 0) 484 if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0): 485 return hash_to_segwit_addr(decoded[1][1], witver=0, net=net) 486 487 # segwit address (version 1-16) 488 future_witness_versions = list(range(opcodes.OP_1, opcodes.OP_16 + 1)) 489 for witver, opcode in enumerate(future_witness_versions, start=1): 490 match = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)] 491 if match_script_against_template(decoded, match): 492 return hash_to_segwit_addr(decoded[1][1], witver=witver, net=net) 493 494 return None 495 496 497def parse_input(vds: BCDataStream) -> TxInput: 498 prevout_hash = vds.read_bytes(32)[::-1] 499 prevout_n = vds.read_uint32() 500 prevout = TxOutpoint(txid=prevout_hash, out_idx=prevout_n) 501 script_sig = vds.read_bytes(vds.read_compact_size()) 502 nsequence = vds.read_uint32() 503 return TxInput(prevout=prevout, script_sig=script_sig, nsequence=nsequence) 504 505 506def parse_witness(vds: BCDataStream, txin: TxInput) -> None: 507 n = vds.read_compact_size() 508 witness_elements = list(vds.read_bytes(vds.read_compact_size()) for i in range(n)) 509 txin.witness = bfh(construct_witness(witness_elements)) 510 511 512def parse_output(vds: BCDataStream) -> TxOutput: 513 value = vds.read_int64() 514 if value > TOTAL_COIN_SUPPLY_LIMIT_IN_BTC * COIN: 515 raise SerializationError('invalid output amount (too large)') 516 if value < 0: 517 raise SerializationError('invalid output amount (negative)') 518 scriptpubkey = vds.read_bytes(vds.read_compact_size()) 519 return TxOutput(value=value, scriptpubkey=scriptpubkey) 520 521 522# pay & redeem scripts 523 524def multisig_script(public_keys: Sequence[str], m: int) -> str: 525 n = len(public_keys) 526 assert 1 <= m <= n <= 15, f'm {m}, n {n}' 527 return construct_script([m, *public_keys, n, opcodes.OP_CHECKMULTISIG]) 528 529 530 531 532class Transaction: 533 _cached_network_ser: Optional[str] 534 535 def __str__(self): 536 return self.serialize() 537 538 def __init__(self, raw): 539 if raw is None: 540 self._cached_network_ser = None 541 elif isinstance(raw, str): 542 self._cached_network_ser = raw.strip() if raw else None 543 assert is_hex_str(self._cached_network_ser) 544 elif isinstance(raw, (bytes, bytearray)): 545 self._cached_network_ser = bh2u(raw) 546 else: 547 raise Exception(f"cannot initialize transaction from {raw}") 548 self._inputs = None # type: List[TxInput] 549 self._outputs = None # type: List[TxOutput] 550 self._locktime = 0 551 self._version = 2 552 553 self._cached_txid = None # type: Optional[str] 554 555 @property 556 def locktime(self): 557 self.deserialize() 558 return self._locktime 559 560 @locktime.setter 561 def locktime(self, value: int): 562 assert isinstance(value, int), f"locktime must be int, not {value!r}" 563 self._locktime = value 564 self.invalidate_ser_cache() 565 566 @property 567 def version(self): 568 self.deserialize() 569 return self._version 570 571 @version.setter 572 def version(self, value): 573 self._version = value 574 self.invalidate_ser_cache() 575 576 def to_json(self) -> dict: 577 d = { 578 'version': self.version, 579 'locktime': self.locktime, 580 'inputs': [txin.to_json() for txin in self.inputs()], 581 'outputs': [txout.to_json() for txout in self.outputs()], 582 } 583 return d 584 585 def inputs(self) -> Sequence[TxInput]: 586 if self._inputs is None: 587 self.deserialize() 588 return self._inputs 589 590 def outputs(self) -> Sequence[TxOutput]: 591 if self._outputs is None: 592 self.deserialize() 593 return self._outputs 594 595 def deserialize(self) -> None: 596 if self._cached_network_ser is None: 597 return 598 if self._inputs is not None: 599 return 600 601 raw_bytes = bfh(self._cached_network_ser) 602 vds = BCDataStream() 603 vds.write(raw_bytes) 604 self._version = vds.read_int32() 605 n_vin = vds.read_compact_size() 606 is_segwit = (n_vin == 0) 607 if is_segwit: 608 marker = vds.read_bytes(1) 609 if marker != b'\x01': 610 raise ValueError('invalid txn marker byte: {}'.format(marker)) 611 n_vin = vds.read_compact_size() 612 if n_vin < 1: 613 raise SerializationError('tx needs to have at least 1 input') 614 self._inputs = [parse_input(vds) for i in range(n_vin)] 615 n_vout = vds.read_compact_size() 616 if n_vout < 1: 617 raise SerializationError('tx needs to have at least 1 output') 618 self._outputs = [parse_output(vds) for i in range(n_vout)] 619 if is_segwit: 620 for txin in self._inputs: 621 parse_witness(vds, txin) 622 self._locktime = vds.read_uint32() 623 if vds.can_read_more(): 624 raise SerializationError('extra junk at the end') 625 626 @classmethod 627 def get_siglist(self, txin: 'PartialTxInput', *, estimate_size=False): 628 if txin.is_coinbase_input(): 629 return [], [] 630 631 if estimate_size: 632 try: 633 pubkey_size = len(txin.pubkeys[0]) 634 except IndexError: 635 pubkey_size = 33 # guess it is compressed 636 num_pubkeys = max(1, len(txin.pubkeys)) 637 pk_list = ["00" * pubkey_size] * num_pubkeys 638 num_sig = max(1, txin.num_sig) 639 # we guess that signatures will be 72 bytes long 640 # note: DER-encoded ECDSA signatures are 71 or 72 bytes in practice 641 # See https://bitcoin.stackexchange.com/questions/77191/what-is-the-maximum-size-of-a-der-encoded-ecdsa-signature 642 # We assume low S (as that is a bitcoin standardness rule). 643 # We do not assume low R (even though the sigs we create conform), as external sigs, 644 # e.g. from a hw signer cannot be expected to have a low R. 645 sig_list = ["00" * 72] * num_sig 646 else: 647 pk_list = [pubkey.hex() for pubkey in txin.pubkeys] 648 sig_list = [txin.part_sigs.get(pubkey, b'').hex() for pubkey in txin.pubkeys] 649 if txin.is_complete(): 650 sig_list = [sig for sig in sig_list if sig] 651 return pk_list, sig_list 652 653 @classmethod 654 def serialize_witness(cls, txin: TxInput, *, estimate_size=False) -> str: 655 if txin.witness is not None: 656 return txin.witness.hex() 657 if txin.is_coinbase_input(): 658 return '' 659 assert isinstance(txin, PartialTxInput) 660 661 _type = txin.script_type 662 if not txin.is_segwit(): 663 return construct_witness([]) 664 665 if _type in ('address', 'unknown') and estimate_size: 666 _type = cls.guess_txintype_from_address(txin.address) 667 pubkeys, sig_list = cls.get_siglist(txin, estimate_size=estimate_size) 668 if _type in ['p2wpkh', 'p2wpkh-p2sh']: 669 return construct_witness([sig_list[0], pubkeys[0]]) 670 elif _type in ['p2wsh', 'p2wsh-p2sh']: 671 witness_script = multisig_script(pubkeys, txin.num_sig) 672 return construct_witness([0, *sig_list, witness_script]) 673 elif _type in ['p2pk', 'p2pkh', 'p2sh']: 674 return construct_witness([]) 675 raise UnknownTxinType(f'cannot construct witness for txin_type: {_type}') 676 677 @classmethod 678 def guess_txintype_from_address(cls, addr: Optional[str]) -> str: 679 # It's not possible to tell the script type in general 680 # just from an address. 681 # - "1" addresses are of course p2pkh 682 # - "3" addresses are p2sh but we don't know the redeem script.. 683 # - "bc1" addresses (if they are 42-long) are p2wpkh 684 # - "bc1" addresses that are 62-long are p2wsh but we don't know the script.. 685 # If we don't know the script, we _guess_ it is pubkeyhash. 686 # As this method is used e.g. for tx size estimation, 687 # the estimation will not be precise. 688 if addr is None: 689 return 'p2wpkh' 690 witver, witprog = segwit_addr.decode_segwit_address(constants.net.SEGWIT_HRP, addr) 691 if witprog is not None: 692 return 'p2wpkh' 693 addrtype, hash_160_ = b58_address_to_hash160(addr) 694 if addrtype == constants.net.ADDRTYPE_P2PKH: 695 return 'p2pkh' 696 elif addrtype == constants.net.ADDRTYPE_P2SH: 697 return 'p2wpkh-p2sh' 698 raise Exception(f'unrecognized address: {repr(addr)}') 699 700 @classmethod 701 def input_script(self, txin: TxInput, *, estimate_size=False) -> str: 702 if txin.script_sig is not None: 703 return txin.script_sig.hex() 704 if txin.is_coinbase_input(): 705 return '' 706 assert isinstance(txin, PartialTxInput) 707 708 if txin.is_p2sh_segwit() and txin.redeem_script: 709 return construct_script([txin.redeem_script]) 710 if txin.is_native_segwit(): 711 return '' 712 713 _type = txin.script_type 714 pubkeys, sig_list = self.get_siglist(txin, estimate_size=estimate_size) 715 if _type in ('address', 'unknown') and estimate_size: 716 _type = self.guess_txintype_from_address(txin.address) 717 if _type == 'p2pk': 718 return construct_script([sig_list[0]]) 719 elif _type == 'p2sh': 720 # put op_0 before script 721 redeem_script = multisig_script(pubkeys, txin.num_sig) 722 return construct_script([0, *sig_list, redeem_script]) 723 elif _type == 'p2pkh': 724 return construct_script([sig_list[0], pubkeys[0]]) 725 elif _type in ['p2wpkh', 'p2wsh']: 726 return '' 727 elif _type == 'p2wpkh-p2sh': 728 redeem_script = bitcoin.p2wpkh_nested_script(pubkeys[0]) 729 return construct_script([redeem_script]) 730 elif _type == 'p2wsh-p2sh': 731 if estimate_size: 732 witness_script = '' 733 else: 734 witness_script = self.get_preimage_script(txin) 735 redeem_script = bitcoin.p2wsh_nested_script(witness_script) 736 return construct_script([redeem_script]) 737 raise UnknownTxinType(f'cannot construct scriptSig for txin_type: {_type}') 738 739 @classmethod 740 def get_preimage_script(cls, txin: 'PartialTxInput') -> str: 741 if txin.witness_script: 742 if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(txin.witness_script)]: 743 raise Exception('OP_CODESEPARATOR black magic is not supported') 744 return txin.witness_script.hex() 745 if not txin.is_segwit() and txin.redeem_script: 746 if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(txin.redeem_script)]: 747 raise Exception('OP_CODESEPARATOR black magic is not supported') 748 return txin.redeem_script.hex() 749 750 pubkeys = [pk.hex() for pk in txin.pubkeys] 751 if txin.script_type in ['p2sh', 'p2wsh', 'p2wsh-p2sh']: 752 return multisig_script(pubkeys, txin.num_sig) 753 elif txin.script_type in ['p2pkh', 'p2wpkh', 'p2wpkh-p2sh']: 754 pubkey = pubkeys[0] 755 pkh = bh2u(hash_160(bfh(pubkey))) 756 return bitcoin.pubkeyhash_to_p2pkh_script(pkh) 757 elif txin.script_type == 'p2pk': 758 pubkey = pubkeys[0] 759 return bitcoin.public_key_to_p2pk_script(pubkey) 760 else: 761 raise UnknownTxinType(f'cannot construct preimage_script for txin_type: {txin.script_type}') 762 763 @classmethod 764 def serialize_input(self, txin: TxInput, script: str) -> str: 765 # Prev hash and index 766 s = txin.prevout.serialize_to_network().hex() 767 # Script length, script, sequence 768 s += var_int(len(script)//2) 769 s += script 770 s += int_to_hex(txin.nsequence, 4) 771 return s 772 773 def _calc_bip143_shared_txdigest_fields(self) -> BIP143SharedTxDigestFields: 774 inputs = self.inputs() 775 outputs = self.outputs() 776 hashPrevouts = bh2u(sha256d(b''.join(txin.prevout.serialize_to_network() for txin in inputs))) 777 hashSequence = bh2u(sha256d(bfh(''.join(int_to_hex(txin.nsequence, 4) for txin in inputs)))) 778 hashOutputs = bh2u(sha256d(bfh(''.join(o.serialize_to_network().hex() for o in outputs)))) 779 return BIP143SharedTxDigestFields(hashPrevouts=hashPrevouts, 780 hashSequence=hashSequence, 781 hashOutputs=hashOutputs) 782 783 def is_segwit(self, *, guess_for_address=False): 784 return any(txin.is_segwit(guess_for_address=guess_for_address) 785 for txin in self.inputs()) 786 787 def invalidate_ser_cache(self): 788 self._cached_network_ser = None 789 self._cached_txid = None 790 791 def serialize(self) -> str: 792 if not self._cached_network_ser: 793 self._cached_network_ser = self.serialize_to_network(estimate_size=False, include_sigs=True) 794 return self._cached_network_ser 795 796 def serialize_as_bytes(self) -> bytes: 797 return bfh(self.serialize()) 798 799 def serialize_to_network(self, *, estimate_size=False, include_sigs=True, force_legacy=False) -> str: 800 """Serialize the transaction as used on the Bitcoin network, into hex. 801 `include_sigs` signals whether to include scriptSigs and witnesses. 802 `force_legacy` signals to use the pre-segwit format 803 note: (not include_sigs) implies force_legacy 804 """ 805 self.deserialize() 806 nVersion = int_to_hex(self.version, 4) 807 nLocktime = int_to_hex(self.locktime, 4) 808 inputs = self.inputs() 809 outputs = self.outputs() 810 811 def create_script_sig(txin: TxInput) -> str: 812 if include_sigs: 813 return self.input_script(txin, estimate_size=estimate_size) 814 return '' 815 txins = var_int(len(inputs)) + ''.join(self.serialize_input(txin, create_script_sig(txin)) 816 for txin in inputs) 817 txouts = var_int(len(outputs)) + ''.join(o.serialize_to_network().hex() for o in outputs) 818 819 use_segwit_ser_for_estimate_size = estimate_size and self.is_segwit(guess_for_address=True) 820 use_segwit_ser_for_actual_use = not estimate_size and self.is_segwit() 821 use_segwit_ser = use_segwit_ser_for_estimate_size or use_segwit_ser_for_actual_use 822 if include_sigs and not force_legacy and use_segwit_ser: 823 marker = '00' 824 flag = '01' 825 witness = ''.join(self.serialize_witness(x, estimate_size=estimate_size) for x in inputs) 826 return nVersion + marker + flag + txins + txouts + witness + nLocktime 827 else: 828 return nVersion + txins + txouts + nLocktime 829 830 def to_qr_data(self) -> str: 831 """Returns tx as data to be put into a QR code. No side-effects.""" 832 tx = copy.deepcopy(self) # make copy as we mutate tx 833 if isinstance(tx, PartialTransaction): 834 # this makes QR codes a lot smaller (or just possible in the first place!) 835 tx.convert_all_utxos_to_witness_utxos() 836 tx_bytes = tx.serialize_as_bytes() 837 return base_encode(tx_bytes, base=43) 838 839 def txid(self) -> Optional[str]: 840 if self._cached_txid is None: 841 self.deserialize() 842 all_segwit = all(txin.is_segwit() for txin in self.inputs()) 843 if not all_segwit and not self.is_complete(): 844 return None 845 try: 846 ser = self.serialize_to_network(force_legacy=True) 847 except UnknownTxinType: 848 # we might not know how to construct scriptSig for some scripts 849 return None 850 self._cached_txid = bh2u(sha256d(bfh(ser))[::-1]) 851 return self._cached_txid 852 853 def wtxid(self) -> Optional[str]: 854 self.deserialize() 855 if not self.is_complete(): 856 return None 857 try: 858 ser = self.serialize_to_network() 859 except UnknownTxinType: 860 # we might not know how to construct scriptSig/witness for some scripts 861 return None 862 return bh2u(sha256d(bfh(ser))[::-1]) 863 864 def add_info_from_wallet(self, wallet: 'Abstract_Wallet', **kwargs) -> None: 865 return # no-op 866 867 def is_final(self) -> bool: 868 """Whether RBF is disabled.""" 869 return not any([txin.nsequence < 0xffffffff - 1 for txin in self.inputs()]) 870 871 def estimated_size(self): 872 """Return an estimated virtual tx size in vbytes. 873 BIP-0141 defines 'Virtual transaction size' to be weight/4 rounded up. 874 This definition is only for humans, and has little meaning otherwise. 875 If we wanted sub-byte precision, fee calculation should use transaction 876 weights, but for simplicity we approximate that with (virtual_size)x4 877 """ 878 weight = self.estimated_weight() 879 return self.virtual_size_from_weight(weight) 880 881 @classmethod 882 def estimated_input_weight(cls, txin, is_segwit_tx): 883 '''Return an estimate of serialized input weight in weight units.''' 884 script = cls.input_script(txin, estimate_size=True) 885 input_size = len(cls.serialize_input(txin, script)) // 2 886 887 if txin.is_segwit(guess_for_address=True): 888 witness_size = len(cls.serialize_witness(txin, estimate_size=True)) // 2 889 else: 890 witness_size = 1 if is_segwit_tx else 0 891 892 return 4 * input_size + witness_size 893 894 @classmethod 895 def estimated_output_size_for_address(cls, address: str) -> int: 896 """Return an estimate of serialized output size in bytes.""" 897 script = bitcoin.address_to_script(address) 898 return cls.estimated_output_size_for_script(script) 899 900 @classmethod 901 def estimated_output_size_for_script(cls, script: str) -> int: 902 """Return an estimate of serialized output size in bytes.""" 903 # 8 byte value + varint script len + script 904 script_len = len(script) // 2 905 var_int_len = len(var_int(script_len)) // 2 906 return 8 + var_int_len + script_len 907 908 @classmethod 909 def virtual_size_from_weight(cls, weight): 910 return weight // 4 + (weight % 4 > 0) 911 912 @classmethod 913 def satperbyte_from_satperkw(cls, feerate_kw): 914 """Converts feerate from sat/kw to sat/vbyte.""" 915 return feerate_kw * 4 / 1000 916 917 def estimated_total_size(self): 918 """Return an estimated total transaction size in bytes.""" 919 if not self.is_complete() or self._cached_network_ser is None: 920 return len(self.serialize_to_network(estimate_size=True)) // 2 921 else: 922 return len(self._cached_network_ser) // 2 # ASCII hex string 923 924 def estimated_witness_size(self): 925 """Return an estimate of witness size in bytes.""" 926 estimate = not self.is_complete() 927 if not self.is_segwit(guess_for_address=estimate): 928 return 0 929 inputs = self.inputs() 930 witness = ''.join(self.serialize_witness(x, estimate_size=estimate) for x in inputs) 931 witness_size = len(witness) // 2 + 2 # include marker and flag 932 return witness_size 933 934 def estimated_base_size(self): 935 """Return an estimated base transaction size in bytes.""" 936 return self.estimated_total_size() - self.estimated_witness_size() 937 938 def estimated_weight(self): 939 """Return an estimate of transaction weight.""" 940 total_tx_size = self.estimated_total_size() 941 base_tx_size = self.estimated_base_size() 942 return 3 * base_tx_size + total_tx_size 943 944 def is_complete(self) -> bool: 945 return True 946 947 def get_output_idxs_from_scriptpubkey(self, script: str) -> Set[int]: 948 """Returns the set indices of outputs with given script.""" 949 assert isinstance(script, str) # hex 950 # build cache if there isn't one yet 951 # note: can become stale and return incorrect data 952 # if the tx is modified later; that's out of scope. 953 if not hasattr(self, '_script_to_output_idx'): 954 d = defaultdict(set) 955 for output_idx, o in enumerate(self.outputs()): 956 o_script = o.scriptpubkey.hex() 957 assert isinstance(o_script, str) 958 d[o_script].add(output_idx) 959 self._script_to_output_idx = d 960 return set(self._script_to_output_idx[script]) # copy 961 962 def get_output_idxs_from_address(self, addr: str) -> Set[int]: 963 script = bitcoin.address_to_script(addr) 964 return self.get_output_idxs_from_scriptpubkey(script) 965 966 def output_value_for_address(self, addr): 967 # assumes exactly one output has that address 968 for o in self.outputs(): 969 if o.address == addr: 970 return o.value 971 else: 972 raise Exception('output not found', addr) 973 974 def get_input_idx_that_spent_prevout(self, prevout: TxOutpoint) -> Optional[int]: 975 # build cache if there isn't one yet 976 # note: can become stale and return incorrect data 977 # if the tx is modified later; that's out of scope. 978 if not hasattr(self, '_prevout_to_input_idx'): 979 d = {} # type: Dict[TxOutpoint, int] 980 for i, txin in enumerate(self.inputs()): 981 d[txin.prevout] = i 982 self._prevout_to_input_idx = d 983 idx = self._prevout_to_input_idx.get(prevout) 984 if idx is not None: 985 assert self.inputs()[idx].prevout == prevout 986 return idx 987 988 989def convert_raw_tx_to_hex(raw: Union[str, bytes]) -> str: 990 """Sanitizes tx-describing input (hex/base43/base64) into 991 raw tx hex string.""" 992 if not raw: 993 raise ValueError("empty string") 994 raw_unstripped = raw 995 raw = raw.strip() 996 # try hex 997 try: 998 return binascii.unhexlify(raw).hex() 999 except: 1000 pass 1001 # try base43 1002 try: 1003 return base_decode(raw, base=43).hex() 1004 except: 1005 pass 1006 # try base64 1007 if raw[0:6] in ('cHNidP', b'cHNidP'): # base64 psbt 1008 try: 1009 return base64.b64decode(raw).hex() 1010 except: 1011 pass 1012 # raw bytes (do not strip whitespaces in this case) 1013 if isinstance(raw_unstripped, bytes): 1014 return raw_unstripped.hex() 1015 raise ValueError(f"failed to recognize transaction encoding for txt: {raw[:30]}...") 1016 1017 1018def tx_from_any(raw: Union[str, bytes], *, 1019 deserialize: bool = True) -> Union['PartialTransaction', 'Transaction']: 1020 if isinstance(raw, bytearray): 1021 raw = bytes(raw) 1022 raw = convert_raw_tx_to_hex(raw) 1023 try: 1024 return PartialTransaction.from_raw_psbt(raw) 1025 except BadHeaderMagic: 1026 if raw[:10] == b'EPTF\xff'.hex(): 1027 raise SerializationError("Partial transactions generated with old Electrum versions " 1028 "(< 4.0) are no longer supported. Please upgrade Electrum on " 1029 "the other machine where this transaction was created.") 1030 try: 1031 tx = Transaction(raw) 1032 if deserialize: 1033 tx.deserialize() 1034 return tx 1035 except Exception as e: 1036 raise SerializationError(f"Failed to recognise tx encoding, or to parse transaction. " 1037 f"raw: {raw[:30]}...") from e 1038 1039 1040class PSBTGlobalType(IntEnum): 1041 UNSIGNED_TX = 0 1042 XPUB = 1 1043 VERSION = 0xFB 1044 1045 1046class PSBTInputType(IntEnum): 1047 NON_WITNESS_UTXO = 0 1048 WITNESS_UTXO = 1 1049 PARTIAL_SIG = 2 1050 SIGHASH_TYPE = 3 1051 REDEEM_SCRIPT = 4 1052 WITNESS_SCRIPT = 5 1053 BIP32_DERIVATION = 6 1054 FINAL_SCRIPTSIG = 7 1055 FINAL_SCRIPTWITNESS = 8 1056 1057 1058class PSBTOutputType(IntEnum): 1059 REDEEM_SCRIPT = 0 1060 WITNESS_SCRIPT = 1 1061 BIP32_DERIVATION = 2 1062 1063 1064# Serialization/deserialization tools 1065def deser_compact_size(f) -> Optional[int]: 1066 try: 1067 nit = f.read(1)[0] 1068 except IndexError: 1069 return None # end of file 1070 1071 if nit == 253: 1072 nit = struct.unpack("<H", f.read(2))[0] 1073 elif nit == 254: 1074 nit = struct.unpack("<I", f.read(4))[0] 1075 elif nit == 255: 1076 nit = struct.unpack("<Q", f.read(8))[0] 1077 return nit 1078 1079 1080class PSBTSection: 1081 1082 def _populate_psbt_fields_from_fd(self, fd=None): 1083 if not fd: return 1084 1085 while True: 1086 try: 1087 key_type, key, val = self.get_next_kv_from_fd(fd) 1088 except StopIteration: 1089 break 1090 self.parse_psbt_section_kv(key_type, key, val) 1091 1092 @classmethod 1093 def get_next_kv_from_fd(cls, fd) -> Tuple[int, bytes, bytes]: 1094 key_size = deser_compact_size(fd) 1095 if key_size == 0: 1096 raise StopIteration() 1097 if key_size is None: 1098 raise UnexpectedEndOfStream() 1099 1100 full_key = fd.read(key_size) 1101 key_type, key = cls.get_keytype_and_key_from_fullkey(full_key) 1102 1103 val_size = deser_compact_size(fd) 1104 if val_size is None: raise UnexpectedEndOfStream() 1105 val = fd.read(val_size) 1106 1107 return key_type, key, val 1108 1109 @classmethod 1110 def create_psbt_writer(cls, fd): 1111 def wr(key_type: int, val: bytes, key: bytes = b''): 1112 full_key = cls.get_fullkey_from_keytype_and_key(key_type, key) 1113 fd.write(bytes.fromhex(var_int(len(full_key)))) # key_size 1114 fd.write(full_key) # key 1115 fd.write(bytes.fromhex(var_int(len(val)))) # val_size 1116 fd.write(val) # val 1117 return wr 1118 1119 @classmethod 1120 def get_keytype_and_key_from_fullkey(cls, full_key: bytes) -> Tuple[int, bytes]: 1121 with io.BytesIO(full_key) as key_stream: 1122 key_type = deser_compact_size(key_stream) 1123 if key_type is None: raise UnexpectedEndOfStream() 1124 key = key_stream.read() 1125 return key_type, key 1126 1127 @classmethod 1128 def get_fullkey_from_keytype_and_key(cls, key_type: int, key: bytes) -> bytes: 1129 key_type_bytes = bytes.fromhex(var_int(key_type)) 1130 return key_type_bytes + key 1131 1132 def _serialize_psbt_section(self, fd): 1133 wr = self.create_psbt_writer(fd) 1134 self.serialize_psbt_section_kvs(wr) 1135 fd.write(b'\x00') # section-separator 1136 1137 def parse_psbt_section_kv(self, kt: int, key: bytes, val: bytes) -> None: 1138 raise NotImplementedError() # implemented by subclasses 1139 1140 def serialize_psbt_section_kvs(self, wr) -> None: 1141 raise NotImplementedError() # implemented by subclasses 1142 1143 1144class PartialTxInput(TxInput, PSBTSection): 1145 def __init__(self, *args, **kwargs): 1146 TxInput.__init__(self, *args, **kwargs) 1147 self._utxo = None # type: Optional[Transaction] 1148 self._witness_utxo = None # type: Optional[TxOutput] 1149 self.part_sigs = {} # type: Dict[bytes, bytes] # pubkey -> sig 1150 self.sighash = None # type: Optional[int] 1151 self.bip32_paths = {} # type: Dict[bytes, Tuple[bytes, Sequence[int]]] # pubkey -> (xpub_fingerprint, path) 1152 self.redeem_script = None # type: Optional[bytes] 1153 self.witness_script = None # type: Optional[bytes] 1154 self._unknown = {} # type: Dict[bytes, bytes] 1155 1156 self.script_type = 'unknown' 1157 self.num_sig = 0 # type: int # num req sigs for multisig 1158 self.pubkeys = [] # type: List[bytes] # note: order matters 1159 self._trusted_value_sats = None # type: Optional[int] 1160 self._trusted_address = None # type: Optional[str] 1161 self.block_height = None # type: Optional[int] # height at which the TXO is mined; None means unknown 1162 self.spent_height = None # type: Optional[int] # height at which the TXO got spent 1163 self._is_p2sh_segwit = None # type: Optional[bool] # None means unknown 1164 self._is_native_segwit = None # type: Optional[bool] # None means unknown 1165 1166 @property 1167 def utxo(self): 1168 return self._utxo 1169 1170 @utxo.setter 1171 def utxo(self, tx: Optional[Transaction]): 1172 if tx is None: 1173 return 1174 # note that tx might be a PartialTransaction 1175 # serialize and de-serialize tx now. this might e.g. convert a complete PartialTx to a Tx 1176 tx = tx_from_any(str(tx)) 1177 # 'utxo' field in PSBT cannot be another PSBT: 1178 if not tx.is_complete(): 1179 return 1180 self._utxo = tx 1181 self.validate_data() 1182 self.ensure_there_is_only_one_utxo() 1183 1184 @property 1185 def witness_utxo(self): 1186 return self._witness_utxo 1187 1188 @witness_utxo.setter 1189 def witness_utxo(self, value: Optional[TxOutput]): 1190 self._witness_utxo = value 1191 self.validate_data() 1192 self.ensure_there_is_only_one_utxo() 1193 1194 def to_json(self): 1195 d = super().to_json() 1196 d.update({ 1197 'height': self.block_height, 1198 'value_sats': self.value_sats(), 1199 'address': self.address, 1200 'utxo': str(self.utxo) if self.utxo else None, 1201 'witness_utxo': self.witness_utxo.serialize_to_network().hex() if self.witness_utxo else None, 1202 'sighash': self.sighash, 1203 'redeem_script': self.redeem_script.hex() if self.redeem_script else None, 1204 'witness_script': self.witness_script.hex() if self.witness_script else None, 1205 'part_sigs': {pubkey.hex(): sig.hex() for pubkey, sig in self.part_sigs.items()}, 1206 'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path)) 1207 for pubkey, (xfp, path) in self.bip32_paths.items()}, 1208 'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()}, 1209 }) 1210 return d 1211 1212 @classmethod 1213 def from_txin(cls, txin: TxInput, *, strip_witness: bool = True) -> 'PartialTxInput': 1214 # FIXME: if strip_witness is True, res.is_segwit() will return False, 1215 # and res.estimated_size() will return an incorrect value. These methods 1216 # will return the correct values after we call add_input_info(). (see dscancel and bump_fee) 1217 # This is very fragile: the value returned by estimate_size() depends on the calling order. 1218 res = PartialTxInput(prevout=txin.prevout, 1219 script_sig=None if strip_witness else txin.script_sig, 1220 nsequence=txin.nsequence, 1221 witness=None if strip_witness else txin.witness, 1222 is_coinbase_output=txin.is_coinbase_output()) 1223 return res 1224 1225 def validate_data(self, *, for_signing=False) -> None: 1226 if self.utxo: 1227 if self.prevout.txid.hex() != self.utxo.txid(): 1228 raise PSBTInputConsistencyFailure(f"PSBT input validation: " 1229 f"If a non-witness UTXO is provided, its hash must match the hash specified in the prevout") 1230 if self.witness_utxo: 1231 if self.utxo.outputs()[self.prevout.out_idx] != self.witness_utxo: 1232 raise PSBTInputConsistencyFailure(f"PSBT input validation: " 1233 f"If both non-witness UTXO and witness UTXO are provided, they must be consistent") 1234 # The following test is disabled, so we are willing to sign non-segwit inputs 1235 # without verifying the input amount. This means, given a maliciously modified PSBT, 1236 # for non-segwit inputs, we might end up burning coins as miner fees. 1237 if for_signing and False: 1238 if not self.is_segwit() and self.witness_utxo: 1239 raise PSBTInputConsistencyFailure(f"PSBT input validation: " 1240 f"If a witness UTXO is provided, no non-witness signature may be created") 1241 if self.redeem_script and self.address: 1242 addr = hash160_to_p2sh(hash_160(self.redeem_script)) 1243 if self.address != addr: 1244 raise PSBTInputConsistencyFailure(f"PSBT input validation: " 1245 f"If a redeemScript is provided, the scriptPubKey must be for that redeemScript") 1246 if self.witness_script: 1247 if self.redeem_script: 1248 if self.redeem_script != bfh(bitcoin.p2wsh_nested_script(self.witness_script.hex())): 1249 raise PSBTInputConsistencyFailure(f"PSBT input validation: " 1250 f"If a witnessScript is provided, the redeemScript must be for that witnessScript") 1251 elif self.address: 1252 if self.address != bitcoin.script_to_p2wsh(self.witness_script.hex()): 1253 raise PSBTInputConsistencyFailure(f"PSBT input validation: " 1254 f"If a witnessScript is provided, the scriptPubKey must be for that witnessScript") 1255 1256 def parse_psbt_section_kv(self, kt, key, val): 1257 try: 1258 kt = PSBTInputType(kt) 1259 except ValueError: 1260 pass # unknown type 1261 if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}") 1262 if kt == PSBTInputType.NON_WITNESS_UTXO: 1263 if self.utxo is not None: 1264 raise SerializationError(f"duplicate key: {repr(kt)}") 1265 self.utxo = Transaction(val) 1266 self.utxo.deserialize() 1267 if key: raise SerializationError(f"key for {repr(kt)} must be empty") 1268 elif kt == PSBTInputType.WITNESS_UTXO: 1269 if self.witness_utxo is not None: 1270 raise SerializationError(f"duplicate key: {repr(kt)}") 1271 self.witness_utxo = TxOutput.from_network_bytes(val) 1272 if key: raise SerializationError(f"key for {repr(kt)} must be empty") 1273 elif kt == PSBTInputType.PARTIAL_SIG: 1274 if key in self.part_sigs: 1275 raise SerializationError(f"duplicate key: {repr(kt)}") 1276 if len(key) not in (33, 65): # TODO also allow 32? one of the tests in the BIP is "supposed to" fail with len==32... 1277 raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}") 1278 self.part_sigs[key] = val 1279 elif kt == PSBTInputType.SIGHASH_TYPE: 1280 if self.sighash is not None: 1281 raise SerializationError(f"duplicate key: {repr(kt)}") 1282 if len(val) != 4: 1283 raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}") 1284 self.sighash = struct.unpack("<I", val)[0] 1285 if key: raise SerializationError(f"key for {repr(kt)} must be empty") 1286 elif kt == PSBTInputType.BIP32_DERIVATION: 1287 if key in self.bip32_paths: 1288 raise SerializationError(f"duplicate key: {repr(kt)}") 1289 if len(key) not in (33, 65): # TODO also allow 32? one of the tests in the BIP is "supposed to" fail with len==32... 1290 raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}") 1291 self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val) 1292 elif kt == PSBTInputType.REDEEM_SCRIPT: 1293 if self.redeem_script is not None: 1294 raise SerializationError(f"duplicate key: {repr(kt)}") 1295 self.redeem_script = val 1296 if key: raise SerializationError(f"key for {repr(kt)} must be empty") 1297 elif kt == PSBTInputType.WITNESS_SCRIPT: 1298 if self.witness_script is not None: 1299 raise SerializationError(f"duplicate key: {repr(kt)}") 1300 self.witness_script = val 1301 if key: raise SerializationError(f"key for {repr(kt)} must be empty") 1302 elif kt == PSBTInputType.FINAL_SCRIPTSIG: 1303 if self.script_sig is not None: 1304 raise SerializationError(f"duplicate key: {repr(kt)}") 1305 self.script_sig = val 1306 if key: raise SerializationError(f"key for {repr(kt)} must be empty") 1307 elif kt == PSBTInputType.FINAL_SCRIPTWITNESS: 1308 if self.witness is not None: 1309 raise SerializationError(f"duplicate key: {repr(kt)}") 1310 self.witness = val 1311 if key: raise SerializationError(f"key for {repr(kt)} must be empty") 1312 else: 1313 full_key = self.get_fullkey_from_keytype_and_key(kt, key) 1314 if full_key in self._unknown: 1315 raise SerializationError(f'duplicate key. PSBT input key for unknown type: {full_key}') 1316 self._unknown[full_key] = val 1317 1318 def serialize_psbt_section_kvs(self, wr): 1319 self.ensure_there_is_only_one_utxo() 1320 if self.witness_utxo: 1321 wr(PSBTInputType.WITNESS_UTXO, self.witness_utxo.serialize_to_network()) 1322 if self.utxo: 1323 wr(PSBTInputType.NON_WITNESS_UTXO, bfh(self.utxo.serialize_to_network(include_sigs=True))) 1324 for pk, val in sorted(self.part_sigs.items()): 1325 wr(PSBTInputType.PARTIAL_SIG, val, pk) 1326 if self.sighash is not None: 1327 wr(PSBTInputType.SIGHASH_TYPE, struct.pack('<I', self.sighash)) 1328 if self.redeem_script is not None: 1329 wr(PSBTInputType.REDEEM_SCRIPT, self.redeem_script) 1330 if self.witness_script is not None: 1331 wr(PSBTInputType.WITNESS_SCRIPT, self.witness_script) 1332 for k in sorted(self.bip32_paths): 1333 packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k]) 1334 wr(PSBTInputType.BIP32_DERIVATION, packed_path, k) 1335 if self.script_sig is not None: 1336 wr(PSBTInputType.FINAL_SCRIPTSIG, self.script_sig) 1337 if self.witness is not None: 1338 wr(PSBTInputType.FINAL_SCRIPTWITNESS, self.witness) 1339 for full_key, val in sorted(self._unknown.items()): 1340 key_type, key = self.get_keytype_and_key_from_fullkey(full_key) 1341 wr(key_type, val, key=key) 1342 1343 def value_sats(self) -> Optional[int]: 1344 if self._trusted_value_sats is not None: 1345 return self._trusted_value_sats 1346 if self.utxo: 1347 out_idx = self.prevout.out_idx 1348 return self.utxo.outputs()[out_idx].value 1349 if self.witness_utxo: 1350 return self.witness_utxo.value 1351 return None 1352 1353 @property 1354 def address(self) -> Optional[str]: 1355 if self._trusted_address is not None: 1356 return self._trusted_address 1357 scriptpubkey = self.scriptpubkey 1358 if scriptpubkey: 1359 return get_address_from_output_script(scriptpubkey) 1360 return None 1361 1362 @property 1363 def scriptpubkey(self) -> Optional[bytes]: 1364 if self._trusted_address is not None: 1365 return bfh(bitcoin.address_to_script(self._trusted_address)) 1366 if self.utxo: 1367 out_idx = self.prevout.out_idx 1368 return self.utxo.outputs()[out_idx].scriptpubkey 1369 if self.witness_utxo: 1370 return self.witness_utxo.scriptpubkey 1371 return None 1372 1373 def set_script_type(self) -> None: 1374 if self.scriptpubkey is None: 1375 return 1376 type = get_script_type_from_output_script(self.scriptpubkey) 1377 inner_type = None 1378 if type is not None: 1379 if type == 'p2sh': 1380 inner_type = get_script_type_from_output_script(self.redeem_script) 1381 elif type == 'p2wsh': 1382 inner_type = get_script_type_from_output_script(self.witness_script) 1383 if inner_type is not None: 1384 type = inner_type + '-' + type 1385 if type in ('p2pkh', 'p2wpkh-p2sh', 'p2wpkh'): 1386 self.script_type = type 1387 return 1388 1389 def is_complete(self) -> bool: 1390 if self.script_sig is not None and self.witness is not None: 1391 return True 1392 if self.is_coinbase_input(): 1393 return True 1394 if self.script_sig is not None and not self.is_segwit(): 1395 return True 1396 signatures = list(self.part_sigs.values()) 1397 s = len(signatures) 1398 # note: The 'script_type' field is currently only set by the wallet, 1399 # for its own addresses. This means we can only finalize inputs 1400 # that are related to the wallet. 1401 # The 'fix' would be adding extra logic that matches on templates, 1402 # and figures out the script_type from available fields. 1403 if self.script_type in ('p2pk', 'p2pkh', 'p2wpkh', 'p2wpkh-p2sh'): 1404 return s >= 1 1405 if self.script_type in ('p2sh', 'p2wsh', 'p2wsh-p2sh'): 1406 return s >= self.num_sig 1407 return False 1408 1409 def finalize(self) -> None: 1410 def clear_fields_when_finalized(): 1411 # BIP-174: "All other data except the UTXO and unknown fields in the 1412 # input key-value map should be cleared from the PSBT" 1413 self.part_sigs = {} 1414 self.sighash = None 1415 self.bip32_paths = {} 1416 self.redeem_script = None 1417 self.witness_script = None 1418 1419 if self.script_sig is not None and self.witness is not None: 1420 clear_fields_when_finalized() 1421 return # already finalized 1422 if self.is_complete(): 1423 self.script_sig = bfh(Transaction.input_script(self)) 1424 self.witness = bfh(Transaction.serialize_witness(self)) 1425 clear_fields_when_finalized() 1426 1427 def combine_with_other_txin(self, other_txin: 'TxInput') -> None: 1428 assert self.prevout == other_txin.prevout 1429 if other_txin.script_sig is not None: 1430 self.script_sig = other_txin.script_sig 1431 if other_txin.witness is not None: 1432 self.witness = other_txin.witness 1433 if isinstance(other_txin, PartialTxInput): 1434 if other_txin.witness_utxo: 1435 self.witness_utxo = other_txin.witness_utxo 1436 if other_txin.utxo: 1437 self.utxo = other_txin.utxo 1438 self.part_sigs.update(other_txin.part_sigs) 1439 if other_txin.sighash is not None: 1440 self.sighash = other_txin.sighash 1441 self.bip32_paths.update(other_txin.bip32_paths) 1442 if other_txin.redeem_script is not None: 1443 self.redeem_script = other_txin.redeem_script 1444 if other_txin.witness_script is not None: 1445 self.witness_script = other_txin.witness_script 1446 self._unknown.update(other_txin._unknown) 1447 self.ensure_there_is_only_one_utxo() 1448 # try to finalize now 1449 self.finalize() 1450 1451 def ensure_there_is_only_one_utxo(self): 1452 # we prefer having the full previous tx, even for segwit inputs. see #6198 1453 # for witness v1, witness_utxo will be enough though 1454 if self.utxo is not None and self.witness_utxo is not None: 1455 self.witness_utxo = None 1456 1457 def convert_utxo_to_witness_utxo(self) -> None: 1458 if self.utxo: 1459 self._witness_utxo = self.utxo.outputs()[self.prevout.out_idx] 1460 self._utxo = None # type: Optional[Transaction] 1461 1462 def is_native_segwit(self) -> Optional[bool]: 1463 """Whether this input is native segwit. None means inconclusive.""" 1464 if self._is_native_segwit is None: 1465 if self.address: 1466 self._is_native_segwit = bitcoin.is_segwit_address(self.address) 1467 return self._is_native_segwit 1468 1469 def is_p2sh_segwit(self) -> Optional[bool]: 1470 """Whether this input is p2sh-embedded-segwit. None means inconclusive.""" 1471 if self._is_p2sh_segwit is None: 1472 def calc_if_p2sh_segwit_now(): 1473 if not (self.address and self.redeem_script): 1474 return None 1475 if self.address != bitcoin.hash160_to_p2sh(hash_160(self.redeem_script)): 1476 # not p2sh address 1477 return False 1478 try: 1479 decoded = [x for x in script_GetOp(self.redeem_script)] 1480 except MalformedBitcoinScript: 1481 decoded = None 1482 # witness version 0 1483 if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0): 1484 return True 1485 # witness version 1-16 1486 future_witness_versions = list(range(opcodes.OP_1, opcodes.OP_16 + 1)) 1487 for witver, opcode in enumerate(future_witness_versions, start=1): 1488 match = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)] 1489 if match_script_against_template(decoded, match): 1490 return True 1491 return False 1492 1493 self._is_p2sh_segwit = calc_if_p2sh_segwit_now() 1494 return self._is_p2sh_segwit 1495 1496 def is_segwit(self, *, guess_for_address=False) -> bool: 1497 if super().is_segwit(): 1498 return True 1499 if self.is_native_segwit() or self.is_p2sh_segwit(): 1500 return True 1501 if self.is_native_segwit() is False and self.is_p2sh_segwit() is False: 1502 return False 1503 if self.witness_script: 1504 return True 1505 _type = self.script_type 1506 if _type == 'address' and guess_for_address: 1507 _type = Transaction.guess_txintype_from_address(self.address) 1508 return is_segwit_script_type(_type) 1509 1510 def already_has_some_signatures(self) -> bool: 1511 """Returns whether progress has been made towards completing this input.""" 1512 return (self.part_sigs 1513 or self.script_sig is not None 1514 or self.witness is not None) 1515 1516 1517class PartialTxOutput(TxOutput, PSBTSection): 1518 def __init__(self, *args, **kwargs): 1519 TxOutput.__init__(self, *args, **kwargs) 1520 self.redeem_script = None # type: Optional[bytes] 1521 self.witness_script = None # type: Optional[bytes] 1522 self.bip32_paths = {} # type: Dict[bytes, Tuple[bytes, Sequence[int]]] # pubkey -> (xpub_fingerprint, path) 1523 self._unknown = {} # type: Dict[bytes, bytes] 1524 1525 self.script_type = 'unknown' 1526 self.num_sig = 0 # num req sigs for multisig 1527 self.pubkeys = [] # type: List[bytes] # note: order matters 1528 self.is_mine = False # type: bool # whether the wallet considers the output to be ismine 1529 self.is_change = False # type: bool # whether the wallet considers the output to be change 1530 1531 def to_json(self): 1532 d = super().to_json() 1533 d.update({ 1534 'redeem_script': self.redeem_script.hex() if self.redeem_script else None, 1535 'witness_script': self.witness_script.hex() if self.witness_script else None, 1536 'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path)) 1537 for pubkey, (xfp, path) in self.bip32_paths.items()}, 1538 'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()}, 1539 }) 1540 return d 1541 1542 @classmethod 1543 def from_txout(cls, txout: TxOutput) -> 'PartialTxOutput': 1544 res = PartialTxOutput(scriptpubkey=txout.scriptpubkey, 1545 value=txout.value) 1546 return res 1547 1548 def parse_psbt_section_kv(self, kt, key, val): 1549 try: 1550 kt = PSBTOutputType(kt) 1551 except ValueError: 1552 pass # unknown type 1553 if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}") 1554 if kt == PSBTOutputType.REDEEM_SCRIPT: 1555 if self.redeem_script is not None: 1556 raise SerializationError(f"duplicate key: {repr(kt)}") 1557 self.redeem_script = val 1558 if key: raise SerializationError(f"key for {repr(kt)} must be empty") 1559 elif kt == PSBTOutputType.WITNESS_SCRIPT: 1560 if self.witness_script is not None: 1561 raise SerializationError(f"duplicate key: {repr(kt)}") 1562 self.witness_script = val 1563 if key: raise SerializationError(f"key for {repr(kt)} must be empty") 1564 elif kt == PSBTOutputType.BIP32_DERIVATION: 1565 if key in self.bip32_paths: 1566 raise SerializationError(f"duplicate key: {repr(kt)}") 1567 if len(key) not in (33, 65): # TODO also allow 32? one of the tests in the BIP is "supposed to" fail with len==32... 1568 raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}") 1569 self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val) 1570 else: 1571 full_key = self.get_fullkey_from_keytype_and_key(kt, key) 1572 if full_key in self._unknown: 1573 raise SerializationError(f'duplicate key. PSBT output key for unknown type: {full_key}') 1574 self._unknown[full_key] = val 1575 1576 def serialize_psbt_section_kvs(self, wr): 1577 if self.redeem_script is not None: 1578 wr(PSBTOutputType.REDEEM_SCRIPT, self.redeem_script) 1579 if self.witness_script is not None: 1580 wr(PSBTOutputType.WITNESS_SCRIPT, self.witness_script) 1581 for k in sorted(self.bip32_paths): 1582 packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k]) 1583 wr(PSBTOutputType.BIP32_DERIVATION, packed_path, k) 1584 for full_key, val in sorted(self._unknown.items()): 1585 key_type, key = self.get_keytype_and_key_from_fullkey(full_key) 1586 wr(key_type, val, key=key) 1587 1588 def combine_with_other_txout(self, other_txout: 'TxOutput') -> None: 1589 assert self.scriptpubkey == other_txout.scriptpubkey 1590 if not isinstance(other_txout, PartialTxOutput): 1591 return 1592 if other_txout.redeem_script is not None: 1593 self.redeem_script = other_txout.redeem_script 1594 if other_txout.witness_script is not None: 1595 self.witness_script = other_txout.witness_script 1596 self.bip32_paths.update(other_txout.bip32_paths) 1597 self._unknown.update(other_txout._unknown) 1598 1599 1600class PartialTransaction(Transaction): 1601 1602 def __init__(self): 1603 Transaction.__init__(self, None) 1604 self.xpubs = {} # type: Dict[BIP32Node, Tuple[bytes, Sequence[int]]] # intermediate bip32node -> (xfp, der_prefix) 1605 self._inputs = [] # type: List[PartialTxInput] 1606 self._outputs = [] # type: List[PartialTxOutput] 1607 self._unknown = {} # type: Dict[bytes, bytes] 1608 1609 def to_json(self) -> dict: 1610 d = super().to_json() 1611 d.update({ 1612 'xpubs': {bip32node.to_xpub(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path)) 1613 for bip32node, (xfp, path) in self.xpubs.items()}, 1614 'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()}, 1615 }) 1616 return d 1617 1618 @classmethod 1619 def from_tx(cls, tx: Transaction) -> 'PartialTransaction': 1620 res = cls() 1621 res._inputs = [PartialTxInput.from_txin(txin, strip_witness=True) 1622 for txin in tx.inputs()] 1623 res._outputs = [PartialTxOutput.from_txout(txout) for txout in tx.outputs()] 1624 res.version = tx.version 1625 res.locktime = tx.locktime 1626 return res 1627 1628 @classmethod 1629 def from_raw_psbt(cls, raw) -> 'PartialTransaction': 1630 # auto-detect and decode Base64 and Hex. 1631 if raw[0:10].lower() in (b'70736274ff', '70736274ff'): # hex 1632 raw = bytes.fromhex(raw) 1633 elif raw[0:6] in (b'cHNidP', 'cHNidP'): # base64 1634 raw = base64.b64decode(raw) 1635 if not isinstance(raw, (bytes, bytearray)) or raw[0:5] != b'psbt\xff': 1636 raise BadHeaderMagic("bad magic") 1637 1638 tx = None # type: Optional[PartialTransaction] 1639 1640 # We parse the raw stream twice. The first pass is used to find the 1641 # PSBT_GLOBAL_UNSIGNED_TX key in the global section and set 'tx'. 1642 # The second pass does everything else. 1643 with io.BytesIO(raw[5:]) as fd: # parsing "first pass" 1644 while True: 1645 try: 1646 kt, key, val = PSBTSection.get_next_kv_from_fd(fd) 1647 except StopIteration: 1648 break 1649 try: 1650 kt = PSBTGlobalType(kt) 1651 except ValueError: 1652 pass # unknown type 1653 if kt == PSBTGlobalType.UNSIGNED_TX: 1654 if tx is not None: 1655 raise SerializationError(f"duplicate key: {repr(kt)}") 1656 if key: raise SerializationError(f"key for {repr(kt)} must be empty") 1657 unsigned_tx = Transaction(val.hex()) 1658 for txin in unsigned_tx.inputs(): 1659 if txin.script_sig or txin.witness: 1660 raise SerializationError(f"PSBT {repr(kt)} must have empty scriptSigs and witnesses") 1661 tx = PartialTransaction.from_tx(unsigned_tx) 1662 1663 if tx is None: 1664 raise SerializationError(f"PSBT missing required global section PSBT_GLOBAL_UNSIGNED_TX") 1665 1666 with io.BytesIO(raw[5:]) as fd: # parsing "second pass" 1667 # global section 1668 while True: 1669 try: 1670 kt, key, val = PSBTSection.get_next_kv_from_fd(fd) 1671 except StopIteration: 1672 break 1673 try: 1674 kt = PSBTGlobalType(kt) 1675 except ValueError: 1676 pass # unknown type 1677 if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}") 1678 if kt == PSBTGlobalType.UNSIGNED_TX: 1679 pass # already handled during "first" parsing pass 1680 elif kt == PSBTGlobalType.XPUB: 1681 bip32node = BIP32Node.from_bytes(key) 1682 if bip32node in tx.xpubs: 1683 raise SerializationError(f"duplicate key: {repr(kt)}") 1684 xfp, path = unpack_bip32_root_fingerprint_and_int_path(val) 1685 if bip32node.depth != len(path): 1686 raise SerializationError(f"PSBT global xpub has mismatching depth ({bip32node.depth}) " 1687 f"and derivation prefix len ({len(path)})") 1688 child_number_of_xpub = int.from_bytes(bip32node.child_number, 'big') 1689 if not ((bip32node.depth == 0 and child_number_of_xpub == 0) 1690 or (bip32node.depth != 0 and child_number_of_xpub == path[-1])): 1691 raise SerializationError(f"PSBT global xpub has inconsistent child_number and derivation prefix") 1692 tx.xpubs[bip32node] = xfp, path 1693 elif kt == PSBTGlobalType.VERSION: 1694 if len(val) > 4: 1695 raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)} > 4") 1696 psbt_version = int.from_bytes(val, byteorder='little', signed=False) 1697 if psbt_version > 0: 1698 raise SerializationError(f"Only PSBTs with version 0 are supported. Found version: {psbt_version}") 1699 if key: raise SerializationError(f"key for {repr(kt)} must be empty") 1700 else: 1701 full_key = PSBTSection.get_fullkey_from_keytype_and_key(kt, key) 1702 if full_key in tx._unknown: 1703 raise SerializationError(f'duplicate key. PSBT global key for unknown type: {full_key}') 1704 tx._unknown[full_key] = val 1705 try: 1706 # inputs sections 1707 for txin in tx.inputs(): 1708 if DEBUG_PSBT_PARSING: print("-> new input starts") 1709 txin._populate_psbt_fields_from_fd(fd) 1710 # outputs sections 1711 for txout in tx.outputs(): 1712 if DEBUG_PSBT_PARSING: print("-> new output starts") 1713 txout._populate_psbt_fields_from_fd(fd) 1714 except UnexpectedEndOfStream: 1715 raise UnexpectedEndOfStream('Unexpected end of stream. Num input and output maps provided does not match unsigned tx.') from None 1716 1717 if fd.read(1) != b'': 1718 raise SerializationError("extra junk at the end of PSBT") 1719 1720 for txin in tx.inputs(): 1721 txin.validate_data() 1722 1723 return tx 1724 1725 @classmethod 1726 def from_io(cls, inputs: Sequence[PartialTxInput], outputs: Sequence[PartialTxOutput], *, 1727 locktime: int = None, version: int = None): 1728 self = cls() 1729 self._inputs = list(inputs) 1730 self._outputs = list(outputs) 1731 if locktime is not None: 1732 self.locktime = locktime 1733 if version is not None: 1734 self.version = version 1735 self.BIP69_sort() 1736 return self 1737 1738 def _serialize_psbt(self, fd) -> None: 1739 wr = PSBTSection.create_psbt_writer(fd) 1740 fd.write(b'psbt\xff') 1741 # global section 1742 wr(PSBTGlobalType.UNSIGNED_TX, bfh(self.serialize_to_network(include_sigs=False))) 1743 for bip32node, (xfp, path) in sorted(self.xpubs.items()): 1744 val = pack_bip32_root_fingerprint_and_int_path(xfp, path) 1745 wr(PSBTGlobalType.XPUB, val, key=bip32node.to_bytes()) 1746 for full_key, val in sorted(self._unknown.items()): 1747 key_type, key = PSBTSection.get_keytype_and_key_from_fullkey(full_key) 1748 wr(key_type, val, key=key) 1749 fd.write(b'\x00') # section-separator 1750 # input sections 1751 for inp in self._inputs: 1752 inp._serialize_psbt_section(fd) 1753 # output sections 1754 for outp in self._outputs: 1755 outp._serialize_psbt_section(fd) 1756 1757 def finalize_psbt(self) -> None: 1758 for txin in self.inputs(): 1759 txin.finalize() 1760 1761 def combine_with_other_psbt(self, other_tx: 'Transaction') -> None: 1762 """Pulls in all data from other_tx we don't yet have (e.g. signatures). 1763 other_tx must be concerning the same unsigned tx. 1764 """ 1765 if self.serialize_to_network(include_sigs=False) != other_tx.serialize_to_network(include_sigs=False): 1766 raise Exception('A Combiner must not combine two different PSBTs.') 1767 # BIP-174: "The resulting PSBT must contain all of the key-value pairs from each of the PSBTs. 1768 # The Combiner must remove any duplicate key-value pairs, in accordance with the specification." 1769 # global section 1770 if isinstance(other_tx, PartialTransaction): 1771 self.xpubs.update(other_tx.xpubs) 1772 self._unknown.update(other_tx._unknown) 1773 # input sections 1774 for txin, other_txin in zip(self.inputs(), other_tx.inputs()): 1775 txin.combine_with_other_txin(other_txin) 1776 # output sections 1777 for txout, other_txout in zip(self.outputs(), other_tx.outputs()): 1778 txout.combine_with_other_txout(other_txout) 1779 self.invalidate_ser_cache() 1780 1781 def join_with_other_psbt(self, other_tx: 'PartialTransaction') -> None: 1782 """Adds inputs and outputs from other_tx into this one.""" 1783 if not isinstance(other_tx, PartialTransaction): 1784 raise Exception('Can only join partial transactions.') 1785 # make sure there are no duplicate prevouts 1786 prevouts = set() 1787 for txin in itertools.chain(self.inputs(), other_tx.inputs()): 1788 prevout_str = txin.prevout.to_str() 1789 if prevout_str in prevouts: 1790 raise Exception(f"Duplicate inputs! " 1791 f"Transactions that spend the same prevout cannot be joined.") 1792 prevouts.add(prevout_str) 1793 # copy global PSBT section 1794 self.xpubs.update(other_tx.xpubs) 1795 self._unknown.update(other_tx._unknown) 1796 # copy and add inputs and outputs 1797 self.add_inputs(list(other_tx.inputs())) 1798 self.add_outputs(list(other_tx.outputs())) 1799 self.remove_signatures() 1800 self.invalidate_ser_cache() 1801 1802 def inputs(self) -> Sequence[PartialTxInput]: 1803 return self._inputs 1804 1805 def outputs(self) -> Sequence[PartialTxOutput]: 1806 return self._outputs 1807 1808 def add_inputs(self, inputs: List[PartialTxInput]) -> None: 1809 self._inputs.extend(inputs) 1810 self.BIP69_sort(outputs=False) 1811 self.invalidate_ser_cache() 1812 1813 def add_outputs(self, outputs: List[PartialTxOutput]) -> None: 1814 self._outputs.extend(outputs) 1815 self.BIP69_sort(inputs=False) 1816 self.invalidate_ser_cache() 1817 1818 def set_rbf(self, rbf: bool) -> None: 1819 nSequence = 0xffffffff - (2 if rbf else 1) 1820 for txin in self.inputs(): 1821 txin.nsequence = nSequence 1822 self.invalidate_ser_cache() 1823 1824 def BIP69_sort(self, inputs=True, outputs=True): 1825 # NOTE: other parts of the code rely on these sorts being *stable* sorts 1826 if inputs: 1827 self._inputs.sort(key = lambda i: (i.prevout.txid, i.prevout.out_idx)) 1828 if outputs: 1829 self._outputs.sort(key = lambda o: (o.value, o.scriptpubkey)) 1830 self.invalidate_ser_cache() 1831 1832 def input_value(self) -> int: 1833 input_values = [txin.value_sats() for txin in self.inputs()] 1834 if any([val is None for val in input_values]): 1835 raise MissingTxInputAmount() 1836 return sum(input_values) 1837 1838 def output_value(self) -> int: 1839 return sum(o.value for o in self.outputs()) 1840 1841 def get_fee(self) -> Optional[int]: 1842 try: 1843 return self.input_value() - self.output_value() 1844 except MissingTxInputAmount: 1845 return None 1846 1847 def serialize_preimage(self, txin_index: int, *, 1848 bip143_shared_txdigest_fields: BIP143SharedTxDigestFields = None) -> str: 1849 nVersion = int_to_hex(self.version, 4) 1850 nLocktime = int_to_hex(self.locktime, 4) 1851 inputs = self.inputs() 1852 outputs = self.outputs() 1853 txin = inputs[txin_index] 1854 sighash = txin.sighash if txin.sighash is not None else SIGHASH_ALL 1855 if sighash != SIGHASH_ALL: 1856 raise Exception("only SIGHASH_ALL signing is supported!") 1857 nHashType = int_to_hex(sighash, 4) 1858 preimage_script = self.get_preimage_script(txin) 1859 if txin.is_segwit(): 1860 if bip143_shared_txdigest_fields is None: 1861 bip143_shared_txdigest_fields = self._calc_bip143_shared_txdigest_fields() 1862 hashPrevouts = bip143_shared_txdigest_fields.hashPrevouts 1863 hashSequence = bip143_shared_txdigest_fields.hashSequence 1864 hashOutputs = bip143_shared_txdigest_fields.hashOutputs 1865 outpoint = txin.prevout.serialize_to_network().hex() 1866 scriptCode = var_int(len(preimage_script) // 2) + preimage_script 1867 amount = int_to_hex(txin.value_sats(), 8) 1868 nSequence = int_to_hex(txin.nsequence, 4) 1869 preimage = nVersion + hashPrevouts + hashSequence + outpoint + scriptCode + amount + nSequence + hashOutputs + nLocktime + nHashType 1870 else: 1871 txins = var_int(len(inputs)) + ''.join(self.serialize_input(txin, preimage_script if txin_index==k else '') 1872 for k, txin in enumerate(inputs)) 1873 txouts = var_int(len(outputs)) + ''.join(o.serialize_to_network().hex() for o in outputs) 1874 preimage = nVersion + txins + txouts + nLocktime + nHashType 1875 return preimage 1876 1877 def sign(self, keypairs) -> None: 1878 # keypairs: pubkey_hex -> (secret_bytes, is_compressed) 1879 bip143_shared_txdigest_fields = self._calc_bip143_shared_txdigest_fields() 1880 for i, txin in enumerate(self.inputs()): 1881 pubkeys = [pk.hex() for pk in txin.pubkeys] 1882 for pubkey in pubkeys: 1883 if txin.is_complete(): 1884 break 1885 if pubkey not in keypairs: 1886 continue 1887 _logger.info(f"adding signature for {pubkey}") 1888 sec, compressed = keypairs[pubkey] 1889 sig = self.sign_txin(i, sec, bip143_shared_txdigest_fields=bip143_shared_txdigest_fields) 1890 self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey, sig=sig) 1891 1892 _logger.debug(f"is_complete {self.is_complete()}") 1893 self.invalidate_ser_cache() 1894 1895 def sign_txin(self, txin_index, privkey_bytes, *, bip143_shared_txdigest_fields=None) -> str: 1896 txin = self.inputs()[txin_index] 1897 txin.validate_data(for_signing=True) 1898 pre_hash = sha256d(bfh(self.serialize_preimage(txin_index, 1899 bip143_shared_txdigest_fields=bip143_shared_txdigest_fields))) 1900 privkey = ecc.ECPrivkey(privkey_bytes) 1901 sig = privkey.sign_transaction(pre_hash) 1902 sig = bh2u(sig) + '01' # SIGHASH_ALL 1903 return sig 1904 1905 def is_complete(self) -> bool: 1906 return all([txin.is_complete() for txin in self.inputs()]) 1907 1908 def signature_count(self) -> Tuple[int, int]: 1909 s = 0 # "num Sigs we have" 1910 r = 0 # "Required" 1911 for txin in self.inputs(): 1912 if txin.is_coinbase_input(): 1913 continue 1914 signatures = list(txin.part_sigs.values()) 1915 s += len(signatures) 1916 r += txin.num_sig 1917 return s, r 1918 1919 def serialize(self) -> str: 1920 """Returns PSBT as base64 text, or raw hex of network tx (if complete).""" 1921 self.finalize_psbt() 1922 if self.is_complete(): 1923 return Transaction.serialize(self) 1924 return self._serialize_as_base64() 1925 1926 def serialize_as_bytes(self, *, force_psbt: bool = False) -> bytes: 1927 """Returns PSBT as raw bytes, or raw bytes of network tx (if complete).""" 1928 self.finalize_psbt() 1929 if force_psbt or not self.is_complete(): 1930 with io.BytesIO() as fd: 1931 self._serialize_psbt(fd) 1932 return fd.getvalue() 1933 else: 1934 return Transaction.serialize_as_bytes(self) 1935 1936 def _serialize_as_base64(self) -> str: 1937 raw_bytes = self.serialize_as_bytes() 1938 return base64.b64encode(raw_bytes).decode('ascii') 1939 1940 def update_signatures(self, signatures: Sequence[str]): 1941 """Add new signatures to a transaction 1942 1943 `signatures` is expected to be a list of sigs with signatures[i] 1944 intended for self._inputs[i]. 1945 This is used by the Trezor, KeepKey and Safe-T plugins. 1946 """ 1947 if self.is_complete(): 1948 return 1949 if len(self.inputs()) != len(signatures): 1950 raise Exception('expected {} signatures; got {}'.format(len(self.inputs()), len(signatures))) 1951 for i, txin in enumerate(self.inputs()): 1952 pubkeys = [pk.hex() for pk in txin.pubkeys] 1953 sig = signatures[i] 1954 if bfh(sig) in list(txin.part_sigs.values()): 1955 continue 1956 pre_hash = sha256d(bfh(self.serialize_preimage(i))) 1957 sig_string = ecc.sig_string_from_der_sig(bfh(sig[:-2])) 1958 for recid in range(4): 1959 try: 1960 public_key = ecc.ECPubkey.from_sig_string(sig_string, recid, pre_hash) 1961 except ecc.InvalidECPointException: 1962 # the point might not be on the curve for some recid values 1963 continue 1964 pubkey_hex = public_key.get_public_key_hex(compressed=True) 1965 if pubkey_hex in pubkeys: 1966 try: 1967 public_key.verify_message_hash(sig_string, pre_hash) 1968 except Exception: 1969 _logger.exception('') 1970 continue 1971 _logger.info(f"adding sig: txin_idx={i}, signing_pubkey={pubkey_hex}, sig={sig}") 1972 self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey_hex, sig=sig) 1973 break 1974 # redo raw 1975 self.invalidate_ser_cache() 1976 1977 def add_signature_to_txin(self, *, txin_idx: int, signing_pubkey: str, sig: str): 1978 txin = self._inputs[txin_idx] 1979 txin.part_sigs[bfh(signing_pubkey)] = bfh(sig) 1980 # force re-serialization 1981 txin.script_sig = None 1982 txin.witness = None 1983 self.invalidate_ser_cache() 1984 1985 def add_info_from_wallet( 1986 self, 1987 wallet: 'Abstract_Wallet', 1988 *, 1989 include_xpubs: bool = False, 1990 ignore_network_issues: bool = True, 1991 ) -> None: 1992 if self.is_complete(): 1993 return 1994 # only include xpubs for multisig wallets; currently only they need it in practice 1995 # note: coldcard fw have a limitation that if they are included then all 1996 # inputs are assumed to be multisig... https://github.com/spesmilo/electrum/pull/5440#issuecomment-549504761 1997 # note: trezor plugin needs xpubs included, if there are multisig inputs/change_outputs 1998 from .wallet import Multisig_Wallet 1999 if include_xpubs and isinstance(wallet, Multisig_Wallet): 2000 from .keystore import Xpub 2001 for ks in wallet.get_keystores(): 2002 if isinstance(ks, Xpub): 2003 fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx( 2004 der_suffix=[], only_der_suffix=False) 2005 xpub = ks.get_xpub_to_be_used_in_partial_tx(only_der_suffix=False) 2006 bip32node = BIP32Node.from_xkey(xpub) 2007 self.xpubs[bip32node] = (fp_bytes, der_full) 2008 for txin in self.inputs(): 2009 wallet.add_input_info( 2010 txin, 2011 only_der_suffix=False, 2012 ignore_network_issues=ignore_network_issues, 2013 ) 2014 for txout in self.outputs(): 2015 wallet.add_output_info( 2016 txout, 2017 only_der_suffix=False, 2018 ) 2019 2020 def remove_xpubs_and_bip32_paths(self) -> None: 2021 self.xpubs.clear() 2022 for txin in self.inputs(): 2023 txin.bip32_paths.clear() 2024 for txout in self.outputs(): 2025 txout.bip32_paths.clear() 2026 2027 def prepare_for_export_for_coinjoin(self) -> None: 2028 """Removes all sensitive details.""" 2029 # globals 2030 self.xpubs.clear() 2031 self._unknown.clear() 2032 # inputs 2033 for txin in self.inputs(): 2034 txin.bip32_paths.clear() 2035 # outputs 2036 for txout in self.outputs(): 2037 txout.redeem_script = None 2038 txout.witness_script = None 2039 txout.bip32_paths.clear() 2040 txout._unknown.clear() 2041 2042 def convert_all_utxos_to_witness_utxos(self) -> None: 2043 """Replaces all NON-WITNESS-UTXOs with WITNESS-UTXOs. 2044 This will likely make an exported PSBT invalid spec-wise, 2045 but it makes e.g. QR codes significantly smaller. 2046 """ 2047 for txin in self.inputs(): 2048 txin.convert_utxo_to_witness_utxo() 2049 2050 def remove_signatures(self): 2051 for txin in self.inputs(): 2052 txin.part_sigs = {} 2053 txin.script_sig = None 2054 txin.witness = None 2055 assert not self.is_complete() 2056 self.invalidate_ser_cache() 2057 2058 def update_txin_script_type(self): 2059 """Determine the script_type of each input by analyzing the scripts. 2060 It updates all tx-Inputs, NOT only the wallet owned, if the 2061 scriptpubkey is present. 2062 """ 2063 for txin in self.inputs(): 2064 if txin.script_type in ('unknown', 'address'): 2065 txin.set_script_type() 2066 2067def pack_bip32_root_fingerprint_and_int_path(xfp: bytes, path: Sequence[int]) -> bytes: 2068 if len(xfp) != 4: 2069 raise Exception(f'unexpected xfp length. xfp={xfp}') 2070 return xfp + b''.join(i.to_bytes(4, byteorder='little', signed=False) for i in path) 2071 2072 2073def unpack_bip32_root_fingerprint_and_int_path(path: bytes) -> Tuple[bytes, Sequence[int]]: 2074 if len(path) % 4 != 0: 2075 raise Exception(f'unexpected packed path length. path={path.hex()}') 2076 xfp = path[0:4] 2077 int_path = [int.from_bytes(b, byteorder='little', signed=False) for b in chunks(path[4:], 4)] 2078 return xfp, int_path 2079