1from ..encoding.sec import is_sec, public_pair_to_hash160_sec, sec_to_public_pair, EncodingError 2 3from pycoin.coins.SolutionChecker import ScriptError 4from pycoin.satoshi.checksigops import parse_signature_blob 5from pycoin.satoshi.der import UnexpectedDER 6 7 8class WhoSigned(object): 9 def __init__(self, script_tools, address_api, generator): 10 self._script_tools = script_tools 11 self._address = address_api 12 self._generator = generator 13 14 def solution_blobs(self, tx, tx_in_idx): 15 """ 16 This iterator yields data blobs that appear in the the last solution_script or the witness. 17 """ 18 sc = tx.SolutionChecker(tx) 19 tx_context = sc.tx_context_for_idx(tx_in_idx) 20 # set solution_stack in case there are no results from puzzle_and_solution_iterator 21 solution_stack = [] 22 for puzzle_script, solution_stack, flags, sighash_f in sc.puzzle_and_solution_iterator(tx_context): 23 pass 24 # we only care about the last one 25 for s in solution_stack: 26 yield s 27 28 def extract_signatures(self, tx, tx_in_idx): 29 sc = tx.SolutionChecker(tx) 30 tx_context = sc.tx_context_for_idx(tx_in_idx) 31 # set solution_stack in case there are no results from puzzle_and_solution_iterator 32 solution_stack = [] 33 for puzzle_script, solution_stack, flags, sighash_f in sc.puzzle_and_solution_iterator(tx_context): 34 pass 35 # we only care about the last one 36 37 vm = sc.VM(puzzle_script, tx_context, sighash_f, flags=flags, initial_stack=solution_stack[:]) 38 for data in self.solution_blobs(tx, tx_in_idx): 39 try: 40 sig_pair, sig_type = parse_signature_blob(data) 41 sig_hash = sighash_f(sig_type, sig_blobs=[], vm=vm) 42 yield (data, sig_hash) 43 except (ValueError, TypeError, UnexpectedDER, ScriptError): 44 continue 45 46 def extract_secs(self, tx, tx_in_idx): 47 """ 48 For a given script solution, iterate yield its sec blobs 49 """ 50 sc = tx.SolutionChecker(tx) 51 tx_context = sc.tx_context_for_idx(tx_in_idx) 52 # set solution_stack in case there are no results from puzzle_and_solution_iterator 53 solution_stack = [] 54 for puzzle_script, solution_stack, flags, sighash_f in sc.puzzle_and_solution_iterator(tx_context): 55 for opcode, data, pc, new_pc in self._script_tools.get_opcodes(puzzle_script): 56 if data and is_sec(data): 57 yield data 58 for data in solution_stack: 59 if is_sec(data): 60 yield data 61 62 def public_pairs_for_script(self, tx, tx_in_idx, generator): 63 """ 64 For a given script, iterate over and pull out public pairs encoded as sec values. 65 """ 66 public_pairs = [] 67 for sec in self.extract_secs(tx, tx_in_idx): 68 try: 69 public_pairs.append(sec_to_public_pair(sec, generator)) 70 except EncodingError: 71 pass 72 return public_pairs 73 74 def public_pairs_signed(self, tx, tx_in_idx): 75 signed_by = [] 76 77 public_pairs = self.public_pairs_for_script(tx, tx_in_idx, self._generator) 78 79 for signature, sig_hash in self.extract_signatures(tx, tx_in_idx): 80 sig_pair, sig_type = parse_signature_blob(signature) 81 82 for public_pair in public_pairs: 83 if self._generator.verify(public_pair, sig_hash, sig_pair): 84 signed_by.append((public_pair, sig_pair, sig_type)) 85 return signed_by 86 87 def who_signed_tx(self, tx, tx_in_idx): 88 """ 89 Given a transaction (tx) an input index (tx_in_idx), attempt to figure 90 out which addresses where used in signing (so far). This method 91 depends on tx.unspents being properly configured. This should work on 92 partially-signed MULTISIG transactions (it will return as many 93 addresses as there are good signatures). 94 Returns a list of (public_pairs, sig_type) pairs. 95 """ 96 public_pair_sig_type_list = self.public_pairs_signed(tx, tx_in_idx) 97 sig_type_list = [pp[-1] for pp in public_pair_sig_type_list] 98 hash160_list = [public_pair_to_hash160_sec(pp[0]) for pp in public_pair_sig_type_list] 99 address_list = [self._address.for_p2pkh(h160) for h160 in hash160_list] 100 return list(zip(address_list, sig_type_list)) 101