1#! /usr/bin/env python
2# encoding: utf-8
3# Thomas Nagy, 2011-2015 (ita)
4
5"""
6A client for the network cache (playground/netcache/). Launch the server with:
7./netcache_server, then use it for the builds by adding the following:
8
9	def build(bld):
10		bld.load('netcache_client')
11
12The parameters should be present in the environment in the form:
13	NETCACHE=host:port waf configure build
14
15Or in a more detailed way:
16	NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build
17
18where:
19	host: host where the server resides, by default localhost
20	port: by default push on 11001 and pull on 12001
21
22Use the server provided in playground/netcache/Netcache.java
23"""
24
25import os, socket, time, atexit, sys
26from waflib import Task, Logs, Utils, Build, Runner
27from waflib.Configure import conf
28
# Largest chunk requested from recv()/read() while streaming data
BUF = 16 * 8192
# Every protocol header is padded to (and read as) exactly this many characters
HEADER_SIZE = 128
MODES = ['PUSH', 'PULL', 'PUSH_PULL']
# Maximum age in seconds of the server listing kept by check_cache();
# a false value disables that check entirely
STALE_TIME = 30

# Command keywords placed at the beginning of a header
GET, PUT, LST, BYE = 'GET', 'PUT', 'LST', 'BYE'

# (time of the last listing, signatures returned by that listing)
all_sigs_in_cache = (0.0, [])
40
def put_data(conn, data):
	"""
	Write the text *data* to the socket *conn*, looping until all of it is sent.

	On Python 3 the text is first converted to bytes with the latin-1 codec.
	Raises RuntimeError when send() reports that nothing could be written.
	"""
	payload = data
	if sys.hexversion > 0x3000000:
		payload = payload.encode('latin-1')

	total = len(payload)
	offset = 0
	while offset < total:
		# send() may accept only a part of the buffer, resume after that part
		written = conn.send(payload[offset:])
		if written == 0:
			raise RuntimeError('connection ended')
		offset += written
50
# Pools of idle sockets waiting to be reused, one pool per server endpoint
push_connections = Runner.Queue(0)
pull_connections = Runner.Queue(0)
def get_connection(push=False):
	"""
	Return a socket connected to the cache server... do not forget to release it!

	An idle socket is taken from the matching pool when one is available,
	otherwise a new TCP connection is opened to ``Task.push_addr`` (push)
	or ``Task.pull_addr`` (pull).

	:param push: use the push pool and address when true, the pull ones otherwise
	:return: a socket object
	:raises: whatever error connect() reports when a new connection cannot be opened
	"""
	pool = push_connections if push else pull_connections
	try:
		ret = pool.get(block=False)
	except Exception:
		# nothing to reuse in the pool: open a new connection
		addr = Task.push_addr if push else Task.pull_addr
		ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		try:
			ret.connect(addr)
		except Exception:
			# do not leak the descriptor when the server cannot be reached
			ret.close()
			raise
	return ret
67
def release_connection(conn, msg='', push=False):
	"""
	Put *conn* back into a pool of idle sockets so that it can be reused.

	Nothing happens when *conn* is empty (for example None). The push pool is
	used when *push* is true, the pull pool otherwise; *msg* is accepted but not used.
	"""
	if not conn:
		return
	pool = push_connections if push else pull_connections
	pool.put(conn)
74
def close_connection(conn, msg=''):
	"""
	Send a BYE header to the server and close the socket, on a best-effort basis.

	Errors raised while sending or closing are only logged at debug level, so that
	this function can be used on connections that are already broken. Interrupts
	such as KeyboardInterrupt or SystemExit are not intercepted.

	:param conn: socket to close; nothing happens when it is empty (None)
	:param msg: optional text added after the BYE keyword in the header
	"""
	if not conn:
		return

	data = '%s,%s' % (BYE, msg)
	try:
		put_data(conn, data.ljust(HEADER_SIZE))
	except Exception as e:
		# the peer may be gone already, the BYE message is only a courtesy
		Logs.debug('netcache: could not send the closing message %r', e)

	try:
		conn.close()
	except Exception as e:
		Logs.debug('netcache: could not close the connection %r', e)
86
def close_all():
	"""
	Close all the idle connections left in the push and pull pools.

	Failures are logged at debug level and otherwise ignored so that every
	connection gets a chance to be closed; interrupts such as KeyboardInterrupt
	or SystemExit are not intercepted.
	"""
	for q in (push_connections, pull_connections):
		while q.qsize():
			conn = q.get()
			try:
				close_connection(conn)
			except Exception as e:
				# ignore errors when cleaning up
				Logs.debug('netcache: error when closing a connection %r', e)
# close the pooled connections when the interpreter exits
atexit.register(close_all)
97
def read_header(conn):
	"""
	Read one header of exactly HEADER_SIZE bytes from the socket *conn*
	and return it as text (decoded with latin-1 on Python 3).

	Raises ValueError when the connection ends before the header is complete.
	"""
	chunks = []
	missing = HEADER_SIZE
	while missing > 0:
		# recv() may return less than requested, keep asking for the rest
		piece = conn.recv(missing)
		if not piece:
			raise ValueError('connection ended when reading a header %r' % chunks)
		chunks.append(piece)
		missing -= len(piece)

	if sys.hexversion > 0x3000000:
		return b''.join(chunks).decode('latin-1')
	return ''.join(chunks)
115
def check_cache(conn, ssig):
	"""
	List the files on the server, this is an optimization because it assumes that
	concurrent builds are rare

	The listing is requested again when the copy kept in ``all_sigs_in_cache`` is
	older than STALE_TIME seconds; nothing is checked when STALE_TIME is false.
	The signatures are kept in a frozenset so that each lookup does not have to
	scan the whole listing.

	:param conn: socket connected to the pull server
	:param ssig: signature to look for, as a hexadecimal string
	:raises ValueError: when the signature is not in the listing, or when the
		connection ends while the listing is being read
	"""
	global all_sigs_in_cache
	if not STALE_TIME:
		return
	if time.time() - all_sigs_in_cache[0] > STALE_TIME:

		params = (LST, '')
		put_data(conn, ','.join(params).ljust(HEADER_SIZE))

		# read what is coming back: a header giving the size of the listing
		ret = read_header(conn)
		size = int(ret.split(',')[0])

		# then the listing itself
		buf = []
		cnt = 0
		while cnt < size:
			data = conn.recv(min(BUF, size - cnt))
			if not data:
				raise ValueError('connection ended %r %r' % (cnt, size))
			buf.append(data)
			cnt += len(data)

		if sys.hexversion > 0x3000000:
			ret = b''.join(buf).decode('latin-1')
		else:
			ret = ''.join(buf)

		# one signature per line
		all_sigs_in_cache = (time.time(), frozenset(ret.splitlines()))
		Logs.debug('netcache: server cache has %r entries', len(all_sigs_in_cache[1]))

	if ssig not in all_sigs_in_cache[1]:
		raise ValueError('no file %s in cache' % ssig)
153
class MissingFile(Exception):
	"""Raised by recv_file() when the server answers with a size of -1 for the requested file."""
156
def recv_file(conn, ssig, count, p):
	"""
	Download one cached file from the server and write it to the path *p*.

	:param conn: socket connected to the pull server
	:param ssig: signature identifying the task, as a hexadecimal string
	:param count: index of the file among the outputs of the task
	:param p: path of the file to write
	:raises MissingFile: when the server answers that it does not have the file
	:raises ValueError: when the signature is not in the server listing (see
		check_cache) or when the connection ends before the file is complete
	"""
	check_cache(conn, ssig)

	params = (GET, ssig, str(count))
	put_data(conn, ','.join(params).ljust(HEADER_SIZE))
	data = read_header(conn)

	# the header starts with the size of the file, -1 when there is no such file
	size = int(data.split(',')[0])

	if size == -1:
		raise MissingFile('no file %s - %s in cache' % (ssig, count))

	# get the file, writing immediately
	# TODO a tmp file would be better
	# the with statement closes the file even when the transfer is interrupted
	with open(p, 'wb') as f:
		cnt = 0
		while cnt < size:
			data = conn.recv(min(BUF, size - cnt))
			if not data:
				raise ValueError('connection ended %r %r' % (cnt, size))
			f.write(data)
			cnt += len(data)
180
def sock_send(conn, ssig, cnt, p):
	"""
	Upload the file *p* to the server: a PUT header followed by the file contents.

	:param conn: socket connected to the push server
	:param ssig: signature identifying the task, as a hexadecimal string
	:param cnt: index of the file among the outputs of the task
	:param p: path of the file to send
	:raises ValueError: when the connection ends, or when the file turns out to be
		shorter than the size announced in the header
	"""
	size = os.stat(p).st_size
	params = (PUT, ssig, str(cnt), str(size))
	put_data(conn, ','.join(params).ljust(HEADER_SIZE))

	sent = 0
	# the with statement closes the file even when the transfer fails
	with open(p, 'rb') as f:
		while sent < size:
			r = f.read(min(BUF, size - sent))
			if not r:
				# the file shrank after the call to stat: stop instead of looping forever
				raise ValueError('file %r is shorter than expected %r %r' % (p, sent, size))
			while r:
				# send() may accept only a part of the chunk
				k = conn.send(r)
				if not k:
					raise ValueError('connection ended')
				sent += k
				r = r[k:]
196
def can_retrieve_cache(self):
	"""
	Try to download all the outputs of this task from the cache server.

	Returns True (and sets ``self.cached`` to True) when every output file was
	received, and False when no pull address is set, when the task has no
	outputs, or when any of the files could not be obtained.
	"""
	if not Task.pull_addr or not self.outputs:
		return False
	self.cached = False

	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	conn = None
	failed = False
	try:
		conn = get_connection()
		for idx, node in enumerate(self.outputs):
			recv_file(conn, ssig, idx, node.abspath())
	except MissingFile as e:
		# the connection remains usable in this case, it is released below
		Logs.debug('netcache: file is not in the cache %r', e)
		failed = True
	except Exception as e:
		Logs.debug('netcache: could not get the files %r', self.outputs)
		if Logs.verbose > 1:
			Logs.debug('netcache: exception %r', e)
		failed = True

		# broken connection? remove this one
		close_connection(conn)
		conn = None
	else:
		Logs.debug('netcache: obtained %r from cache', self.outputs)
	finally:
		release_connection(conn)

	if failed:
		return False

	self.cached = True
	return True
239
@Utils.run_once
def put_files_cache(self):
	"""
	Upload the outputs of this task to the cache server.

	Nothing is done when no push address is set, when the task has no outputs,
	or when the outputs were themselves obtained from the cache. A file that
	cannot be sent is logged and skipped, the remaining ones are still attempted.
	"""
	if not Task.push_addr or not self.outputs or getattr(self, 'cached', None):
		return

	bld = self.generator.bld
	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	conn = None
	try:
		for idx, node in enumerate(self.outputs):
			# We could re-create the signature of the task with the signature of the outputs
			# in practice, this means hashing the output files
			# this is unnecessary
			try:
				if not conn:
					# first file, or the previous connection was dropped after an error
					conn = get_connection(push=True)
				sock_send(conn, ssig, idx, node.abspath())
				Logs.debug('netcache: sent %r', node)
			except Exception as e:
				Logs.debug('netcache: could not push the files %r', e)

				# broken connection? remove this one
				close_connection(conn)
				conn = None
	finally:
		release_connection(conn, push=True)

	bld.task_sigs[self.uid()] = self.cache_sig
277
def hash_env_vars(self, env, vars_lst):
	"""
	Hash the values of the variables *vars_lst* taken from *env*.

	Reimplemented so that the resulting hash does not depend on local paths:
	the absolute path of the source directory is removed from the text before
	it is hashed. The results are remembered in ``self.cache_env``.
	"""
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	key = str(id(env)) + str(vars_lst)
	try:
		cache = self.cache_env
	except AttributeError:
		cache = self.cache_env = {}
	else:
		if key in cache:
			return cache[key]

	text = str([env[name] for name in vars_lst])
	# the repr without its closing quote, so that longer paths starting
	# with the source directory are matched too
	prefix = repr(self.srcnode.abspath())[:-1]
	text = text.replace(prefix, '')

	hasher = Utils.md5()
	hasher.update(text.encode())
	digest = hasher.digest()

	Logs.debug('envhash: %r %r', digest, text)

	cache[key] = digest
	return digest
307
def uid(self):
	"""
	Return the identifier of this task, computed once and kept in ``self.uid_``.

	Reimplemented so that the value does not depend on local paths: it hashes
	the class name and the paths of the inputs and outputs relative to the
	source directory.
	"""
	try:
		return self.uid_
	except AttributeError:
		pass

	src = self.generator.bld.srcnode
	hasher = Utils.md5()
	hasher.update(self.__class__.__name__.encode())
	for node in self.inputs + self.outputs:
		hasher.update(node.path_from(src).encode())
	self.uid_ = hasher.digest()
	return self.uid_
321
322
def make_cached(cls):
	"""
	Wrap the methods ``run`` and ``post_run`` of the task class *cls* so that the
	outputs are looked up in the cache before running, and uploaded afterwards.

	Classes having a true ``nocache`` attribute are left untouched, and task
	instances having one go straight to the original methods.
	"""
	if getattr(cls, 'nocache', None):
		return

	original_run = cls.run
	def run(self):
		# a cache hit replaces the execution and is reported as a success (0)
		if not getattr(self, 'nocache', False) and self.can_retrieve_cache():
			return 0
		return original_run(self)
	cls.run = run

	original_post_run = cls.post_run
	def post_run(self):
		if getattr(self, 'nocache', False):
			return original_post_run(self)

		bld = self.generator.bld
		ret = original_post_run(self)
		if bld.cache_global:
			self.put_files_cache()
		if hasattr(self, 'chmod'):
			# apply the requested permissions to every output file
			for node in self.outputs:
				os.chmod(node.abspath(), self.chmod)
		return ret
	cls.post_run = post_run
349
@conf
def setup_netcache(ctx, push_addr, pull_addr):
	"""
	Enable the network cache: store the server addresses on the Task module,
	install the replacement methods on Task.Task and Build.BuildContext, set
	``ctx.cache_global`` and wrap all the task classes known at this point.

	:param push_addr: (host, port) tuple used for uploads, or None
	:param pull_addr: (host, port) tuple used for downloads, or None
	"""
	# addresses read by the connection helpers and by the cache methods
	Task.push_addr = push_addr
	Task.pull_addr = pull_addr

	# replacement methods
	replacements = (
		('can_retrieve_cache', can_retrieve_cache),
		('put_files_cache', put_files_cache),
		('uid', uid),
	)
	for name, func in replacements:
		setattr(Task.Task, name, func)
	Build.BuildContext.hash_env_vars = hash_env_vars
	ctx.cache_global = True

	for cls in Task.classes.values():
		make_cached(cls)
362
def _parse_netcache_addr(value):
	# Convert a 'host:port' string into a (host, port) tuple, or None when the string is empty
	if not value:
		return None
	h, p = value.split(':')
	return (h, int(p))

def build(bld):
	"""
	Read the server addresses from the environment and enable the network cache.

	NETCACHE provides the value for both NETCACHE_PUSH and NETCACHE_PULL unless
	they are set explicitly. When none of the three variables is present, the
	local defaults are used: push on 127.0.0.1:11001 and pull on 127.0.0.1:12001.
	A direction whose variable is missing or empty gets no address (None).

	:param bld: build context, passed to setup_netcache
	:raises ValueError: when a value is not of the form host:port
	"""
	env = os.environ
	if 'NETCACHE' not in env and 'NETCACHE_PULL' not in env and 'NETCACHE_PUSH' not in env:
		# the message reports the values that are actually set below
		Logs.warn('Setting  NETCACHE_PULL=127.0.0.1:12001 and NETCACHE_PUSH=127.0.0.1:11001')
		env['NETCACHE_PULL'] = '127.0.0.1:12001'
		env['NETCACHE_PUSH'] = '127.0.0.1:11001'

	if 'NETCACHE' in env:
		if 'NETCACHE_PUSH' not in env:
			env['NETCACHE_PUSH'] = env['NETCACHE']
		if 'NETCACHE_PULL' not in env:
			env['NETCACHE_PULL'] = env['NETCACHE']

	# only one of the two variables may be present (for example NETCACHE_PUSH alone),
	# a missing one is handled like an empty one instead of raising KeyError
	pull_addr = _parse_netcache_addr(env.get('NETCACHE_PULL', ''))
	push_addr = _parse_netcache_addr(env.get('NETCACHE_PUSH', ''))

	setup_netcache(bld, push_addr, pull_addr)
390
391