#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2019 (ita)

"""
Filesystem-based cache system to share and re-use build artifacts

Cache access operations (copy to and from) are delegated to
independent pre-forked worker subprocesses.

The following environment variables may be set:
* WAFCACHE: several possibilities:
  - File cache:
    absolute path of the waf cache (~/.cache/wafcache_user,
    where `user` represents the currently logged-in user;
    /tmp/wafcache_user is used when ~/.cache/ does not exist)
  - URL to a cache server, for example:
    export WAFCACHE=http://localhost:8080/files/
    in that case, GET/POST requests are made to urls of the form
    http://localhost:8080/files/000000000/0 (cache management is delegated to the server)
    The urllib3 package is only required in this mode.
  - GCS, S3 or MINIO bucket
    gs://my-bucket/    (uses gsutil command line tool or WAFCACHE_CMD)
    s3://my-bucket/    (uses aws command line tool or WAFCACHE_CMD)
    minio://my-bucket/ (uses mc command line tool or WAFCACHE_CMD)
* WAFCACHE_CMD: bucket upload/download command, for example:
    WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}"
  Note that the WAFCACHE bucket value is used for the source or destination
  depending on the operation (upload or download). For example, with:
    WAFCACHE="gs://mybucket/"
  the following commands may be run:
    gsutil cp build/myprogram gs://mybucket/aa/aaaaa/1
    gsutil cp gs://mybucket/bb/bbbbb/2 build/somefile
* WAFCACHE_NO_PUSH: if set, disables pushing to the cache
* WAFCACHE_VERBOSITY: if set, displays more detailed cache operations

File cache specific options:
  Files are copied using hard links by default; if the cache is located
  on another partition, the system switches to file copies instead.
* WAFCACHE_TRIM_MAX_FOLDER: maximum number of tasks to cache (1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum cache size in bytes (10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try
                                   and trim the cache (3 minutes)

Usage::

	def build(bld):
		bld.load('wafcache')
		...

To troubleshoot::

	waf clean build --zones=wafcache
"""

import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, traceback, shlex
try:
	import subprocess32 as subprocess
except ImportError:
	import subprocess

base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
	base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())

CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
WAFCACHE_CMD = os.environ.get('WAFCACHE_CMD')
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
# reply sent by the worker processes when an operation succeeded
OK = "ok"

# placeholders substituted in WAFCACHE_CMD by bucket_cache.bucket_copy
re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')

try:
	import cPickle
except ImportError:
	import pickle as cPickle

# the worker processes run this file as a standalone script (see get_process)
# and do not need waflib
if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build

def can_retrieve_cache(self):
	"""
	New method for waf Task classes: try to fetch the task outputs from the cache

	Sets ``self.cached`` to True on success.

	:return: True if all outputs were obtained from the cache, False otherwise
	"""
	if not self.outputs:
		return False

	self.cached = False

	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	files_to = [node.abspath() for node in self.outputs]
	err = cache_command(ssig, [], files_to)
	if err.startswith(OK):
		if WAFCACHE_VERBOSITY:
			Logs.pprint('CYAN', '  Fetched %r from cache' % files_to)
		else:
			Logs.debug('wafcache: fetched %r from cache', files_to)
	else:
		if WAFCACHE_VERBOSITY:
			Logs.pprint('YELLOW', '  No cache entry %s' % files_to)
		else:
			Logs.debug('wafcache: No cache entry %s: %s', files_to, err)
		return False

	self.cached = True
	return True

def put_files_cache(self):
	"""
	New method for waf Task classes: push the task outputs to the cache

	Nothing is pushed when WAFCACHE_NO_PUSH is set, when the outputs were
	just fetched from the cache, or when the task has no outputs. Upload
	failures are only logged.
	"""
	if WAFCACHE_NO_PUSH or getattr(self, 'cached', None) or not self.outputs:
		return

	bld = self.generator.bld
	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	files_from = [node.abspath() for node in self.outputs]
	err = cache_command(ssig, files_from, [])

	if err.startswith(OK):
		if WAFCACHE_VERBOSITY:
			Logs.pprint('CYAN', '  Successfully uploaded %s to cache' % files_from)
		else:
			Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
	else:
		if WAFCACHE_VERBOSITY:
			Logs.pprint('RED', '  Error caching step results %s: %s' % (files_from, err))
		else:
			Logs.debug('wafcache: Error caching results %s: %s', files_from, err)

	# recorded even when the upload failed: the task itself did run
	bld.task_sigs[self.uid()] = self.cache_sig

def hash_env_vars(self, env, vars_lst):
	"""
	Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths

	Occurrences of the source directory path are removed from the string
	representation of the variable values before hashing; results are
	memoized in ``self.cache_env``.
	"""
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	idx = str(id(env)) + str(vars_lst)
	try:
		cache = self.cache_env
	except AttributeError:
		cache = self.cache_env = {}
	else:
		try:
			return self.cache_env[idx]
		except KeyError:
			pass

	v = str([env[a] for a in vars_lst])
	# repr() quotes the path; [:-1] drops the closing quote so that the
	# opening quote plus the absolute prefix of every sub-path is removed
	v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
	m = Utils.md5()
	m.update(v.encode())
	ret = m.digest()

	Logs.debug('envhash: %r %r', ret, v)

	cache[idx] = ret

	return ret

def uid(self):
	"""
	Reimplement Task.uid() so that the signature does not depend on local paths

	The identifier hashes the task class name and the input/output paths
	relative to the source directory; it is memoized in ``self.uid_``.
	"""
	try:
		return self.uid_
	except AttributeError:
		m = Utils.md5()
		src = self.generator.bld.srcnode
		up = m.update
		up(self.__class__.__name__.encode())
		for x in self.inputs + self.outputs:
			up(x.path_from(src).encode())
		self.uid_ = m.digest()
		return self.uid_


def make_cached(cls):
	"""
	Enable the waf cache for a given task class

	``run`` is wrapped to try the cache first and ``post_run`` to push the
	outputs afterwards. Classes declaring ``nocache`` and classes already
	wrapped (``has_cache``) are left untouched; instances with a ``nocache``
	attribute bypass the cache at runtime.
	"""
	if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
		return

	m1 = getattr(cls, 'run', None)
	def run(self):
		if getattr(self, 'nocache', False):
			return m1(self)
		if self.can_retrieve_cache():
			return 0
		return m1(self)
	cls.run = run

	m2 = getattr(cls, 'post_run', None)
	def post_run(self):
		if getattr(self, 'nocache', False):
			return m2(self)
		ret = m2(self)
		self.put_files_cache()
		if hasattr(self, 'chmod'):
			for node in self.outputs:
				os.chmod(node.abspath(), self.chmod)
		return ret
	cls.post_run = post_run
	cls.has_cache = True

# idle worker processes, see get_process() and cache_command()
process_pool = []
def get_process():
	"""
	Returns a worker process that can process waf cache commands
	The worker process is assumed to be returned to the process pool when unused

	New workers run the source of this file with ``python -c``, so that
	``__name__ == '__main__'`` in the child.
	"""
	try:
		return process_pool.pop()
	except IndexError:
		filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'wafcache.py')
		cmd = [sys.executable, '-c', Utils.readf(filepath)]
		return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)

def atexit_pool():
	"""
	Kill (signal 9) and reap the idle worker processes when the build exits
	"""
	for k in process_pool:
		try:
			os.kill(k.pid, 9)
		except OSError:
			pass
		else:
			k.wait()
atexit.register(atexit_pool)

def build(bld):
	"""
	Called during the build process to enable file caching

	Pre-forks one worker per build job and patches the waf Task and
	BuildContext classes; subsequent calls do nothing.
	"""
	if process_pool:
		# already called once
		return

	# pre-allocation
	processes = [get_process() for x in range(bld.jobs)]
	process_pool.extend(processes)

	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Build.BuildContext.hash_env_vars = hash_env_vars
	for x in reversed(list(Task.classes.values())):
		make_cached(x)

def cache_command(sig, files_from, files_to):
	"""
	Send a command to a cache worker process and wait for its reply

	The command is the list ``[sig, files_from, files_to]``, pickled and
	base64-encoded on a single line. ``files_from`` lists files to push to
	the cache and ``files_to`` files to obtain from the cache; one of the
	lists is assumed to be empty.

	:return: the worker reply: OK on success, else an error message/traceback
	:raises OSError: if the worker process died
	"""
	proc = get_process()

	obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
	proc.stdin.write(obj)
	proc.stdin.write('\n'.encode())
	proc.stdin.flush()
	obj = proc.stdout.readline()
	if not obj:
		raise OSError('Preforked sub-process %r died' % proc.pid)
	process_pool.append(proc)
	return cPickle.loads(base64.b64decode(obj))

# hard links by default; a missing os.link raises AttributeError (not NameError)
try:
	copyfun = os.link
except AttributeError:
	copyfun = shutil.copy2

def atomic_copy(orig, dest):
	"""
	Copy files to the cache, the operation is atomic for a given file

	The data is hard-linked (or copied) to ``dest + '.tmp'`` first and then
	renamed over ``dest``. On the first cross-device error (EXDEV), the
	module switches to :py:func:`shutil.copy2` for all later copies.

	:param orig: source file path
	:param dest: destination file path; missing parent folders are created
	:raises OSError: if the copy or the rename fails
	"""
	global copyfun
	try:
		if os.path.samefile(orig, dest):
			# already hard links to the same file: rename() would be a no-op
			# that leaves the temporary file behind and breaks later links
			return
	except OSError:
		# dest (or orig) does not exist
		pass

	tmp = dest + '.tmp'
	up = os.path.dirname(dest)
	try:
		os.makedirs(up)
	except OSError:
		pass

	# a stale temporary file would make os.link fail with EEXIST forever
	try:
		os.remove(tmp)
	except OSError:
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno == errno.EXDEV:
			copyfun = shutil.copy2
			copyfun(orig, tmp)
		else:
			raise
	os.rename(tmp, dest)

def lru_trim():
	"""
	the cache folders take the form:
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`
	they are listed in order of last access (folder mtime), and then removed
	until the amount of folders is within TRIM_MAX_FOLDERS and the total space
	taken by files is less than EVICT_MAX_BYTES

	Entries that vanish while the cache is being scanned are skipped.
	"""
	lst = []
	for up in os.listdir(CACHE_DIR):
		if len(up) != 2:
			continue
		sub = os.path.join(CACHE_DIR, up)
		try:
			hvals = os.listdir(sub)
		except OSError:
			continue
		for hval in hvals:
			path = os.path.join(sub, hval)
			try:
				size = 0
				for fname in os.listdir(path):
					size += os.lstat(os.path.join(path, fname)).st_size
				mtime = os.stat(path).st_mtime
			except OSError:
				# removed or renamed by a concurrent cache operation
				continue
			lst.append((mtime, size, path))

	# most recently used first: pop() returns the least recently used entry
	lst.sort(key=lambda x: x[0], reverse=True)

	tot = sum(x[1] for x in lst)
	while lst and (tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS):
		_, tmp_size, path = lst.pop()
		tot -= tmp_size

		# rename first so that the entry disappears at once for readers
		tmp = path + '.tmp'
		try:
			shutil.rmtree(tmp)
		except OSError:
			pass
		try:
			os.rename(path, tmp)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, tmp))
		else:
			try:
				shutil.rmtree(tmp)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % tmp)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))


def lru_evict():
	"""
	Reduce the cache size

	The mtime of ``CACHE_DIR/all.lock`` records the last trim; the cache is
	trimmed at most once every EVICT_INTERVAL_MINUTES, under an exclusive
	non-blocking flock so that only one process trims at a time.
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes if the cache is too big
		# OCLOEXEC is unnecessary because no processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			try:
				fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
			except EnvironmentError:
				sys.stderr.write('another process is running!\n')
			else:
				# now do the actual cleanup
				lru_trim()
				os.utime(lockfile, None)
		finally:
			os.close(fd)

class netcache(object):
	"""
	Cache service backed by an HTTP server (WAFCACHE=http://...)
	"""
	def __init__(self):
		# imported lazily: urllib3 is only required for HTTP caches
		import urllib3
		self.http = urllib3.PoolManager()

	def url_of(self, sig, i):
		# strip trailing slashes: WAFCACHE=http://host/files/ must not yield http://host/files//sig/0
		return "%s/%s/%s" % (CACHE_DIR.rstrip('/'), sig, i)

	def upload(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with open(file_path, 'rb') as f:
			file_data = f.read()
		r = self.http.request('POST', url, timeout=60,
			fields={ 'file': ('%s/%s' % (sig, i), file_data), })
		if r.status >= 400:
			raise OSError("Invalid status %r %r" % (url, r.status))

	def download(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with self.http.request('GET', url, preload_content=False, timeout=60) as inf:
			if inf.status >= 400:
				raise OSError("Invalid status %r %r" % (url, inf.status))
			with open(file_path, 'wb') as out:
				shutil.copyfileobj(inf, out)

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				# NOTE(review): symlinks are not uploaded, so copy_from_cache
				# cannot fetch this entry later; kept as-is
				if not os.path.islink(x):
					self.upload(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				self.download(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

class fcache(object):
	"""
	Cache service backed by a local folder (CACHE_DIR)
	"""
	def __init__(self):
		# all workers start at the same time: tolerate a concurrent makedirs
		try:
			os.makedirs(CACHE_DIR)
		except OSError:
			pass
		if not os.path.isdir(CACHE_DIR):
			raise ValueError('Could not initialize the cache directory')

	def copy_to_cache(self, sig, files_from, files_to):
		"""
		Copy files to the cache, existing files are overwritten,
		and the copy is atomic only for a given file, not for all files
		that belong to a given task object
		"""
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		else:
			# attempt trimming if caching was successful:
			# we may have things to trim!
			# exceptions raised in an 'else' clause are not caught by the 'except'
			# above, and an uncaught exception would kill the worker process
			try:
				lru_evict()
			except Exception:
				return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""
		Copy files from the cache
		"""
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(orig, x)

			# success! update the cache time
			os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
		except Exception:
			return traceback.format_exc()
		return OK

class bucket_cache(object):
	"""
	Cache service backed by a cloud bucket (gs://, s3://, minio://), using
	WAFCACHE_CMD or the matching command-line tool
	"""
	def bucket_copy(self, source, target):
		if WAFCACHE_CMD:
			def replacer(match):
				if match.group('src'):
					return source
				elif match.group('tgt'):
					return target
			cmd = [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)]
		elif CACHE_DIR.startswith('s3://'):
			cmd = ['aws', 's3', 'cp', source, target]
		elif CACHE_DIR.startswith('gs://'):
			cmd = ['gsutil', 'cp', source, target]
		else:
			cmd = ['mc', 'cp', source, target]

		proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = proc.communicate()
		if proc.returncode:
			# 'replace': undecodable tool output must not mask the real error
			raise OSError('Error copy %r to %r using: %r (exit %r):\n  out:%s\n  err:%s' % (
				source, target, cmd, proc.returncode, out.decode('utf-8', 'replace'), err.decode('utf-8', 'replace')))

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(orig, x)
		except Exception:
			# same as copy_to_cache: any error (e.g. a malformed WAFCACHE_CMD)
			# is reported instead of killing the worker process
			return traceback.format_exc()
		return OK

def loop(service):
	"""
	This function is run when this file is run as a standalone python script,
	it assumes a parent process that will communicate the commands to it
	as base64-encoded pickled lists [sig, files_from, files_to] (one line per command)

	The commands are to copy files to the cache or copy files from the
	cache to a target destination; the reply (OK or an error message) is
	written to stdout as a base64-encoded pickle on a single line
	"""
	# one operation is performed at a single time by a single process
	# therefore stdin never has more than one line
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)

	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# TODO return early when pushing files upstream
		ret = service.copy_to_cache(sig, files_from, files_to)
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = service.copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	obj = base64.b64encode(cPickle.dumps(ret))
	sys.stdout.write(obj.decode())
	sys.stdout.write('\n')
	sys.stdout.flush()

if __name__ == '__main__':
	if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://') or CACHE_DIR.startswith('minio://'):
		if CACHE_DIR.startswith('minio://'):
			CACHE_DIR = CACHE_DIR[8:]   # minio doesn't need the protocol part, uses config aliases
		service = bucket_cache()
	elif CACHE_DIR.startswith('http'):
		service = netcache()
	else:
		service = fcache()
	while 1:
		try:
			loop(service)
		except KeyboardInterrupt:
			break