#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2019 (ita)

"""
Filesystem-based cache system to share and re-use build artifacts

Cache access operations (copy to and from) are delegated to
independent pre-forked worker subprocesses.

The following environment variables may be set:
* WAFCACHE: several possibilities:
  - File cache:
    absolute path of the waf cache (~/.cache/wafcache_user,
    where `user` represents the currently logged-in user)
  - URL to a cache server, for example:
    export WAFCACHE=http://localhost:8080/files/
    in that case, GET/POST requests are made to URLs of the form
    http://localhost:8080/files/000000000/0 (cache management is delegated to the server)
  - GCS, S3 or MINIO bucket
    gs://my-bucket/    (uses gsutil command line tool or WAFCACHE_CMD)
    s3://my-bucket/    (uses aws command line tool or WAFCACHE_CMD)
    minio://my-bucket/ (uses mc command line tool or WAFCACHE_CMD)
* WAFCACHE_CMD: bucket upload/download command, for example:
    WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}"
  Note that the WAFCACHE bucket value is used as the source or destination
  depending on the operation (upload or download). For example, with:
    WAFCACHE="gs://mybucket/"
  the following commands may be run:
    gsutil cp build/myprogram  gs://mybucket/aa/aaaaa/1
    gsutil cp gs://mybucket/bb/bbbbb/2 build/somefile
* WAFCACHE_NO_PUSH: if set, disables pushing to the cache
* WAFCACHE_VERBOSITY: if set, displays more detailed cache operations

File cache specific options:
  Files are copied using hard links by default; if the cache is located
  on another partition, the system switches to file copies instead.
* WAFCACHE_TRIM_MAX_FOLDER: maximum number of task folders to keep in the cache (default: 1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum cache size in bytes (default: 10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval between
                                   cache trim attempts (default: 3 minutes)
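
A minimal example configuration combining these variables (values are illustrative)::

	export WAFCACHE=/mnt/shared/wafcache
	export WAFCACHE_EVICT_MAX_BYTES=20000000000
	export WAFCACHE_EVICT_INTERVAL_MINUTES=10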

Usage::

	def build(bld):
		bld.load('wafcache')
		...

To troubleshoot::

	waf clean build --zones=wafcache
"""

import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, traceback, urllib3, shlex
try:
	import subprocess32 as subprocess
except ImportError:
	import subprocess

base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
	base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())

CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
WAFCACHE_CMD = os.environ.get('WAFCACHE_CMD')
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
OK = "ok"

re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')
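# e.g. WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}" expands to
# ['gsutil', 'cp', <source>, <target>] in bucket_cache.bucket_copy below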

try:
	import cPickle
except ImportError:
	import pickle as cPickle

if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build

def can_retrieve_cache(self):
	"""
	New method for waf Task classes
	"""
	if not self.outputs:
		return False

	self.cached = False

	sig = self.signature()
	# the cache key combines the task uid and its signature, hex-encoded
	ssig = Utils.to_hex(self.uid() + sig)

	files_to = [node.abspath() for node in self.outputs]
	err = cache_command(ssig, [], files_to)
	if err.startswith(OK):
		if WAFCACHE_VERBOSITY:
			Logs.pprint('CYAN', '  Fetched %r from cache' % files_to)
		else:
			Logs.debug('wafcache: fetched %r from cache', files_to)
	else:
		if WAFCACHE_VERBOSITY:
			Logs.pprint('YELLOW', '  No cache entry %s' % files_to)
		else:
			Logs.debug('wafcache: No cache entry %s: %s', files_to, err)
		return False

	self.cached = True
	return True

def put_files_cache(self):
	"""
	New method for waf Task classes
	"""
	if WAFCACHE_NO_PUSH or getattr(self, 'cached', None) or not self.outputs:
		return

	bld = self.generator.bld
	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	files_from = [node.abspath() for node in self.outputs]
	err = cache_command(ssig, files_from, [])

	if err.startswith(OK):
		if WAFCACHE_VERBOSITY:
			Logs.pprint('CYAN', '  Successfully uploaded %s to cache' % files_from)
		else:
			Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
	else:
		if WAFCACHE_VERBOSITY:
			Logs.pprint('RED', '  Error caching step results %s: %s' % (files_from, err))
		else:
			Logs.debug('wafcache: Error caching results %s: %s', files_from, err)

	bld.task_sigs[self.uid()] = self.cache_sig

def hash_env_vars(self, env, vars_lst):
	"""
	Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths
	"""
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	idx = str(id(env)) + str(vars_lst)
	try:
		cache = self.cache_env
	except AttributeError:
		cache = self.cache_env = {}
	else:
		try:
			return self.cache_env[idx]
		except KeyError:
			pass

	v = str([env[a] for a in vars_lst])
	# strip the absolute project path so that the hash is machine-independent
	v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
	m = Utils.md5()
	m.update(v.encode())
	ret = m.digest()

	Logs.debug('envhash: %r %r', ret, v)

	cache[idx] = ret

	return ret

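# Example: a task compiling src/main.c (an illustrative path) hashes its class
# name and the project-relative paths of its inputs/outputs, never absolute
# paths, so two checkouts in different directories compute the same uid and
# can share cache entries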
def uid(self):
	"""
	Reimplement Task.uid() so that the signature does not depend on local paths
	"""
	try:
		return self.uid_
	except AttributeError:
		m = Utils.md5()
		src = self.generator.bld.srcnode
		up = m.update
		up(self.__class__.__name__.encode())
		for x in self.inputs + self.outputs:
			up(x.path_from(src).encode())
		self.uid_ = m.digest()
		return self.uid_


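# make_cached wraps the task class methods:
#   run():      on a cache hit, return 0 immediately and skip the actual command
#   post_run(): run the original method, then push the task outputs to the cache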
def make_cached(cls):
	"""
	Enable the waf cache for a given task class
	"""
	if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
		return

	m1 = getattr(cls, 'run', None)
	def run(self):
		if getattr(self, 'nocache', False):
			return m1(self)
		if self.can_retrieve_cache():
			return 0
		return m1(self)
	cls.run = run

	m2 = getattr(cls, 'post_run', None)
	def post_run(self):
		if getattr(self, 'nocache', False):
			return m2(self)
		ret = m2(self)
		self.put_files_cache()
		if hasattr(self, 'chmod'):
			for node in self.outputs:
				os.chmod(node.abspath(), self.chmod)
		return ret
	cls.post_run = post_run
	cls.has_cache = True

process_pool = []
def get_process():
	"""
	Returns a worker process that can process waf cache commands
	The worker process is assumed to be returned to the process pool when unused
	"""
	try:
		return process_pool.pop()
	except IndexError:
		filepath = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py'
		cmd = [sys.executable, '-c', Utils.readf(filepath)]
		# the worker executes this file's own source through -c;
		# the __main__ block at the bottom then enters loop()
		return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)

def atexit_pool():
	for k in process_pool:
		try:
			os.kill(k.pid, 9)
		except OSError:
			pass
		else:
			k.wait()
atexit.register(atexit_pool)

def build(bld):
	"""
	Called during the build process to enable file caching
	"""
	if process_pool:
		# already called once
		return

	# pre-allocate one worker process per build job
	processes = [get_process() for x in range(bld.jobs)]
	process_pool.extend(processes)

	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Build.BuildContext.hash_env_vars = hash_env_vars
	for x in reversed(list(Task.classes.values())):
		make_cached(x)

def cache_command(sig, files_from, files_to):
	"""
	Send a command to a cache worker process as a base64-encoded pickled
	tuple containing the task signature, a list of files to copy to the
	cache and a list of files to fetch from the cache (one of the two
	lists is assumed to be empty)
	"""
	proc = get_process()

	obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
	proc.stdin.write(obj)
	proc.stdin.write('\n'.encode())
	proc.stdin.flush()
	obj = proc.stdout.readline()
	if not obj:
		raise OSError('Preforked sub-process %r died' % proc.pid)
	process_pool.append(proc)
	return cPickle.loads(base64.b64decode(obj))
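
# Wire protocol between the build process and a worker (one line per message):
#   request:  base64(pickle([sig, files_from, files_to])) + b'\n'
#   response: base64(pickle(result_string)) + b'\n', where result_string is OK
#             or a traceback describing the failure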

try:
	copyfun = os.link
except AttributeError:
	copyfun = shutil.copy2
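
# hard links are cheap and avoid duplicating file contents; atomic_copy falls
# back to a real copy at runtime if the cache is on another filesystem (EXDEV)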

def atomic_copy(orig, dest):
	"""
	Copy a single file atomically: write to a temporary file, then rename
	"""
	global copyfun
	tmp = dest + '.tmp'
	up = os.path.dirname(dest)
	try:
		os.makedirs(up)
	except OSError:
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno == errno.EXDEV:
			copyfun = shutil.copy2
			copyfun(orig, tmp)
		else:
			raise
	os.rename(tmp, dest)
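
# os.rename is atomic on POSIX filesystems, so concurrent readers either see
# the complete file or no file at all, never a partially-written entry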

def lru_trim():
	"""
	The cache folders take the form:
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`
	They are sorted by modification time, and the oldest ones are removed
	until the number of folders is within TRIM_MAX_FOLDERS and the total space
	taken by files is less than EVICT_MAX_BYTES
	"""
	lst = []
	for up in os.listdir(CACHE_DIR):
		if len(up) == 2:
			sub = os.path.join(CACHE_DIR, up)
			for hval in os.listdir(sub):
				path = os.path.join(sub, hval)

				size = 0
				for fname in os.listdir(path):
					size += os.lstat(os.path.join(path, fname)).st_size
				lst.append((os.stat(path).st_mtime, size, path))

	# newest folders first; evict from the end of the list (oldest entries)
	lst.sort(key=lambda x: x[0])
	lst.reverse()

	tot = sum(x[1] for x in lst)
	while tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS:
		_, tmp_size, path = lst.pop()
		tot -= tmp_size

		tmp = path + '.tmp'
		try:
			shutil.rmtree(tmp)
		except OSError:
			pass
		try:
			os.rename(path, tmp)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, tmp))
		else:
			try:
				shutil.rmtree(tmp)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % tmp)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))


def lru_evict():
	"""
	Reduce the cache size
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes whether the cache is too big
		# O_CLOEXEC is unnecessary because no processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			try:
				fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
			except EnvironmentError:
				sys.stderr.write('another process is trimming the cache\n')
			else:
				# now do the actual cleanup
				lru_trim()
				os.utime(lockfile, None)
		finally:
			os.close(fd)

class netcache(object):
	def __init__(self):
		self.http = urllib3.PoolManager()

	def url_of(self, sig, i):
		return "%s/%s/%s" % (CACHE_DIR, sig, i)
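	# e.g. WAFCACHE=http://localhost:8080/files yields
	# http://localhost:8080/files/<sig>/<i> for the i-th output file of a task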

	def upload(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with open(file_path, 'rb') as f:
			file_data = f.read()
		r = self.http.request('POST', url, timeout=60,
			fields={ 'file': ('%s/%s' % (sig, i), file_data), })
		if r.status >= 400:
			raise OSError("Invalid status %r %r" % (url, r.status))

	def download(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with self.http.request('GET', url, preload_content=False, timeout=60) as inf:
			if inf.status >= 400:
				raise OSError("Invalid status %r %r" % (url, inf.status))
			with open(file_path, 'wb') as out:
				shutil.copyfileobj(inf, out)

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				if not os.path.islink(x):
					self.upload(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				self.download(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

class fcache(object):
	def __init__(self):
		if not os.path.exists(CACHE_DIR):
			os.makedirs(CACHE_DIR)
		if not os.path.exists(CACHE_DIR):
			raise ValueError('Could not initialize the cache directory')
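
	# on-disk layout: CACHE_DIR/<sig[:2]>/<sig>/<i>
	# where <i> is the index of the i-th output file of a task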

	def copy_to_cache(self, sig, files_from, files_to):
		"""
		Copy files into the cache; existing entries are overwritten. The
		copy is atomic per file, not across all the files that belong
		to a given task object
		"""
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		else:
			# attempt trimming if caching was successful:
			# we may have things to trim!
			lru_evict()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""
		Copy files from the cache
		"""
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(orig, x)

			# success! update the folder timestamp to mark it as recently used
			os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
		except Exception:
			return traceback.format_exc()
		return OK

class bucket_cache(object):
	def bucket_copy(self, source, target):
		if WAFCACHE_CMD:
			def replacer(match):
				if match.group('src'):
					return source
				elif match.group('tgt'):
					return target
			cmd = [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)]
		elif CACHE_DIR.startswith('s3://'):
			cmd = ['aws', 's3', 'cp', source, target]
		elif CACHE_DIR.startswith('gs://'):
			cmd = ['gsutil', 'cp', source, target]
		else:
			cmd = ['mc', 'cp', source, target]

		proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = proc.communicate()
		if proc.returncode:
			raise OSError('Error copying %r to %r using: %r (exit %r):\n  out:%s\n  err:%s' % (
				source, target, cmd, proc.returncode, out.decode(), err.decode()))

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(orig, x)
		except Exception:
			return traceback.format_exc()
		return OK

def loop(service):
	"""
	This function runs when this file is executed as a standalone python
	script; it assumes a parent process that communicates commands to it
	as base64-encoded pickled tuples (one line per command)

	The commands are to copy files to the cache or to copy files from the
	cache to a target destination
	"""
	# a worker handles one operation at a time, so stdin never
	# holds more than one line
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)
	ret = OK

	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# TODO return early when pushing files upstream
		ret = service.copy_to_cache(sig, files_from, files_to)
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = service.copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	obj = base64.b64encode(cPickle.dumps(ret))
	sys.stdout.write(obj.decode())
	sys.stdout.write('\n')
	sys.stdout.flush()

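# standalone worker entry point: select a backend from the WAFCACHE value and
# serve one request per line until the parent closes stdin or interrupts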
if __name__ == '__main__':
	if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://') or CACHE_DIR.startswith('minio://'):
		if CACHE_DIR.startswith('minio://'):
			CACHE_DIR = CACHE_DIR[8:]   # minio doesn't need the protocol part, uses config aliases
		service = bucket_cache()
	elif CACHE_DIR.startswith('http'):
		service = netcache()
	else:
		service = fcache()
	while 1:
		try:
			loop(service)
		except KeyboardInterrupt:
			break