1
2# Python module to read erlang ext (term) format
3#
4# written by Attila Tajti on 2003, for
5#
6# TODO: reads compressed data only
7
8import os, sys, struct, zlib, cStringIO
9
10class erlang_atom:
11
12	def __init__(self, atom):
13		self.atom = atom
14
15	def __str__(self):
16		return self.atom
17
18	def __eq__(self, other):
19		return self.atom == other.atom
20
21	def __ne__(self, other):
22		return self.atom != other.atom
23
24	def __repr__(self):
25		return "atom <%s>" % self.atom
26
27class erlang_ext_reader:
28
29	def __init__(self, filename):
30		file = open(filename, "rb")
31		header = file.read(15)
32		fsize, = struct.unpack(">L",  file.read(4))   # file_size - 19
33		misc,  = struct.unpack(">H",  file.read(2))
34		dsize, = struct.unpack(">L",  file.read(4))   # uncompressed data size
35		data   = file.read(fsize-6)
36		file.close()
37
38		data = zlib.decompress(data)
39		if dsize != len(data): print "ERROR: uncompressed size does not match."
40		self.data = cStringIO.StringIO(data)
41
42		self.logstr = ""
43		self.depth = 0
44		self.datalog = ""
45		self.logging = 0
46
47	def log_str(self, str):
48		pass
49		#self.logstr += str + "\n"
50
51	def log_data(self, str):
52		self.datalog += "  " * self.depth + str + "\n"
53
54	def log_begin_block(self):
55		self.datalog += "  " * self.depth + "{\n"
56		self.depth += 1
57
58	def log_end_block(self):
59		self.depth -= 1
60		if self.depth < 0: raise "hell"
61		self.datalog += "  " * self.depth + "}\n"
62
63	def read_small_int(self):
64		val, = struct.unpack(">B", self.data.read(1))
65		if self.logging:
66			self.log_str("small_int: %d" % (val))
67			self.log_data(str(val))
68		return val
69
70	def read_int(self):
71		val, = struct.unpack(">l", self.data.read(4))
72		if self.logging:
73			self.log_str("int: %d\n" % (val))
74			self.log_data(str(val))
75		return val
76
77	def read_float(self):
78		buf = self.data.read(31)
79		chrs = filter(lambda char: ord(char) > 0, buf)
80		val = float(chrs)
81		if self.logging:
82			self.log_str("float: %f\n" % (val))
83			self.log_data(str(val))
84		return val
85
86	def read_atom(self):
87		namelen, = struct.unpack(">H", self.data.read(2))
88		name = self.data.read(namelen)
89		if self.logging:
90			self.log_str("atom: %d %s" % (namelen, name))
91			self.log_data("ATOM %s" % name)
92		return erlang_atom(name)
93
94	def	read_tuple(self, len):
95		if self.logging:
96			self.log_data("TUPLE [%d]" % len)
97			self.log_begin_block()
98		val = []
99		for i in range(len):
100			val.append(self.read_element())
101		if self.logging:
102			self.log_end_block()
103		return tuple(val)
104
105	def read_small_tuple(self):
106		len, = struct.unpack(">B", self.data.read(1))
107		if self.logging:
108			self.log_str("small_tuple: %d" % (len))
109		return self.read_tuple(len)
110
111	def read_large_tuple(self):
112		len, = struct.unpack(">L", self.data.read(4))
113		if self.logging:
114			self.log_str("large_tuple: %d" % (len))
115		return self.read_tuple(len)
116
117	def read_listx(self):
118		len, = struct.unpack(">L", self.data.read(4))
119		if self.logging:
120			self.log_str("list: %d" % len)
121			self.log_data("LIST [%d]" % len)
122			self.log_begin_block()
123		val = []
124		elem = 1
125		while elem != None:
126			elem = self.read_element()
127			val.append(elem)
128		if self.logging:
129			self.log_end_block()
130		return val
131
132	def read_list(self):
133		len, = struct.unpack(">L", self.data.read(4))
134		if self.logging:
135			self.log_str("list: %d" % len)
136			self.log_data("LIST [%d]" % len)
137			self.log_begin_block()
138		val = []
139		for i in range(len):
140			#if self.depth == 5: self.log_str(str(i))
141			elem = self.read_element()
142			val.append(elem)
143		elem = self.read_element()
144		if elem != None: raise "hey!"
145		if self.logging:
146			self.log_end_block()
147		return val
148
149	def read_string(self):
150		namelen, = struct.unpack(">H", self.data.read(2))
151		name = self.data.read(namelen)
152		if self.logging:
153			self.log_str("string: %d %s" % (namelen, name))
154			self.log_data('STRING %s' % repr(name))
155		return name
156
157	def read_binary(self):
158		len, = struct.unpack(">L", self.data.read(4))
159		data = self.data.read(len)
160		if self.logging:
161			def hexchar(x):
162				return hex(ord(x))[2:]
163			repr = "".join(map(hexchar, data))
164			self.log_str("binary: %d %s" % (len, repr))
165			self.log_data('BINARY [%d] 0x%s' % (len, repr))
166		return data
167
168	def read_nil(self):
169		if self.logging:
170			self.log_data('NIL')
171		return None
172
173	def read_element(self):
174		id, = struct.unpack(">B", self.data.read(1))
175
176		return self.read_element_using_id(id)
177
178	def read_element_using_id(self, id):
179		#if self.depth == 5: self.log_str("read element %d" % id)
180
181		if id == 97:
182			return self.read_small_int()
183
184		elif id == 98:
185			return self.read_int()
186
187		elif id == 99:
188			return self.read_float()
189
190		elif id == 100:
191			return self.read_atom()
192
193		elif id == 104:
194			return self.read_small_tuple()
195
196		elif id == 105:
197			return self.read_large_tuple()
198
199		elif id == 106:
200			return self.read_nil()
201
202		elif id == 107:
203			return self.read_string()
204
205		elif id == 108:
206			return self.read_list()
207
208		elif id == 109:
209			return self.read_binary()
210
211		else:
212			raise "problem " + str(id)
213
214	def read(self):
215		return self.read_element()
216
217	def readtest(self):
218		self.read_element()
219
220		#run = 1
221		#while run:
222			#run = self.read_element()
223
224def test():
225	e = erlang_ext_reader("tank1w.wings")
226	try:
227		data = e.read_element()
228	finally:
229		f = open("log.txt", "w")
230		f.write(e.datalog)
231		f.write(e.logstr)
232		f.write(repr(data))
233
234#test()
235