1import collections 2import itertools 3 4from pycoin.encoding.hash import hash160 5from pycoin.encoding.hexbytes import b2h 6from pycoin.encoding.sec import is_sec_compressed, public_pair_to_hash160_sec 7from pycoin.intbytes import byte2int 8from pycoin.satoshi.flags import SIGHASH_ALL, SIGHASH_NONE, SIGHASH_SINGLE, SIGHASH_ANYONECANPAY, SIGHASH_FORKID 9from pycoin.satoshi.checksigops import parse_signature_blob 10from pycoin.coins.SolutionChecker import ScriptError 11 12 13class Annotate(object): 14 BIT_LIST = [(SIGHASH_ANYONECANPAY, "SIGHASH_ANYONECANPAY"), (SIGHASH_FORKID, "SIGHASH_FORKID")] 15 BASE_LOOKUP = {SIGHASH_ALL: "SIGHASH_ALL", SIGHASH_SINGLE: "SIGHASH_SINGLE", SIGHASH_NONE: "SIGHASH_NONE"} 16 17 def __init__(self, script_tools, address_api): 18 self._script_tools = script_tools 19 self._address = address_api 20 for _ in "EQUAL HASH160 CHECKSIG CHECKSIGVERIFY CHECKMULTISIG CHECKMULTISIGVERIFY".split(): 21 setattr(self, "OP_%s" % _, byte2int(self._script_tools.compile('OP_%s' % _))) 22 23 def sighash_type_to_string(self, sighash_type): 24 v = sighash_type 25 flag_bit_list = [] 26 for flag_bit, flag_name in self.BIT_LIST: 27 if v & flag_bit: 28 v &= ~flag_bit 29 flag_bit_list.append(flag_name) 30 base_type = self.BASE_LOOKUP.get(v, "SIGHASH_UNKNOWN") 31 return "".join([base_type] + [" | %s" % s for s in flag_bit_list]) 32 33 def instruction_for_opcode(self, opcode, data): 34 if data is None or len(data) == 0: 35 return self._script_tools.disassemble_for_opcode_data(opcode, data) 36 return "[PUSH_%d] %s" % (opcode, b2h(data)) 37 38 def annotate_pubkey(self, blob, da): 39 is_compressed = is_sec_compressed(blob) 40 address = self._address.for_p2pkh(hash160(blob)) 41 da[blob].append("SEC for %scompressed %s" % ("" if is_compressed else "un", address)) 42 43 def annotate_signature(self, blob, da, vmc): 44 lst = da[blob] 45 try: 46 sig_pair, sig_type = parse_signature_blob(blob) 47 except ValueError: 48 return 49 lst.append("r: {0:#066x}".format(sig_pair[0])) 50 lst.append("s: {0:#066x}".format(sig_pair[1])) 51 sig_hash = vmc.signature_for_hash_type_f(sig_type, [blob], vmc) 52 lst.append("z: {0:#066x}".format(sig_hash)) 53 lst.append("signature type %s" % self.sighash_type_to_string(sig_type)) 54 addresses = [] 55 generator = vmc.generator_for_signature_type(sig_type) 56 pairs = generator.possible_public_pairs_for_signature(sig_hash, sig_pair) 57 for pair in pairs: 58 for comp in (True, False): 59 hash160 = public_pair_to_hash160_sec(pair, compressed=comp) 60 address = self._address.for_p2pkh(hash160) 61 addresses.append(address) 62 lst.append(" sig for %s" % " ".join(addresses)) 63 64 def annotate_checksig(self, vmc, da): 65 s = list(vmc.stack) 66 try: 67 self.annotate_pubkey(vmc.pop(), da) 68 self.annotate_signature(vmc.pop(), da, vmc) 69 except (IndexError, ValueError): 70 pass 71 vmc.stack = s 72 73 def annotate_checkmultisig(self, vmc, da): 74 s = list(vmc.stack) 75 try: 76 key_count = vmc.pop_int() 77 while key_count > 0: 78 key_count -= 1 79 self.annotate_pubkey(vmc.pop(), da) 80 81 signature_count = vmc.pop_int() 82 while signature_count > 0: 83 signature_count -= 1 84 self.annotate_signature(vmc.pop(), da, vmc) 85 except IndexError: 86 pass 87 vmc.stack = s 88 89 def annotate_scripts(self, tx, tx_in_idx): 90 "return list of pre_annotations, pc, opcode, instruction, post_annotations" 91 # input_annotations_f, output_annotations_f = annotation_f_for_scripts(tx, tx_in_idx) 92 93 data_annotations = collections.defaultdict(list) 94 95 def traceback_f(opcode, data, pc, vmc): 96 if opcode in (self.OP_CHECKSIG, self.OP_CHECKSIGVERIFY): 97 self.annotate_checksig(vmc, data_annotations) 98 if opcode in (self.OP_CHECKMULTISIG, self.OP_CHECKMULTISIGVERIFY): 99 self.annotate_checkmultisig(vmc, data_annotations) 100 return 101 102 try: 103 tx.check_solution(tx_in_idx, traceback_f=traceback_f) 104 except ScriptError: 105 pass 106 107 r = [] 108 109 def traceback_f(opcode, data, pc, vmc): 110 a0 = [] 111 if vmc.pc == 0: 112 if vmc.is_solution_script: 113 a0.append("--- SIGNATURE SCRIPT START") 114 else: 115 a0.append("--- PUBLIC KEY SCRIPT START") 116 r.append((a0, vmc.pc, opcode, self.instruction_for_opcode(opcode, data), data_annotations[data])) 117 118 try: 119 tx.check_solution(tx_in_idx, traceback_f=traceback_f) 120 except ScriptError: 121 pass 122 123 # the script may have ended early, so let's just double-check 124 try: 125 for idx, (opcode, data, pc, new_pc) in enumerate(itertools.chain( 126 self._script_tools.get_opcodes(tx.unspents[tx_in_idx].script), 127 self._script_tools.get_opcodes(tx.txs_in[tx_in_idx].script))): 128 if idx >= len(r): 129 r.append(([], pc, opcode, self.instruction_for_opcode(opcode, data), [])) 130 except IndexError: 131 pass 132 133 return r 134 135 def annotate_spendable(self, tx_class, spendable): 136 txs_in = [tx_class.TxIn(b'1' * 32, 0)] 137 fake_spend_tx = tx_class(1, txs_in, []) 138 fake_spend_tx.set_unspents([spendable]) 139 return self.annotate_scripts(fake_spend_tx, 0) 140