# Python module to read erlang ext (term) format
#
# written by Attila Tajti on 2003, for
#
# TODO: reads compressed data only

import os
import struct
import sys
import zlib

try:
    # Python 2: keep using the original in-memory file implementation.
    import cStringIO
    _ByteStream = cStringIO.StringIO
except ImportError:
    # Python 3 has no cStringIO; io.BytesIO is the byte-oriented equivalent.
    import io
    _ByteStream = io.BytesIO


class erlang_ext_error(Exception):
    """Raised when the decoded stream is malformed or uses an unknown tag."""


def _native_str(raw):
    """Return *raw* as the interpreter's native ``str`` type.

    On Python 2 the byte string read from the stream already is a ``str`` and
    is returned untouched.  On Python 3 the bytes are decoded as latin-1,
    which maps every byte value to exactly one character and cannot fail.
    """
    if isinstance(raw, str):
        return raw
    return raw.decode("latin-1")


class erlang_atom:
    """An Erlang atom, wrapping the atom's name in ``self.atom``."""

    def __init__(self, atom):
        self.atom = atom

    def __str__(self):
        return self.atom

    def __eq__(self, other):
        # Comparing against a non-atom (e.g. None) must answer False instead
        # of failing on the missing ``.atom`` attribute.
        return isinstance(other, erlang_atom) and self.atom == other.atom

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Defined together with __eq__ so atoms can be dict keys/set members.
        return hash(self.atom)

    def __repr__(self):
        return "atom <%s>" % self.atom


class erlang_ext_reader:
    """Decode one zlib-compressed Erlang term stored in a file.

    Decoded values map to Python as follows: integers -> int, floats ->
    float, atoms -> erlang_atom, tuples -> tuple, lists -> list, nil -> None,
    strings and binaries -> the raw byte string read from the stream.
    """

    # Element tag byte -> name of the method that decodes the element body.
    # Names (not bound functions) are stored so subclass overrides are used.
    _readers = {
        97: "read_small_int",
        98: "read_int",
        99: "read_float",
        100: "read_atom",
        104: "read_small_tuple",
        105: "read_large_tuple",
        106: "read_nil",
        107: "read_string",
        108: "read_list",
        109: "read_binary",
    }

    def __init__(self, filename):
        """Load *filename*, decompress its payload and prepare for reading.

        A size mismatch between the recorded and the actual uncompressed
        length is reported on stdout but does not abort loading.

        Raises:
            IOError/OSError: the file cannot be opened.
            struct.error: the file is too short to contain the header fields.
            zlib.error: the payload is not valid zlib data.
        """
        with open(filename, "rb") as stream:
            # 15-byte file header; skipped without validation.
            stream.read(15)
            fsize, = struct.unpack(">L", stream.read(4))  # file_size - 19
            # Two bytes that are decoded but never used.
            # NOTE(review): presumably the term-format version byte and the
            # "compressed term" tag -- confirm against the format spec.
            struct.unpack(">H", stream.read(2))
            dsize, = struct.unpack(">L", stream.read(4))  # uncompressed data size
            # fsize also counts the 6 bytes (misc + dsize) consumed above.
            packed = stream.read(fsize - 6)

        data = zlib.decompress(packed)
        if dsize != len(data):
            sys.stdout.write("ERROR: uncompressed size does not match.\n")
        self.data = _ByteStream(data)

        self.logstr = ""    # verbose log; never filled because log_str is a no-op
        self.depth = 0      # current nesting depth used to indent datalog
        self.datalog = ""   # indented dump of decoded elements (when logging)
        self.logging = 0    # set to a true value to fill datalog while reading

    def log_str(self, str):
        """Verbose logging hook; intentionally disabled (does nothing)."""
        pass
        #self.logstr += str + "\n"

    def log_data(self, str):
        """Append *str* to datalog, indented by the current nesting depth."""
        self.datalog += " " * self.depth + str + "\n"

    def log_begin_block(self):
        """Open a ``{`` block in datalog and increase the nesting depth."""
        self.datalog += " " * self.depth + "{\n"
        self.depth += 1

    def log_end_block(self):
        """Close the current ``}`` block in datalog.

        Raises:
            erlang_ext_error: more blocks were closed than opened.
        """
        self.depth -= 1
        if self.depth < 0:
            raise erlang_ext_error("hell")
        self.datalog += " " * self.depth + "}\n"

    def read_small_int(self):
        """Read a 1-byte unsigned integer."""
        val, = struct.unpack(">B", self.data.read(1))
        if self.logging:
            self.log_str("small_int: %d" % (val))
            self.log_data(str(val))
        return val

    def read_int(self):
        """Read a 4-byte big-endian signed integer."""
        val, = struct.unpack(">l", self.data.read(4))
        if self.logging:
            self.log_str("int: %d\n" % (val))
            self.log_data(str(val))
        return val

    def read_float(self):
        """Read a float stored as 31 bytes of NUL-padded decimal text.

        Raises:
            ValueError: the text is not a valid float literal.
        """
        buf = self.data.read(31)
        # Drop every NUL byte; what remains is the textual float.
        text = buf.replace(b"\x00", b"").decode("ascii")
        val = float(text)
        if self.logging:
            self.log_str("float: %f\n" % (val))
            self.log_data(str(val))
        return val

    def read_atom(self):
        """Read an atom: 2-byte big-endian name length, then the name."""
        namelen, = struct.unpack(">H", self.data.read(2))
        name = _native_str(self.data.read(namelen))
        if self.logging:
            self.log_str("atom: %d %s" % (namelen, name))
            self.log_data("ATOM %s" % name)
        return erlang_atom(name)

    def read_tuple(self, len):
        """Read *len* consecutive elements and return them as a tuple."""
        if self.logging:
            self.log_data("TUPLE [%d]" % len)
            self.log_begin_block()
        val = tuple([self.read_element() for _ in range(len)])
        if self.logging:
            self.log_end_block()
        return val

    def read_small_tuple(self):
        """Read a tuple whose arity is stored in 1 byte."""
        len, = struct.unpack(">B", self.data.read(1))
        if self.logging:
            self.log_str("small_tuple: %d" % (len))
        return self.read_tuple(len)

    def read_large_tuple(self):
        """Read a tuple whose arity is stored in 4 bytes (big-endian)."""
        len, = struct.unpack(">L", self.data.read(4))
        if self.logging:
            self.log_str("large_tuple: %d" % (len))
        return self.read_tuple(len)

    def read_listx(self):
        """Alternative list reader that ignores the declared length.

        Elements are read until one decodes to None (nil); that terminating
        None is kept as the last item of the returned list.  Note that a nil
        appearing as a regular element therefore ends the list early.
        """
        len, = struct.unpack(">L", self.data.read(4))
        if self.logging:
            self.log_str("list: %d" % len)
            self.log_data("LIST [%d]" % len)
            self.log_begin_block()
        val = []
        while True:
            elem = self.read_element()
            val.append(elem)
            if elem is None:
                break
        if self.logging:
            self.log_end_block()
        return val

    def read_list(self):
        """Read a list: 4-byte element count, the elements, then the tail.

        Only proper lists are supported, i.e. the tail must be nil.

        Raises:
            erlang_ext_error: the element after the last item is not nil.
        """
        len, = struct.unpack(">L", self.data.read(4))
        if self.logging:
            self.log_str("list: %d" % len)
            self.log_data("LIST [%d]" % len)
            self.log_begin_block()
        val = [self.read_element() for _ in range(len)]
        tail = self.read_element()
        # Identity test: an atom tail must be rejected, not compared.
        if tail is not None:
            raise erlang_ext_error("hey!")
        if self.logging:
            self.log_end_block()
        return val

    def read_string(self):
        """Read a string: 2-byte big-endian length, then that many bytes.

        The raw byte string is returned undecoded.
        """
        namelen, = struct.unpack(">H", self.data.read(2))
        name = self.data.read(namelen)
        if self.logging:
            self.log_str("string: %d %s" % (namelen, name))
            self.log_data('STRING %s' % repr(name))
        return name

    def read_binary(self):
        """Read a binary: 4-byte big-endian length, then that many bytes."""
        len, = struct.unpack(">L", self.data.read(4))
        data = self.data.read(len)
        if self.logging:
            # Two hex digits per byte so the dump is unambiguous; bytearray
            # yields ints on both Python 2 and 3.
            hexdump = "".join(["%02x" % byte for byte in bytearray(data)])
            self.log_str("binary: %d %s" % (len, hexdump))
            self.log_data('BINARY [%d] 0x%s' % (len, hexdump))
        return data

    def read_nil(self):
        """Read nil (no payload); represented as None."""
        if self.logging:
            self.log_data('NIL')
        return None

    def read_element(self):
        """Read the next element: a 1-byte tag followed by its body."""
        id, = struct.unpack(">B", self.data.read(1))

        return self.read_element_using_id(id)

    def read_element_using_id(self, id):
        """Decode the body of an element whose tag byte *id* was already read.

        Raises:
            erlang_ext_error: *id* is not one of the supported tags.
        """
        #if self.depth == 5: self.log_str("read element %d" % id)
        reader_name = self._readers.get(id)
        if reader_name is None:
            raise erlang_ext_error("problem " + str(id))
        return getattr(self, reader_name)()

    def read(self):
        """Read and return the next element from the stream."""
        return self.read_element()

    def readtest(self):
        """Read one element and discard the result."""
        self.read_element()

        #run = 1
        #while run:
            #run = self.read_element()


def test():
    """Manual smoke test: decode ``tank1w.wings`` and dump logs to ``log.txt``.

    The log file is written even when decoding fails; in that case the dumped
    value is ``None`` and the decoding error still propagates to the caller.
    """
    e = erlang_ext_reader("tank1w.wings")
    data = None  # keeps the finally block valid if decoding raises
    try:
        data = e.read_element()
    finally:
        with open("log.txt", "w") as f:
            f.write(e.datalog)
            f.write(e.logstr)
            f.write(repr(data))


# This is a manual script needing an external file, not a unit test: keep
# test collectors from running it when the module is imported.
test.__test__ = False

#test()