#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2019 (ita)

"""
Filesystem-based cache system to share and re-use build artifacts

The following environment variables may be set:
* WAFCACHE: absolute path of the waf cache (default: /tmp/wafcache_user,
            where `user` is the name of the currently logged-in user)
* WAFCACHE_TRIM_MAX_FOLDER: maximum number of task folders to keep in the cache (1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum cache size in bytes (10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum interval between attempts
                                   to trim the cache (3 minutes)

Cache access operations (copy to and from) are delegated to pre-forked
subprocesses. Though these processes perform atomic copies, they
are unaware of other processes running on the system.

The files are copied using hard links by default; if the cache is located
on another partition, the system switches to file copies instead.

Usage::

	def build(bld):
		bld.load('wafcache')
		...
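
The cache location and eviction policy can also be set from the environment
before invoking waf; the values below are examples only::

	$ export WAFCACHE=/path/to/shared/cache
	$ export WAFCACHE_EVICT_MAX_BYTES=5000000000
	$ waf configure build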
"""

import atexit, base64, errno, fcntl, getpass, os, shutil, sys, threading, time
try:
	import subprocess32 as subprocess
except ImportError:
	import subprocess

CACHE_DIR = os.environ.get('WAFCACHE', '/tmp/wafcache_' + getpass.getuser())
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
OK = "ok"

try:
	import cPickle
except ImportError:
	import pickle as cPickle

if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build

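# Note: the functions below are installed as methods on waf's Task and
# BuildContext classes by build() further down in this file.
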
def can_retrieve_cache(self):
	"""
	New method for waf Task classes: try to retrieve the task outputs
	from the cache
	"""
	if not self.outputs:
		return False

	self.cached = False

	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	files_to = [node.abspath() for node in self.outputs]
	err = cache_command(ssig, [], files_to)
	if not err.startswith(OK):
		if Logs.verbose:
			Logs.debug('wafcache: error getting from cache %s', err)
		return False

	self.cached = True
	return True

def put_files_cache(self):
	"""
	New method for waf Task classes: copy the task outputs to the cache
	"""
	if not self.outputs:
		return

	if getattr(self, 'cached', None):
		return

	bld = self.generator.bld
	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	files_from = [node.abspath() for node in self.outputs]
	err = cache_command(ssig, files_from, [])

	if not err.startswith(OK):
		if Logs.verbose:
			Logs.debug('wafcache: error caching %s', err)

	bld.task_sigs[self.uid()] = self.cache_sig

def hash_env_vars(self, env, vars_lst):
	"""
	Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths
	"""
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	idx = str(id(env)) + str(vars_lst)
	try:
		cache = self.cache_env
	except AttributeError:
		cache = self.cache_env = {}
	else:
		try:
			return self.cache_env[idx]
		except KeyError:
			pass

	v = str([env[a] for a in vars_lst])
	# strip the source directory from the values so that the hash is
	# location-independent; repr()[:-1] drops the closing quote so that
	# paths below the source directory are matched as well
	v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
	m = Utils.md5()
	m.update(v.encode())
	ret = m.digest()

	Logs.debug('envhash: %r %r', ret, v)

	cache[idx] = ret

	return ret

def uid(self):
	"""
	Reimplement Task.uid() so that the signature does not depend on local paths
	"""
	try:
		return self.uid_
	except AttributeError:
		m = Utils.md5()
		src = self.generator.bld.srcnode
		up = m.update
		up(self.__class__.__name__.encode())
		for x in self.inputs + self.outputs:
			up(x.path_from(src).encode())
		self.uid_ = m.digest()
		return self.uid_


def make_cached(cls):
	"""
	Enable the waf cache for a given task class
	"""
	if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
		return

	m1 = getattr(cls, 'run', None)
	def run(self):
		if getattr(self, 'nocache', False):
			return m1(self)
		if self.can_retrieve_cache():
			return 0
		return m1(self)
	cls.run = run

	m2 = getattr(cls, 'post_run', None)
	def post_run(self):
		if getattr(self, 'nocache', False):
			return m2(self)
		ret = m2(self)
		self.put_files_cache()
		if hasattr(self, 'chmod'):
			for node in self.outputs:
				os.chmod(node.abspath(), self.chmod)
		return ret
	cls.post_run = post_run
	cls.has_cache = True

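# A minimal sketch of opting out of the cache, relying on the 'nocache'
# checks above; the task class name 'cxxprogram' is only an example:
#
#   from waflib import Task
#   Task.classes['cxxprogram'].nocache = True  # disable for a whole task class
#   # or, on a single task instance created during the build:
#   # tsk.nocache = True
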
process_pool = []
def get_process():
	"""
	Returns a worker process that can process waf cache commands
	The worker process is assumed to be returned to the process pool when unused
	"""
	try:
		return process_pool.pop()
	except IndexError:
		# spawn a new worker: it runs this very file as a script,
		# entering the loop() at the bottom
		filepath = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py'
		cmd = [sys.executable, '-c', Utils.readf(filepath)]
		return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)

def atexit_pool():
	for k in process_pool:
		try:
			os.kill(k.pid, 9)
		except OSError:
			pass
		else:
			k.wait()
atexit.register(atexit_pool)

def build(bld):
	"""
	Called during the build process to enable file caching
	"""
	if process_pool:
		# already called once
		return

	for x in range(bld.jobs):
		process_pool.append(get_process())

	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Build.BuildContext.hash_env_vars = hash_env_vars
	for x in Task.classes.values():
		make_cached(x)

def cache_command(sig, files_from, files_to):
	"""
	Send a command to a cache worker process as a base64-encoded pickled
	tuple containing the task signature, a list of files to cache and a
	list of files to get from the cache (one of the lists is assumed to
	be empty), then return the worker's decoded response
	"""
	proc = get_process()

	obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
	proc.stdin.write(obj)
	proc.stdin.write('\n'.encode())
	proc.stdin.flush()
	obj = proc.stdout.readline()
	if not obj:
		raise OSError('Preforked sub-process %r died' % proc.pid)
	process_pool.append(proc)
	return cPickle.loads(base64.b64decode(obj))

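# A minimal sketch of the line-based wire protocol used by cache_command()
# above and loop() below; the payloads are illustrative, not real output:
#
#   parent -> worker:  base64(pickle([sig, files_from, files_to])) + b'\n'
#   worker -> parent:  base64(pickle("ok..." or an error string)) + b'\n'
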
# prefer hard links when available; atomic_copy() falls back to real
# copies when the cache lives on another filesystem
try:
	copyfun = os.link
except AttributeError:
	copyfun = shutil.copy2

def atomic_copy(orig, dest):
	"""
	Copy files to the cache, the operation is atomic for a given file
	"""
	global copyfun
	tmp = dest + '.tmp'
	up = os.path.dirname(dest)
	try:
		os.makedirs(up)
	except OSError:
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno == errno.EXDEV:
			# the cache is on another filesystem: hard links cannot work,
			# so switch to real copies for all subsequent operations
			copyfun = shutil.copy2
			copyfun(orig, tmp)
		else:
			raise
	os.rename(tmp, dest)

def lru_trim():
	"""
	The cache folders take the form:
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`
	They are listed in order of last access, and then removed
	until the number of folders is within TRIM_MAX_FOLDERS and the total space
	taken by files is less than EVICT_MAX_BYTES
	"""
	lst = []
	for up in os.listdir(CACHE_DIR):
		if len(up) == 2:
			sub = os.path.join(CACHE_DIR, up)
			for hval in os.listdir(sub):
				path = os.path.join(sub, hval)

				size = 0
				for fname in os.listdir(path):
					size += os.lstat(os.path.join(path, fname)).st_size
				lst.append((os.stat(path).st_mtime, size, path))

	# most recently used first; the oldest entries are popped from the end
	lst.sort(key=lambda x: x[0])
	lst.reverse()

	tot = sum(x[1] for x in lst)
	while tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS:
		_, tmp_size, path = lst.pop()
		tot -= tmp_size

		# rename the folder before removing it so that a partially-deleted
		# entry is never mistaken for a valid cache entry
		tmp = path + '.tmp'
		try:
			shutil.rmtree(tmp)
		except OSError:
			pass
		try:
			os.rename(path, tmp)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, tmp))
		else:
			try:
				shutil.rmtree(tmp)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % tmp)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))


def lru_evict():
	"""
	Reduce the cache size
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes whether the cache is too big
		# O_CLOEXEC is unnecessary because no processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			try:
				fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
			except EnvironmentError:
				sys.stderr.write('another process is already trimming the cache\n')
			else:
				# now do the actual cleanup
				lru_trim()
				os.utime(lockfile, None)
		finally:
			os.close(fd)

def copy_to_cache(sig, files_from, files_to):
	"""
	Copy files to the cache, existing files are overwritten,
	and the copy is atomic only for a given file, not for all files
	that belong to a given task object
	"""
	try:
		for i, x in enumerate(files_from):
			dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
			atomic_copy(x, dest)
	except EnvironmentError:
		# no errors should be raised
		pass
	else:
		# attempt trimming if caching was successful:
		# we may have things to trim!
		lru_evict()

def copy_from_cache(sig, files_from, files_to):
	"""
	Copy files from the cache
	"""
	try:
		for i, x in enumerate(files_to):
			orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
			atomic_copy(orig, x)

		# success! update the cache time
		os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
	except EnvironmentError as e:
		return "Failed to copy %r to %r: %s" % (orig, x, e)
	return OK

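# The on-disk layout written by copy_to_cache() and read by copy_from_cache(),
# where sig is the hex task signature; the hash below is only an example:
#
#   CACHE_DIR/0b/0b180f...bfc0ab9/0    first output file of the task
#   CACHE_DIR/0b/0b180f...bfc0ab9/1    second output file, and so on
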
def loop():
	"""
	This function is run when this file is run as a standalone python script,
	it assumes a parent process that will communicate the commands to it
	as base64-encoded pickled tuples (one line per command)

	The commands are to copy files to the cache or to copy files from the
	cache to a target destination
	"""
	# one operation is performed at a time by a given process,
	# therefore stdin never has more than one line pending
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)
	ret = OK

	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# pushing to cache is done without any wait
		th = threading.Thread(target=copy_to_cache, args=(sig, files_from, files_to))
		th.daemon = True
		th.start()
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	obj = base64.b64encode(cPickle.dumps(ret))
	sys.stdout.write(obj.decode())
	sys.stdout.write('\n')
	sys.stdout.flush()

if __name__ == '__main__':
	while 1:
		try:
			loop()
		except KeyboardInterrupt:
			break