#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2019 (ita)

"""
Filesystem-based cache system to share and re-use build artifacts

The following environment variables may be set:
* WAFCACHE: absolute path of the waf cache (/tmp/wafcache_user,
  where `user` represents the currently logged-in user)
* WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try
  and trim the cache (3 minutes)

Cache access operations (copy to and from) are delegated to pre-forked
subprocesses. Though these processes perform atomic copies, they
are unaware of other processes running on the system

The files are copied using hard links by default; if the cache is located
onto another partition, the system switches to file copies instead.

Usage::

	def build(bld):
		bld.load('wafcache')
		...
"""

import atexit, base64, errno, fcntl, getpass, os, shutil, sys, threading, time
try:
	import subprocess32 as subprocess
except ImportError:
	import subprocess

CACHE_DIR = os.environ.get('WAFCACHE', '/tmp/wafcache_' + getpass.getuser())
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
OK = "ok"

try:
	import cPickle
except ImportError:
	import pickle as cPickle

# the waflib import is only valid in the build process; when this file is
# executed as a standalone worker script, waflib is not on the path
if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build

def can_retrieve_cache(self):
	"""
	New method for waf Task classes.

	Attempt to fetch the task outputs from the cache; sets ``self.cached``
	and returns True on a cache hit, False otherwise.
	"""
	if not self.outputs:
		return False

	self.cached = False

	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	# request a copy from the cache into the build outputs
	files_to = [node.abspath() for node in self.outputs]
	err = cache_command(ssig, [], files_to)
	if not err.startswith(OK):
		if Logs.verbose:
			Logs.debug('wafcache: error getting from cache %s', err)
		return False

	self.cached = True
	return True

def put_files_cache(self):
	"""
	New method for waf Task classes.

	Push the task outputs to the cache (no-op if the task was itself
	restored from the cache), then record the task signature.
	"""
	if not self.outputs:
		return

	if getattr(self, 'cached', None):
		return

	bld = self.generator.bld
	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	files_from = [node.abspath() for node in self.outputs]
	err = cache_command(ssig, files_from, [])

	if not err.startswith(OK):
		if Logs.verbose:
			Logs.debug('wafcache: error caching %s', err)

	bld.task_sigs[self.uid()] = self.cache_sig

def hash_env_vars(self, env, vars_lst):
	"""
	Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths
	"""
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	idx = str(id(env)) + str(vars_lst)
	try:
		cache = self.cache_env
	except AttributeError:
		cache = self.cache_env = {}
	else:
		try:
			return self.cache_env[idx]
		except KeyError:
			pass

	v = str([env[a] for a in vars_lst])
	# strip the source directory prefix so the hash is relocatable
	v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
	m = Utils.md5()
	m.update(v.encode())
	ret = m.digest()

	Logs.debug('envhash: %r %r', ret, v)

	cache[idx] = ret

	return ret

def uid(self):
	"""
	Reimplement Task.uid() so that the signature does not depend on local paths
	"""
	try:
		return self.uid_
	except AttributeError:
		m = Utils.md5()
		src = self.generator.bld.srcnode
		up = m.update
		up(self.__class__.__name__.encode())
		# hash paths relative to the source dir, not absolute ones
		for x in self.inputs + self.outputs:
			up(x.path_from(src).encode())
		self.uid_ = m.digest()
		return self.uid_


def make_cached(cls):
	"""
	Enable the waf cache for a given task class by wrapping its
	``run`` and ``post_run`` methods; idempotent (``has_cache`` guard).
	"""
	if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
		return

	m1 = getattr(cls, 'run', None)
	def run(self):
		if getattr(self, 'nocache', False):
			return m1(self)
		if self.can_retrieve_cache():
			return 0
		return m1(self)
	cls.run = run

	m2 = getattr(cls, 'post_run', None)
	def post_run(self):
		if getattr(self, 'nocache', False):
			return m2(self)
		ret = m2(self)
		self.put_files_cache()
		# restore file modes: hard links from the cache may not carry them
		if hasattr(self, 'chmod'):
			for node in self.outputs:
				os.chmod(node.abspath(), self.chmod)
		return ret
	cls.post_run = post_run
	cls.has_cache = True

process_pool = []
def get_process():
	"""
	Returns a worker process that can process waf cache commands
	The worker process is assumed to be returned to the process pool when unused
	"""
	try:
		return process_pool.pop()
	except IndexError:
		filepath = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py'
		cmd = [sys.executable, '-c', Utils.readf(filepath)]
		return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)

def atexit_pool():
	# best-effort termination of the pre-forked workers
	for k in process_pool:
		try:
			os.kill(k.pid, 9)
		except OSError:
			pass
		else:
			k.wait()
atexit.register(atexit_pool)

def build(bld):
	"""
	Called during the build process to enable file caching
	"""
	if process_pool:
		# already called once
		return

	for x in range(bld.jobs):
		process_pool.append(get_process())

	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Build.BuildContext.hash_env_vars = hash_env_vars
	for x in Task.classes.values():
		make_cached(x)

def cache_command(sig, files_from, files_to):
	"""
	Create a command for cache worker processes, returns a pickled
	base64-encoded tuple containing the task signature, a list of files to
	cache and a list of files to get from cache (one of the lists
	is assumed to be empty)
	"""
	proc = get_process()

	obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
	proc.stdin.write(obj)
	proc.stdin.write('\n'.encode())
	proc.stdin.flush()
	obj = proc.stdout.readline()
	if not obj:
		raise OSError('Preforked sub-process %r died' % proc.pid)
	process_pool.append(proc)
	return cPickle.loads(base64.b64decode(obj))

# hard links are preferred; fall back to plain copies on platforms
# where os.link is unavailable (AttributeError, not NameError)
try:
	copyfun = os.link
except AttributeError:
	copyfun = shutil.copy2

def atomic_copy(orig, dest):
	"""
	Copy files to the cache, the operation is atomic for a given file
	"""
	global copyfun
	tmp = dest + '.tmp'
	up = os.path.dirname(dest)
	try:
		os.makedirs(up)
	except OSError:
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno == errno.EXDEV:
			# cross-device link: switch to plain copies for good
			copyfun = shutil.copy2
			copyfun(orig, tmp)
		else:
			raise
	# rename is atomic on POSIX, so readers never see a partial file
	os.rename(tmp, dest)

def lru_trim():
	"""
	the cache folders take the form:
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`
	they are listed in order of last access, and then removed
	until the amount of folders is within TRIM_MAX_FOLDERS and the total space
	taken by files is less than EVICT_MAX_BYTES
	"""
	lst = []
	for up in os.listdir(CACHE_DIR):
		if len(up) == 2:
			sub = os.path.join(CACHE_DIR, up)
			for hval in os.listdir(sub):
				path = os.path.join(sub, hval)

				size = 0
				for fname in os.listdir(path):
					size += os.lstat(os.path.join(path, fname)).st_size
				lst.append((os.stat(path).st_mtime, size, path))

	# oldest entries at the end -> evicted first
	lst.sort(key=lambda x: x[0])
	lst.reverse()

	tot = sum(x[1] for x in lst)
	while tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS:
		_, tmp_size, path = lst.pop()
		tot -= tmp_size

		# rename before removal so concurrent readers see either
		# the full folder or nothing at all
		tmp = path + '.tmp'
		try:
			shutil.rmtree(tmp)
		except OSError:
			pass
		try:
			os.rename(path, tmp)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, tmp))
		else:
			try:
				shutil.rmtree(tmp)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % tmp)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))


def lru_evict():
	"""
	Reduce the cache size
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			# first run: create the lock file (text mode, so write str)
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes if the cache is too big
		# OCLOEXEC is unnecessary because no processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			try:
				fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
			except EnvironmentError:
				# another worker holds the lock; let it do the trimming
				sys.stderr.write('another process is running!\n')
			else:
				# now do the actual cleanup
				lru_trim()
				os.utime(lockfile, None)
		finally:
			os.close(fd)

def copy_to_cache(sig, files_from, files_to):
	"""
	Copy files to the cache, existing files are overwritten,
	and the copy is atomic only for a given file, not for all files
	that belong to a given task object
	"""
	try:
		for i, x in enumerate(files_from):
			dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
			atomic_copy(x, dest)
	except EnvironmentError:
		# no errors should be raised
		pass
	else:
		# attempt trimming if caching was successful:
		# we may have things to trim!
		lru_evict()

def copy_from_cache(sig, files_from, files_to):
	"""
	Copy files from the cache
	"""
	try:
		for i, x in enumerate(files_to):
			orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
			atomic_copy(orig, x)

		# success! update the cache time
		os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
	except EnvironmentError as e:
		return "Failed to copy %r to %r: %s" % (orig, x, e)
	return OK

def loop():
	"""
	This function is run when this file is run as a standalone python script,
	it assumes a parent process that will communicate the commands to it
	as pickled-encoded tuples (one line per command)

	The commands are to copy files to the cache or copy files from the
	cache to a target destination
	"""
	# one operation is performed at a single time by a single process
	# therefore stdin never has more than one line
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)
	ret = OK

	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# pushing to cache is done without any wait
		th = threading.Thread(target=copy_to_cache, args=(sig, files_from, files_to))
		th.daemon = True
		th.start()
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	obj = base64.b64encode(cPickle.dumps(ret))
	sys.stdout.write(obj.decode())
	sys.stdout.write('\n')
	sys.stdout.flush()

if __name__ == '__main__':
	while 1:
		try:
			loop()
		except KeyboardInterrupt:
			break