1import io 2import struct 3 4from pycoin.encoding.hash import double_sha256 5from pycoin.encoding.hexbytes import b2h_rev 6from pycoin.satoshi.satoshi_int import parse_satoshi_int 7from pycoin.satoshi.satoshi_streamer import Streamer, STREAMER_FUNCTIONS 8 9from .InvItem import InvItem 10from .PeerAddress import PeerAddress 11 12# definitions of message structures and types 13# L: 4 byte integer 14# 6: 6 byte integer 15# Q: 8 byte long integer 16# S: unicode string encoded using utf-8 17# [v]: array of InvItem objects 18# [LA]: array of (L, PeerAddress) tuples 19# b: boolean 20# A: PeerAddress object 21# B: Block object 22# T: Tx object 23# O: optional boolean 24 25 26STANDARD_P2P_MESSAGES = { 27 'version': ( 28 "version:L services:Q timestamp:Q remote_address:A local_address:A" 29 " nonce:Q subversion:S last_block_index:L relay:O" 30 ), 31 'verack': "", 32 'addr': "date_address_tuples:[LA]", 33 'inv': "items:[v]", 34 'getdata': "items:[v]", 35 'notfound': "items:[v]", 36 'reject': "message:S code:1 reason:S data:#", 37 'getblocks': "version:L hashes:[#] hash_stop:#", 38 'getheaders': "version:L hashes:[#] hash_stop:#", 39 'sendheaders': "", 40 'tx': "tx:T", 41 'block': "block:B", 42 'headers': "headers:[zI]", 43 'getaddr': "", 44 'mempool': "", 45 'feefilter': "fee_filter_value:Q", 46 'sendcmpct': "enabled:b version:Q", 47 'cmpctblock': "header_hash:# nonce:Q short_ids:[6] prefilled_txs:[IT]", 48 'getblocktxn': "header_hash:# indices:[I]", 49 'blocktxn': "header_hash:# txs:[T]", 50 # 'checkorder': obsolete 51 # 'submitorder': obsolete 52 # 'reply': obsolete 53 'ping': "nonce:Q", 54 'pong': "nonce:Q", 55 'filterload': "filter:[1] hash_function_count:L tweak:L flags:b", 56 'filteradd': "data:[1]", 57 'filterclear': "", 58 'merkleblock': ( 59 "header:z total_transactions:L hashes:[#] flags:[1]" 60 ), 61 'alert': "payload:S signature:S", 62} 63 64 65def standard_messages(): 66 return dict(STANDARD_P2P_MESSAGES) 67 68 69def _recurse(level_widths, level_index, node_index, hashes, flags, flag_index, tx_acc): 70 idx, r = divmod(flag_index, 8) 71 mask = (1 << r) 72 flag_index += 1 73 if flags[idx] & mask == 0: 74 h = hashes.pop() 75 return h, flag_index 76 77 if level_index == len(level_widths) - 1: 78 h = hashes.pop() 79 tx_acc.append(h) 80 return h, flag_index 81 82 # traverse the left 83 left_hash, flag_index = _recurse( 84 level_widths, level_index+1, node_index*2, hashes, flags, flag_index, tx_acc) 85 86 # is there a right? 87 if node_index*2+1 < level_widths[level_index+1]: 88 right_hash, flag_index = _recurse( 89 level_widths, level_index+1, node_index*2+1, hashes, flags, flag_index, tx_acc) 90 91 if left_hash == right_hash: 92 raise ValueError("merkle hash has same left and right value at node %d" % node_index) 93 else: 94 right_hash = left_hash 95 96 return double_sha256(left_hash + right_hash), flag_index 97 98 99def post_unpack_merkleblock(d, f): 100 """ 101 A post-processing "post_unpack" to merkleblock messages. 102 103 It validates the merkle proofs (throwing an exception if there's 104 an error), and returns the list of transaction hashes in "tx_hashes". 105 106 The transactions are supposed to be sent immediately after the merkleblock message. 107 """ 108 level_widths = [] 109 count = d["total_transactions"] 110 while count > 1: 111 level_widths.append(count) 112 count += 1 113 count //= 2 114 level_widths.append(1) 115 level_widths.reverse() 116 117 tx_acc = [] 118 flags = d["flags"] 119 hashes = list(reversed(d["hashes"])) 120 left_hash, flag_index = _recurse(level_widths, 0, 0, hashes, flags, 0, tx_acc) 121 122 if len(hashes) > 0: 123 raise ValueError("extra hashes: %s" % hashes) 124 125 idx, r = divmod(flag_index-1, 8) 126 if idx != len(flags) - 1: 127 raise ValueError("not enough flags consumed") 128 129 if flags[idx] > (1 << (r+1))-1: 130 raise ValueError("unconsumed 1 flag bits set") 131 132 if left_hash != d["header"].merkle_root: 133 raise ValueError( 134 "merkle root %s does not match calculated hash %s" % ( 135 b2h_rev(d["header"].merkle_root), b2h_rev(left_hash))) 136 137 d["tx_hashes"] = tx_acc 138 return d 139 140 141def _make_parser(streamer, the_struct): 142 "Return a function that parses the given structure into a dict" 143 struct_items = [s.split(":") for s in the_struct.split()] 144 names = [s[0] for s in struct_items] 145 types = ''.join(s[1] for s in struct_items) 146 147 def f(message_stream): 148 return streamer.parse_as_dict(names, types, message_stream) 149 return f 150 151 152def make_post_unpack_alert(streamer): 153 """ 154 Post-processor to "alert" message, to add an "alert_info" dictionary of parsed 155 alert information. 156 """ 157 the_struct = ("version:L relayUntil:Q expiration:Q id:L cancel:L setCancel:[L] minVer:L " 158 "maxVer:L setSubVer:[S] priority:L comment:S statusBar:S reserved:S") 159 160 alert_submessage_parser = _make_parser(streamer, the_struct) 161 162 def post_unpack_alert(d, f): 163 d1 = alert_submessage_parser(io.BytesIO(d["payload"])) 164 d["alert_info"] = d1 165 return d 166 return post_unpack_alert 167 168 169def standard_parsing_functions(Block, Tx): 170 """ 171 Return the standard parsing functions for a given Block and Tx class. 172 The return value is expected to be used with the standard_streamer function. 173 """ 174 def stream_block(f, block): 175 assert isinstance(block, Block) 176 block.stream(f) 177 178 def stream_blockheader(f, blockheader): 179 assert isinstance(blockheader, Block) 180 blockheader.stream_header(f) 181 182 def stream_tx(f, tx): 183 assert isinstance(tx, Tx) 184 tx.stream(f) 185 186 def parse_int_6(f): 187 b = f.read(6) + b'\0\0' 188 return struct.unpack(b, "<L")[0] 189 190 def stream_int_6(f, v): 191 f.write(struct.pack(v, "<L")[:6]) 192 193 more_parsing = [ 194 ("A", (PeerAddress.parse, lambda f, peer_addr: peer_addr.stream(f))), 195 ("v", (InvItem.parse, lambda f, inv_item: inv_item.stream(f))), 196 ("T", (Tx.parse, stream_tx)), 197 ("B", (Block.parse, stream_block)), 198 ("z", (Block.parse_as_header, stream_blockheader)), 199 ("1", (lambda f: struct.unpack("B", f.read(1))[0], lambda f, v: f.write(struct.pack("B", v)))), 200 ("6", (parse_int_6, stream_int_6)), 201 ("O", (lambda f: True if f.read(1) else False, 202 lambda f, v: f.write(b'' if v is None else struct.pack("B", v)))), 203 ] 204 all_items = list(STREAMER_FUNCTIONS.items()) 205 all_items.extend(more_parsing) 206 return all_items 207 208 209def standard_streamer(parsing_functions, parse_satoshi_int=parse_satoshi_int): 210 """ 211 Create a satoshi_streamer, which parses and packs using the bitcoin protocol 212 (mostly the custom way arrays and integers are parsed and packed). 213 """ 214 streamer = Streamer() 215 streamer.register_array_count_parse(parse_satoshi_int) 216 streamer.register_functions(parsing_functions) 217 return streamer 218 219 220def standard_message_post_unpacks(streamer): 221 """ 222 The standard message post-processors: one for the version message, 223 one for the alert message, and one for the merkleblock message. 224 """ 225 return dict(alert=make_post_unpack_alert(streamer), merkleblock=post_unpack_merkleblock) 226 227 228def make_parser_and_packer(streamer, message_dict, message_post_unpacks): 229 """ 230 Create a parser and a packer for a peer's network messages. 231 232 streamer: 233 used in conjunction with the message_dict. The message_dict turns a message into 234 a string specifying the fields, and this dictionary specifies how to pack or unpack 235 fields to or from bytes 236 message_dict: 237 a dictionary specifying how to pack or unpack the various messages like "version" 238 message_post_unpacks: 239 a dictionary specifying functions to call to postprocess message to, for example 240 extract submessages, like in "alert" 241 """ 242 message_parsers = dict((k, _make_parser(streamer, v)) for k, v in message_dict.items()) 243 244 def parse_from_data(message_name, data): 245 message_stream = io.BytesIO(data) 246 parser = message_parsers.get(message_name) 247 if parser is None: 248 raise KeyError("unknown message: %s" % message_name) 249 d = parser(message_stream) 250 post_unpack = message_post_unpacks.get(message_name) 251 if post_unpack: 252 d = post_unpack(d, message_stream) 253 return d 254 255 def pack_from_data(message_name, **kwargs): 256 the_struct = message_dict[message_name] 257 if not the_struct: 258 return b'' 259 f = io.BytesIO() 260 the_fields = the_struct.split(" ") 261 pairs = [t.split(":") for t in the_fields] 262 for name, type in pairs: 263 if type[0] == '[': 264 streamer.stream_struct("I", f, len(kwargs[name])) 265 for v in kwargs[name]: 266 if not isinstance(v, (tuple, list)): 267 v = [v] 268 streamer.stream_struct(type[1:-1], f, *v) 269 else: 270 streamer.stream_struct(type, f, kwargs[name]) 271 return f.getvalue() 272 273 return parse_from_data, pack_from_data 274