#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2011-2015 (ita)

"""
A client for the network cache (playground/netcache/). Launch the server with:
./netcache_server, then use it for the builds by adding the following:

    def build(bld):
        bld.load('netcache_client')

The parameters should be present in the environment in the form:
    NETCACHE=host:port waf configure build

Or in a more detailed way:
    NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build

where:
    host: host where the server resides, by default localhost
    port: by default push on 11001 and pull on 12001

Use the server provided in playground/netcache/Netcache.java
"""

import os, socket, time, atexit, sys
from waflib import Task, Logs, Utils, Build, Runner
from waflib.Configure import conf

# Maximum number of bytes requested per recv()/read() call
BUF = 8192 * 16
# Every request/response starts with a fixed-size, space-padded text header
HEADER_SIZE = 128
MODES = ['PUSH', 'PULL', 'PUSH_PULL']
# The cached server listing is refreshed when older than this; a false value
# disables the listing check entirely (see check_cache)
STALE_TIME = 30 # seconds

# Command words sent as the first comma-separated field of a header
GET = 'GET'
PUT = 'PUT'
LST = 'LST'
BYE = 'BYE'

# (timestamp of the last listing, list of signatures reported by the server)
all_sigs_in_cache = (0.0, [])


def _to_text(chunks):
    """
    Join the byte chunks read from a socket and return them as a native string
    (decoded as latin-1 on Python 3, left untouched on Python 2).
    """
    data = b''.join(chunks)
    if sys.hexversion > 0x3000000:
        data = data.decode('latin-1')
    return data


def _parse_addr(value):
    """
    Convert a 'host:port' string into a (host, port) tuple suitable for
    socket.connect; return None when the value is empty.

    :raises ValueError: if the value is not of the form host:port
    """
    if not value:
        return None
    host, port = value.split(':')
    return (host, int(port))


def put_data(conn, data):
    """
    Send the whole string *data* over the socket *conn*, looping until every
    byte has been accepted.

    :raises RuntimeError: if the socket accepts zero bytes (connection ended)
    """
    if sys.hexversion > 0x3000000:
        data = data.encode('latin-1')
    cnt = 0
    while cnt < len(data):
        sent = conn.send(data[cnt:])
        if sent == 0:
            raise RuntimeError('connection ended')
        cnt += sent


# Pools of idle connections, one per direction
push_connections = Runner.Queue(0)
pull_connections = Runner.Queue(0)


def get_connection(push=False):
    """
    Return a connection for pushing (push=True) or pulling, reusing an idle one
    from the matching pool when available and opening a new socket otherwise.

    Do not forget to release it (release_connection) or close it
    (close_connection) afterwards.
    """
    try:
        if push:
            ret = push_connections.get(block=False)
        else:
            ret = pull_connections.get(block=False)
    except Exception:
        # nothing usable in the pool: open a fresh connection
        ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if push:
            ret.connect(Task.push_addr)
        else:
            ret.connect(Task.pull_addr)
    return ret


def release_connection(conn, msg='', push=False):
    """
    Put *conn* back into the push or pull pool for later reuse. A false *conn*
    (for example None after a failure) is ignored. *msg* is unused.
    """
    if conn:
        if push:
            push_connections.put(conn)
        else:
            pull_connections.put(conn)


def close_connection(conn, msg=''):
    """
    Send a BYE header carrying *msg* and close the socket. This is best-effort:
    errors are ignored because the connection may already be broken.
    """
    if conn:
        data = '%s,%s' % (BYE, msg)
        try:
            put_data(conn, data.ljust(HEADER_SIZE))
        except Exception:
            pass
        try:
            conn.close()
        except Exception:
            pass


def close_all():
    """
    Close every idle connection left in the pools; registered to run at exit.
    """
    for q in (push_connections, pull_connections):
        while q.qsize():
            conn = q.get()
            try:
                close_connection(conn)
            except Exception:
                # ignore errors when cleaning up
                pass
atexit.register(close_all)


def read_header(conn):
    """
    Read exactly HEADER_SIZE bytes from *conn* and return them as a string.

    :raises ValueError: if the connection ends before the header is complete
    """
    cnt = 0
    buf = []
    while cnt < HEADER_SIZE:
        data = conn.recv(HEADER_SIZE - cnt)
        if not data:
            #import traceback
            #traceback.print_stack()
            raise ValueError('connection ended when reading a header %r' % buf)
        buf.append(data)
        cnt += len(data)
    return _to_text(buf)


def check_cache(conn, ssig):
    """
    List the files on the server, this is an optimization because it assumes that
    concurrent builds are rare

    The listing is requested again only when the previous one is older than
    STALE_TIME seconds; nothing is checked when STALE_TIME is false.

    :raises ValueError: if *ssig* is not in the listing, or if the connection
        ends while the listing is being read
    """
    global all_sigs_in_cache
    if not STALE_TIME:
        return
    if time.time() - all_sigs_in_cache[0] > STALE_TIME:

        params = (LST, '')
        put_data(conn, ','.join(params).ljust(HEADER_SIZE))

        # read what is coming back: the first header field is the payload size
        ret = read_header(conn)
        size = int(ret.split(',')[0])

        buf = []
        cnt = 0
        while cnt < size:
            data = conn.recv(min(BUF, size - cnt))
            if not data:
                raise ValueError('connection ended %r %r' % (cnt, size))
            buf.append(data)
            cnt += len(data)

        # one signature per line
        all_sigs_in_cache = (time.time(), _to_text(buf).splitlines())
        Logs.debug('netcache: server cache has %r entries', len(all_sigs_in_cache[1]))

    if ssig not in all_sigs_in_cache[1]:
        raise ValueError('no file %s in cache' % ssig)


class MissingFile(Exception):
    """Raised by recv_file when the server answers a GET with a size of -1."""
    pass


def recv_file(conn, ssig, count, p):
    """
    Fetch output number *count* of the task signature *ssig* and write it to
    the path *p*.

    :raises MissingFile: if the server reports a size of -1 for the file
    :raises ValueError: if the signature is not listed (see check_cache) or the
        connection ends before the whole file was received
    """
    check_cache(conn, ssig)

    params = (GET, ssig, str(count))
    put_data(conn, ','.join(params).ljust(HEADER_SIZE))
    data = read_header(conn)

    size = int(data.split(',')[0])

    if size == -1:
        raise MissingFile('no file %s - %s in cache' % (ssig, count))

    # get the file, writing immediately
    # TODO a tmp file would be better
    # the with-statement closes the file even when the transfer is interrupted
    with open(p, 'wb') as f:
        cnt = 0
        while cnt < size:
            data = conn.recv(min(BUF, size - cnt))
            if not data:
                raise ValueError('connection ended %r %r' % (cnt, size))
            f.write(data)
            cnt += len(data)


def sock_send(conn, ssig, cnt, p):
    """
    Push the file at path *p* to the server as output number *cnt* of the task
    signature *ssig*. The PUT header announces the size obtained from os.stat.

    :raises ValueError: if the connection ends, or if the file turns out to be
        shorter than the announced size
    """
    #print "pushing %r %r %r" % (ssig, cnt, p)
    size = os.stat(p).st_size
    params = (PUT, ssig, str(cnt), str(size))
    put_data(conn, ','.join(params).ljust(HEADER_SIZE))

    sent = 0
    # the with-statement makes sure the file descriptor is not leaked
    with open(p, 'rb') as f:
        while sent < size:
            chunk = f.read(min(BUF, size - sent))
            if not chunk:
                # the file shrank after os.stat; without this check the loop
                # would never terminate
                raise ValueError('file %r ended after %r of %r bytes' % (p, sent, size))
            while chunk:
                k = conn.send(chunk)
                if not k:
                    raise ValueError('connection ended')
                sent += k
                chunk = chunk[k:]


def can_retrieve_cache(self):
    """
    Task method: try to download all the task outputs from the cache server.

    Sets self.cached and returns True when every output was retrieved, False
    otherwise (no pull address, no outputs, or any failure).
    """
    if not Task.pull_addr:
        return False
    if not self.outputs:
        return False
    self.cached = False

    sig = self.signature()
    ssig = Utils.to_hex(self.uid() + sig)

    conn = None
    err = False
    try:
        conn = get_connection()
        for cnt, node in enumerate(self.outputs):
            recv_file(conn, ssig, cnt, node.abspath())
    except MissingFile as e:
        # the connection is kept in this case and goes back to the pool
        Logs.debug('netcache: file is not in the cache %r', e)
        err = True
    except Exception as e:
        Logs.debug('netcache: could not get the files %r', self.outputs)
        if Logs.verbose > 1:
            Logs.debug('netcache: exception %r', e)
        err = True

        # broken connection? remove this one
        close_connection(conn)
        conn = None
    else:
        Logs.debug('netcache: obtained %r from cache', self.outputs)
    finally:
        # no-op when the connection was closed above
        release_connection(conn)

    if err:
        return False

    self.cached = True
    return True


@Utils.run_once
def put_files_cache(self):
    """
    Task method: upload the task outputs to the cache server.

    Nothing is sent when there is no push address, when the task has no
    outputs, or when the outputs were just obtained from the cache. A failure
    on one output is logged and does not prevent the next ones from being tried.
    """
    if not Task.push_addr:
        return
    if not self.outputs:
        return
    if getattr(self, 'cached', None):
        return

    #print "called put_files_cache", id(self)
    bld = self.generator.bld
    sig = self.signature()
    ssig = Utils.to_hex(self.uid() + sig)

    conn = None
    try:
        for cnt, node in enumerate(self.outputs):
            # We could re-create the signature of the task with the signature of the outputs
            # in practice, this means hashing the output files
            # this is unnecessary
            try:
                if not conn:
                    conn = get_connection(push=True)
                sock_send(conn, ssig, cnt, node.abspath())
                Logs.debug('netcache: sent %r', node)
            except Exception as e:
                Logs.debug('netcache: could not push the files %r', e)

                # broken connection? remove this one
                close_connection(conn)
                conn = None
    finally:
        release_connection(conn, push=True)

    bld.task_sigs[self.uid()] = self.cache_sig


def hash_env_vars(self, env, vars_lst):
    """
    Build context method: hash the values of the variables *vars_lst* in *env*.

    Reimplemented so that the resulting hash does not depend on local paths.
    Results are memoized in self.cache_env.
    """
    if not env.table:
        env = env.parent
        if not env:
            return Utils.SIG_NIL

    idx = str(id(env)) + str(vars_lst)
    try:
        cache = self.cache_env
    except AttributeError:
        cache = self.cache_env = {}
    else:
        try:
            return cache[idx]
        except KeyError:
            pass

    v = str([env[a] for a in vars_lst])
    # strip the source directory (repr without its closing quote) from the values
    v = v.replace(repr(self.srcnode.abspath())[:-1], '')
    m = Utils.md5()
    m.update(v.encode())
    ret = m.digest()

    Logs.debug('envhash: %r %r', ret, v)

    cache[idx] = ret

    return ret


def uid(self):
    """
    Task method: return an identifier computed from the class name and the
    paths of the inputs and outputs relative to the source directory.

    Reimplemented so that the signature does not depend on local paths. The
    value is computed once and stored in self.uid_.
    """
    try:
        return self.uid_
    except AttributeError:
        m = Utils.md5()
        src = self.generator.bld.srcnode
        up = m.update
        up(self.__class__.__name__.encode())
        for x in self.inputs + self.outputs:
            up(x.path_from(src).encode())
        self.uid_ = m.digest()
        return self.uid_


def make_cached(cls):
    """
    Wrap the run and post_run methods of the task class *cls* so that outputs
    are fetched from the cache before running and pushed to it afterwards.

    Classes with a true 'nocache' attribute are left untouched; instances with
    a true 'nocache' attribute bypass the cache at execution time.
    """
    if getattr(cls, 'nocache', None):
        return

    m1 = cls.run
    def run(self):
        if getattr(self, 'nocache', False):
            return m1(self)
        if self.can_retrieve_cache():
            # outputs obtained from the cache: report success without running
            return 0
        return m1(self)
    cls.run = run

    m2 = cls.post_run
    def post_run(self):
        if getattr(self, 'nocache', False):
            return m2(self)
        bld = self.generator.bld
        ret = m2(self)
        if bld.cache_global:
            self.put_files_cache()
        if hasattr(self, 'chmod'):
            for node in self.outputs:
                os.chmod(node.abspath(), self.chmod)
        return ret
    cls.post_run = post_run


@conf
def setup_netcache(ctx, push_addr, pull_addr):
    """
    Install the network cache: replace the relevant Task/BuildContext methods,
    record the server addresses on the Task module and wrap every task class
    registered so far.

    :param push_addr: (host, port) tuple for uploads, or None to disable them
    :param pull_addr: (host, port) tuple for downloads, or None to disable them
    """
    Task.Task.can_retrieve_cache = can_retrieve_cache
    Task.Task.put_files_cache = put_files_cache
    Task.Task.uid = uid
    Task.push_addr = push_addr
    Task.pull_addr = pull_addr
    Build.BuildContext.hash_env_vars = hash_env_vars
    ctx.cache_global = True

    for x in Task.classes.values():
        make_cached(x)


def build(bld):
    """
    Read the server addresses from the environment variables NETCACHE,
    NETCACHE_PULL and NETCACHE_PUSH and enable the network cache.

    When none of them is set, local defaults are used. NETCACHE provides the
    value for whichever of the two specific variables is missing. A direction
    whose variable is unset or empty is disabled.
    """
    if 'NETCACHE' not in os.environ and 'NETCACHE_PULL' not in os.environ and 'NETCACHE_PUSH' not in os.environ:
        Logs.warn('Setting NETCACHE_PULL=127.0.0.1:12001 and NETCACHE_PUSH=127.0.0.1:11001')
        os.environ['NETCACHE_PULL'] = '127.0.0.1:12001'
        os.environ['NETCACHE_PUSH'] = '127.0.0.1:11001'

    if 'NETCACHE' in os.environ:
        if 'NETCACHE_PUSH' not in os.environ:
            os.environ['NETCACHE_PUSH'] = os.environ['NETCACHE']
        if 'NETCACHE_PULL' not in os.environ:
            os.environ['NETCACHE_PULL'] = os.environ['NETCACHE']

    # only one of the two variables may be defined, so do not assume both exist
    pull_addr = _parse_addr(os.environ.get('NETCACHE_PULL', ''))
    push_addr = _parse_addr(os.environ.get('NETCACHE_PUSH', ''))

    setup_netcache(bld, push_addr, pull_addr)