1#!/usr/bin/env python 2# Copyright 2013 The LUCI Authors. All rights reserved. 3# Use of this source code is governed under the Apache License, Version 2.0 4# that can be found in the LICENSE file. 5 6"""Archives a set of files or directories to an Isolate Server.""" 7 8from __future__ import print_function 9 10import collections 11import errno 12import functools 13import logging 14import optparse 15import os 16import re 17import signal 18import stat 19import sys 20import tarfile 21import threading 22import time 23import zlib 24 25from utils import net 26from utils import tools 27tools.force_local_third_party() 28 29# third_party/ 30import colorama 31from depot_tools import fix_encoding 32from depot_tools import subcommand 33import six 34from six.moves import queue as Queue 35 36# pylint: disable=ungrouped-imports 37import auth 38import isolated_format 39import isolate_storage 40import local_caching 41from utils import file_path 42from utils import fs 43from utils import logging_utils 44from utils import net 45from utils import on_error 46from utils import subprocess42 47from utils import threading_utils 48 49 50__version__ = '0.9.0' 51 52# Version of isolate protocol passed to the server in /handshake request. 53ISOLATE_PROTOCOL_VERSION = '1.0' 54 55 56# Maximum expected delay (in seconds) between successive file fetches or uploads 57# in Storage. If it takes longer than that, a deadlock might be happening 58# and all stack frames for all threads are dumped to log. 59DEADLOCK_TIMEOUT = 5 * 60 60 61 62# The number of files to check the isolate server per /pre-upload query. 63# All files are sorted by likelihood of a change in the file content 64# (currently file size is used to estimate this: larger the file -> larger the 65# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files 66# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1], 67# and so on. Numbers here is a trade-off; the more per request, the lower the 68# effect of HTTP round trip latency and TCP-level chattiness. On the other hand, 69# larger values cause longer lookups, increasing the initial latency to start 70# uploading, which is especially an issue for large files. This value is 71# optimized for the "few thousands files to look up with minimal number of large 72# files missing" case. 73ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100) 74 75 76# A list of already compressed extension types that should not receive any 77# compression before being uploaded. 78ALREADY_COMPRESSED_TYPES = [ 79 '7z', 80 'avi', 81 'cur', 82 'gif', 83 'h264', 84 'jar', 85 'jpeg', 86 'jpg', 87 'mp4', 88 'pdf', 89 'png', 90 'wav', 91 'zip', 92] 93 94# The delay (in seconds) to wait between logging statements when retrieving the 95# required files. This is intended to let the user know that the program is 96# still running. 97DELAY_BETWEEN_UPDATES_IN_SECS = 30 98 99 100DEFAULT_DENYLIST = ( 101 # Temporary vim or python files. 102 r'^.+\.(?:pyc|swp)$', 103 # .git or .svn directory. 104 r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$', 105) 106 107 108class Error(Exception): 109 """Generic runtime error.""" 110 pass 111 112 113class Aborted(Error): 114 """Operation aborted.""" 115 pass 116 117 118class AlreadyExists(Error): 119 """File already exists.""" 120 121 122def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0): 123 """Yields file content in chunks of |chunk_size| starting from |offset|.""" 124 with fs.open(path, 'rb') as f: 125 if offset: 126 f.seek(offset) 127 while True: 128 data = f.read(chunk_size) 129 if not data: 130 break 131 yield data 132 133 134def fileobj_path(fileobj): 135 """Return file system path for file like object or None. 136 137 The returned path is guaranteed to exist and can be passed to file system 138 operations like copy. 139 """ 140 name = getattr(fileobj, 'name', None) 141 if name is None: 142 return None 143 144 # If the file like object was created using something like open("test.txt") 145 # name will end up being a str (such as a function outside our control, like 146 # the standard library). We want all our paths to be unicode objects, so we 147 # decode it. 148 if not isinstance(name, six.text_type): 149 # We incorrectly assume that UTF-8 is used everywhere. 150 name = name.decode('utf-8') 151 152 # fs.exists requires an absolute path, otherwise it will fail with an 153 # assertion error. 154 if not os.path.isabs(name): 155 return None 156 157 if fs.exists(name): 158 return name 159 return None 160 161 162# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file 163# wrappers have been created. 164def fileobj_copy( 165 dstfileobj, srcfileobj, size=-1, 166 chunk_size=isolated_format.DISK_FILE_CHUNK): 167 """Copy data from srcfileobj to dstfileobj. 168 169 Providing size means exactly that amount of data will be copied (if there 170 isn't enough data, an IOError exception is thrown). Otherwise all data until 171 the EOF marker will be copied. 172 """ 173 if size == -1 and hasattr(srcfileobj, 'tell'): 174 if srcfileobj.tell() != 0: 175 raise IOError('partial file but not using size') 176 177 written = 0 178 while written != size: 179 readsize = chunk_size 180 if size > 0: 181 readsize = min(readsize, size-written) 182 data = srcfileobj.read(readsize) 183 if not data: 184 if size == -1: 185 break 186 raise IOError('partial file, got %s, wanted %s' % (written, size)) 187 dstfileobj.write(data) 188 written += len(data) 189 190 191def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False): 192 """Put srcfileobj at the given dstpath with given mode. 193 194 The function aims to do this as efficiently as possible while still allowing 195 any possible file like object be given. 196 197 Creating a tree of hardlinks has a few drawbacks: 198 - tmpfs cannot be used for the scratch space. The tree has to be on the same 199 partition as the cache. 200 - involves a write to the inode, which advances ctime, cause a metadata 201 writeback (causing disk seeking). 202 - cache ctime cannot be used to detect modifications / corruption. 203 - Some file systems (NTFS) have a 64k limit on the number of hardlink per 204 partition. This is why the function automatically fallbacks to copying the 205 file content. 206 - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the 207 same owner is for all hardlinks. 208 - Anecdotal report that ext2 is known to be potentially faulty on high rate 209 of hardlink creation. 210 211 Creating a tree of symlinks has a few drawbacks: 212 - Tasks running the equivalent of os.path.realpath() will get the naked path 213 and may fail. 214 - Windows: 215 - Symlinks are reparse points: 216 https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx 217 https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx 218 - Symbolic links are Win32 paths, not NT paths. 219 https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html 220 - Symbolic links are supported on Windows 7 and later only. 221 - SeCreateSymbolicLinkPrivilege is needed, which is not present by 222 default. 223 - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted 224 RID is present in the token; 225 https://msdn.microsoft.com/en-us/library/bb530410.aspx 226 """ 227 srcpath = fileobj_path(srcfileobj) 228 if srcpath and size == -1: 229 readonly = file_mode is None or ( 230 file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)) 231 232 if readonly: 233 # If the file is read only we can link the file 234 if use_symlink: 235 link_mode = file_path.SYMLINK_WITH_FALLBACK 236 else: 237 link_mode = file_path.HARDLINK_WITH_FALLBACK 238 else: 239 # If not read only, we must copy the file 240 link_mode = file_path.COPY 241 242 file_path.link_file(dstpath, srcpath, link_mode) 243 assert fs.exists(dstpath) 244 else: 245 # Need to write out the file 246 with fs.open(dstpath, 'wb') as dstfileobj: 247 fileobj_copy(dstfileobj, srcfileobj, size) 248 249 if sys.platform == 'win32' and file_mode and file_mode & stat.S_IWRITE: 250 # On windows, mode other than removing stat.S_IWRITE is ignored. Returns 251 # early to skip slow/unnecessary chmod call. 252 return 253 254 # file_mode of 0 is actually valid, so need explicit check. 255 if file_mode is not None: 256 fs.chmod(dstpath, file_mode) 257 258 259def zip_compress(content_generator, level=7): 260 """Reads chunks from |content_generator| and yields zip compressed chunks.""" 261 compressor = zlib.compressobj(level) 262 for chunk in content_generator: 263 compressed = compressor.compress(chunk) 264 if compressed: 265 yield compressed 266 tail = compressor.flush(zlib.Z_FINISH) 267 if tail: 268 yield tail 269 270 271def zip_decompress( 272 content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK): 273 """Reads zipped data from |content_generator| and yields decompressed data. 274 275 Decompresses data in small chunks (no larger than |chunk_size|) so that 276 zip bomb file doesn't cause zlib to preallocate huge amount of memory. 277 278 Raises IOError if data is corrupted or incomplete. 279 """ 280 decompressor = zlib.decompressobj() 281 compressed_size = 0 282 try: 283 for chunk in content_generator: 284 compressed_size += len(chunk) 285 data = decompressor.decompress(chunk, chunk_size) 286 if data: 287 yield data 288 while decompressor.unconsumed_tail: 289 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size) 290 if data: 291 yield data 292 tail = decompressor.flush() 293 if tail: 294 yield tail 295 except zlib.error as e: 296 raise IOError( 297 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e)) 298 # Ensure all data was read and decompressed. 299 if decompressor.unused_data or decompressor.unconsumed_tail: 300 raise IOError('Not all data was decompressed') 301 302 303def _get_zip_compression_level(filename): 304 """Given a filename calculates the ideal zip compression level to use.""" 305 file_ext = os.path.splitext(filename)[1].lower() 306 # TODO(csharp): Profile to find what compression level works best. 307 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7 308 309 310def create_directories(base_directory, files): 311 """Creates the directory structure needed by the given list of files.""" 312 logging.debug('create_directories(%s, %d)', base_directory, len(files)) 313 # Creates the tree of directories to create. 314 directories = set(os.path.dirname(f) for f in files) 315 for item in list(directories): 316 while item: 317 directories.add(item) 318 item = os.path.dirname(item) 319 for d in sorted(directories): 320 if d: 321 abs_d = os.path.join(base_directory, d) 322 if not fs.isdir(abs_d): 323 fs.mkdir(abs_d) 324 325 326def _create_symlinks(base_directory, files): 327 """Creates any symlinks needed by the given set of files.""" 328 for filepath, properties in files: 329 if 'l' not in properties: 330 continue 331 if sys.platform == 'win32': 332 # TODO(maruel): Create symlink via the win32 api. 333 logging.warning('Ignoring symlink %s', filepath) 334 continue 335 outfile = os.path.join(base_directory, filepath) 336 try: 337 os.symlink(properties['l'], outfile) # pylint: disable=E1101 338 except OSError as e: 339 if e.errno == errno.EEXIST: 340 raise AlreadyExists('File %s already exists.' % outfile) 341 raise 342 343 344class _ThreadFile(object): 345 """Multithreaded fake file. Used by TarBundle.""" 346 def __init__(self): 347 self._data = threading_utils.TaskChannel() 348 self._offset = 0 349 350 def __iter__(self): 351 return self._data 352 353 def tell(self): 354 return self._offset 355 356 def write(self, b): 357 self._data.send_result(b) 358 self._offset += len(b) 359 360 def close(self): 361 self._data.send_done() 362 363 364class FileItem(isolate_storage.Item): 365 """A file to push to Storage. 366 367 Its digest and size may be provided in advance, if known. Otherwise they will 368 be derived from the file content. 369 """ 370 371 def __init__(self, path, algo, digest=None, size=None, high_priority=False): 372 super(FileItem, self).__init__( 373 digest, 374 size if size is not None else fs.stat(path).st_size, 375 high_priority, 376 compression_level=_get_zip_compression_level(path)) 377 self._path = path 378 self._algo = algo 379 self._meta = None 380 381 @property 382 def path(self): 383 return self._path 384 385 @property 386 def algo(self): 387 return self._algo 388 389 @property 390 def digest(self): 391 if not self._digest: 392 self._digest = isolated_format.hash_file(self._path, self._algo) 393 return self._digest 394 395 @property 396 def meta(self): 397 if not self._meta: 398 # TODO(maruel): Inline. 399 self._meta = isolated_format.file_to_metadata(self.path, False) 400 # We need to hash right away. 401 self._meta['h'] = self.digest 402 return self._meta 403 404 def content(self): 405 return file_read(self.path) 406 407 408class TarBundle(isolate_storage.Item): 409 """Tarfile to push to Storage. 410 411 Its digest is the digest of all the files it contains. It is generated on the 412 fly. 413 """ 414 415 def __init__(self, root, algo): 416 # 2 trailing 512 bytes headers. 417 super(TarBundle, self).__init__(size=1024) 418 self._items = [] 419 self._meta = None 420 self._algo = algo 421 self._root_len = len(root) + 1 422 # Same value as for Go. 423 # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go 424 # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go 425 self._archive_max_size = int(10e6) 426 427 @property 428 def digest(self): 429 if not self._digest: 430 self._prepare() 431 return self._digest 432 433 @property 434 def size(self): 435 if self._size is None: 436 self._prepare() 437 return self._size 438 439 def try_add(self, item): 440 """Try to add this file to the bundle. 441 442 It is extremely naive but this should be just enough for 443 https://crbug.com/825418. 444 445 Future improvements should be in the Go code, and the Swarming bot should be 446 migrated to use the Go code instead. 447 """ 448 if not item.size: 449 return False 450 # pylint: disable=unreachable 451 rounded = (item.size + 512) & ~511 452 if rounded + self._size > self._archive_max_size: 453 return False 454 # https://crbug.com/825418 455 return False 456 self._size += rounded 457 self._items.append(item) 458 return True 459 460 def yield_item_path_meta(self): 461 """Returns a tuple(Item, filepath, meta_dict). 462 463 If the bundle contains less than 5 items, the items are yielded. 464 """ 465 if len(self._items) < 5: 466 # The tarball is too small, yield individual items, if any. 467 for item in self._items: 468 yield item, item.path[self._root_len:], item.meta 469 else: 470 # This ensures self._meta is set. 471 p = self.digest + '.tar' 472 # Yield itself as a tarball. 473 yield self, p, self._meta 474 475 def content(self): 476 """Generates the tarfile content on the fly.""" 477 obj = _ThreadFile() 478 def _tar_thread(): 479 try: 480 t = tarfile.open( 481 fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8') 482 for item in self._items: 483 logging.info(' tarring %s', item.path) 484 t.add(item.path) 485 t.close() 486 except Exception: 487 logging.exception('Internal failure') 488 finally: 489 obj.close() 490 491 t = threading.Thread(target=_tar_thread) 492 t.start() 493 try: 494 for data in obj: 495 yield data 496 finally: 497 t.join() 498 499 def _prepare(self): 500 h = self._algo() 501 total = 0 502 for chunk in self.content(): 503 h.update(chunk) 504 total += len(chunk) 505 # pylint: disable=attribute-defined-outside-init 506 # This is not true, they are defined in Item.__init__(). 507 self._digest = h.hexdigest() 508 self._size = total 509 self._meta = { 510 'h': self.digest, 511 's': self.size, 512 't': u'tar', 513 } 514 515 516class BufferItem(isolate_storage.Item): 517 """A byte buffer to push to Storage.""" 518 519 def __init__(self, buf, algo, high_priority=False): 520 super(BufferItem, self).__init__( 521 digest=algo(buf).hexdigest(), 522 size=len(buf), 523 high_priority=high_priority) 524 self._buffer = buf 525 526 def content(self): 527 return [self._buffer] 528 529 530class Storage(object): 531 """Efficiently downloads or uploads large set of files via StorageApi. 532 533 Implements compression support, parallel 'contains' checks, parallel uploads 534 and more. 535 536 Works only within single namespace (and thus hashing algorithm and compression 537 scheme are fixed). 538 539 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies 540 signal handlers table to handle Ctrl+C. 541 """ 542 543 def __init__(self, storage_api): 544 self._storage_api = storage_api 545 self._cpu_thread_pool = None 546 self._net_thread_pool = None 547 self._aborted = False 548 self._prev_sig_handlers = {} 549 550 @property 551 def server_ref(self): 552 """Shortcut to get the server_ref from storage_api. 553 554 This can be used to get the underlying hash_algo. 555 """ 556 return self._storage_api.server_ref 557 558 @property 559 def cpu_thread_pool(self): 560 """ThreadPool for CPU-bound tasks like zipping.""" 561 if self._cpu_thread_pool is None: 562 threads = max(threading_utils.num_processors(), 2) 563 max_size = long(2)**32 if sys.version_info.major == 2 else 2**32 564 if sys.maxsize <= max_size: 565 # On 32 bits userland, do not try to use more than 16 threads. 566 threads = min(threads, 16) 567 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip') 568 return self._cpu_thread_pool 569 570 @property 571 def net_thread_pool(self): 572 """AutoRetryThreadPool for IO-bound tasks, retries IOError.""" 573 if self._net_thread_pool is None: 574 self._net_thread_pool = threading_utils.IOAutoRetryThreadPool() 575 return self._net_thread_pool 576 577 def close(self): 578 """Waits for all pending tasks to finish.""" 579 logging.info('Waiting for all threads to die...') 580 if self._cpu_thread_pool: 581 self._cpu_thread_pool.join() 582 self._cpu_thread_pool.close() 583 self._cpu_thread_pool = None 584 if self._net_thread_pool: 585 self._net_thread_pool.join() 586 self._net_thread_pool.close() 587 self._net_thread_pool = None 588 logging.info('Done.') 589 590 def abort(self): 591 """Cancels any pending or future operations.""" 592 # This is not strictly theadsafe, but in the worst case the logging message 593 # will be printed twice. Not a big deal. In other places it is assumed that 594 # unprotected reads and writes to _aborted are serializable (it is true 595 # for python) and thus no locking is used. 596 if not self._aborted: 597 logging.warning('Aborting... It can take a while.') 598 self._aborted = True 599 600 def __enter__(self): 601 """Context manager interface.""" 602 assert not self._prev_sig_handlers, self._prev_sig_handlers 603 for s in (signal.SIGINT, signal.SIGTERM): 604 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort()) 605 return self 606 607 def __exit__(self, _exc_type, _exc_value, _traceback): 608 """Context manager interface.""" 609 self.close() 610 while self._prev_sig_handlers: 611 s, h = self._prev_sig_handlers.popitem() 612 signal.signal(s, h) 613 return False 614 615 def upload_items(self, items, verify_push=False): 616 """Uploads a generator of Item to the isolate server. 617 618 It figures out what items are missing from the server and uploads only them. 619 620 It uses 3 threads internally: 621 - One to create batches based on a timeout 622 - One to dispatch the /contains RPC and field the missing entries 623 - One to field the /push RPC 624 625 The main threads enumerates 'items' and pushes to the first thread. Then it 626 join() all the threads, waiting for them to complete. 627 628 (enumerate items of Item, this can be slow as disk is traversed) 629 | 630 v 631 _create_items_batches_thread Thread #1 632 (generates list(Item), every 3s or 20~100 items) 633 | 634 v 635 _do_lookups_thread Thread #2 636 | | 637 v v 638 (missing) (was on server) 639 | 640 v 641 _handle_missing_thread Thread #3 642 | 643 v 644 (upload Item, append to uploaded) 645 646 Arguments: 647 items: list of isolate_storage.Item instances that represents data to 648 upload. 649 verify_push: verify files are uploaded correctly by fetching from server. 650 651 Returns: 652 List of items that were uploaded. All other items are already there. 653 654 Raises: 655 The first exception being raised in the worker threads. 656 """ 657 incoming = Queue.Queue() 658 batches_to_lookup = Queue.Queue() 659 missing = Queue.Queue() 660 uploaded = [] 661 exc_channel = threading_utils.TaskChannel() 662 663 def _create_items_batches_thread(): 664 """Creates batches for /contains RPC lookup from individual items. 665 666 Input: incoming 667 Output: batches_to_lookup 668 """ 669 try: 670 batch_size_index = 0 671 batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index] 672 batch = [] 673 while not self._aborted: 674 try: 675 item = incoming.get(True, timeout=3) 676 if item: 677 batch.append(item) 678 except Queue.Empty: 679 item = False 680 if len(batch) == batch_size or (not item and batch): 681 if len(batch) == batch_size: 682 batch_size_index += 1 683 batch_size = ITEMS_PER_CONTAINS_QUERIES[ 684 min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)] 685 batches_to_lookup.put(batch) 686 batch = [] 687 if item is None: 688 break 689 except Exception: 690 exc_channel.send_exception() 691 finally: 692 # Unblock the next pipeline. 693 batches_to_lookup.put(None) 694 695 def _do_lookups_thread(): 696 """Enqueues all the /contains RPCs and emits the missing items. 697 698 Input: batches_to_lookup 699 Output: missing, to_upload 700 """ 701 try: 702 channel = threading_utils.TaskChannel() 703 def _contains(b): 704 if self._aborted: 705 raise Aborted() 706 return self._storage_api.contains(b) 707 708 pending_contains = 0 709 while not self._aborted: 710 batch = batches_to_lookup.get() 711 if batch is None: 712 break 713 self.net_thread_pool.add_task_with_channel( 714 channel, threading_utils.PRIORITY_HIGH, _contains, batch) 715 pending_contains += 1 716 while pending_contains and not self._aborted: 717 try: 718 v = channel.next(timeout=0) 719 except threading_utils.TaskChannel.Timeout: 720 break 721 pending_contains -= 1 722 for missing_item, push_state in v.items(): 723 missing.put((missing_item, push_state)) 724 while pending_contains and not self._aborted: 725 for missing_item, push_state in channel.next().items(): 726 missing.put((missing_item, push_state)) 727 pending_contains -= 1 728 except Exception: 729 exc_channel.send_exception() 730 finally: 731 # Unblock the next pipeline. 732 missing.put((None, None)) 733 734 def _handle_missing_thread(): 735 """Sends the missing items to the uploader. 736 737 Input: missing 738 Output: uploaded 739 """ 740 try: 741 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: 742 channel = threading_utils.TaskChannel() 743 pending_upload = 0 744 while not self._aborted: 745 try: 746 missing_item, push_state = missing.get(True, timeout=5) 747 if missing_item is None: 748 break 749 self._async_push(channel, missing_item, push_state, verify_push) 750 pending_upload += 1 751 except Queue.Empty: 752 pass 753 detector.ping() 754 while not self._aborted and pending_upload: 755 try: 756 item = channel.next(timeout=0) 757 except threading_utils.TaskChannel.Timeout: 758 break 759 uploaded.append(item) 760 pending_upload -= 1 761 logging.debug('Uploaded %d; %d pending: %s (%d)', len(uploaded), 762 pending_upload, item.digest, item.size) 763 while not self._aborted and pending_upload: 764 item = channel.next() 765 uploaded.append(item) 766 pending_upload -= 1 767 logging.debug( 768 'Uploaded %d; %d pending: %s (%d)', 769 len(uploaded), pending_upload, item.digest, item.size) 770 except Exception: 771 exc_channel.send_exception() 772 773 threads = [ 774 threading.Thread(target=_create_items_batches_thread), 775 threading.Thread(target=_do_lookups_thread), 776 threading.Thread(target=_handle_missing_thread), 777 ] 778 for t in threads: 779 t.start() 780 781 try: 782 # For each digest keep only first isolate_storage.Item that matches it. 783 # All other items are just indistinguishable copies from the point of view 784 # of isolate server (it doesn't care about paths at all, only content and 785 # digests). 786 seen = {} 787 try: 788 # TODO(maruel): Reorder the items as a priority queue, with larger items 789 # being processed first. This is, before hashing the data. 790 # This must be done in the primary thread since items can be a 791 # generator. 792 for item in items: 793 if seen.setdefault(item.digest, item) is item: 794 incoming.put(item) 795 finally: 796 incoming.put(None) 797 finally: 798 for t in threads: 799 t.join() 800 exc_channel.send_done() 801 for _ in exc_channel: 802 # If there is no exception, this loop does nothing. Otherwise, it raises 803 # the first exception put onto |exc_channel|. 804 pass 805 806 logging.info('All %s files are uploaded', len(uploaded)) 807 if seen: 808 _print_upload_stats(seen.values(), uploaded) 809 return uploaded 810 811 def _async_push(self, channel, item, push_state, verify_push=False): 812 """Starts asynchronous push to the server in a parallel thread. 813 814 Can be used only after |item| was checked for presence on a server with a 815 /contains RPC. 816 817 Arguments: 818 channel: TaskChannel that receives back |item| when upload ends. 819 item: item to upload as instance of isolate_storage.Item class. 820 push_state: push state returned by storage_api.contains(). It contains 821 storage specific information describing how to upload the item (for 822 example in case of cloud storage, it is signed upload URLs). 823 verify_push: verify files are uploaded correctly by fetching from server. 824 825 Returns: 826 None, but |channel| later receives back |item| when upload ends. 827 """ 828 # Thread pool task priority. 829 priority = ( 830 threading_utils.PRIORITY_HIGH if item.high_priority 831 else threading_utils.PRIORITY_MED) 832 833 def _push(content): 834 """Pushes an isolate_storage.Item and returns it to |channel|.""" 835 if self._aborted: 836 raise Aborted() 837 self._storage_api.push(item, push_state, content) 838 if verify_push: 839 try: 840 self._fetch( 841 item.digest, 842 item.size, 843 # this consumes all elements from given generator. 844 lambda gen: collections.deque(gen, maxlen=0)) 845 except Exception: 846 # reset push_state if failed to verify. 847 push_state.finalized = False 848 push_state.uploaded = False 849 raise 850 851 return item 852 853 # If zipping is not required, just start a push task. Don't pass 'content' 854 # so that it can create a new generator when it retries on failures. 855 if not self.server_ref.is_with_compression: 856 self.net_thread_pool.add_task_with_channel(channel, priority, _push, None) 857 return 858 859 # If zipping is enabled, zip in a separate thread. 860 def zip_and_push(): 861 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble 862 # content right here. It will block until all file is zipped. 863 try: 864 if self._aborted: 865 raise Aborted() 866 stream = zip_compress(item.content(), item.compression_level) 867 # In Python3, zlib.compress returns a byte object instead of str. 868 data = six.b('').join(stream) 869 except Exception as exc: 870 logging.error('Failed to zip \'%s\': %s', item, exc) 871 channel.send_exception() 872 return 873 # Pass '[data]' explicitly because the compressed data is not same as the 874 # one provided by 'item'. Since '[data]' is a list, it can safely be 875 # reused during retries. 876 self.net_thread_pool.add_task_with_channel( 877 channel, priority, _push, [data]) 878 self.cpu_thread_pool.add_task(priority, zip_and_push) 879 880 def push(self, item, push_state): 881 """Synchronously pushes a single item to the server. 882 883 If you need to push many items at once, consider using 'upload_items' or 884 '_async_push' with instance of TaskChannel. 885 886 Arguments: 887 item: item to upload as instance of isolate_storage.Item class. 888 push_state: push state returned by storage_api.contains(). It contains 889 storage specific information describing how to upload the item (for 890 example in case of cloud storage, it is signed upload URLs). 891 892 Returns: 893 Pushed item (same object as |item|). 894 """ 895 channel = threading_utils.TaskChannel() 896 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT): 897 self._async_push(channel, item, push_state) 898 pushed = channel.next() 899 assert pushed is item 900 return item 901 902 def _fetch(self, digest, size, sink): 903 try: 904 # Prepare reading pipeline. 905 stream = self._storage_api.fetch(digest, size, 0) 906 if self.server_ref.is_with_compression: 907 stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK) 908 # Run |stream| through verifier that will assert its size. 909 verifier = FetchStreamVerifier(stream, self.server_ref.hash_algo, digest, 910 size) 911 # Verified stream goes to |sink|. 912 sink(verifier.run()) 913 except Exception: 914 logging.exception('Failed to fetch %s', digest) 915 raise 916 917 def async_fetch(self, channel, priority, digest, size, sink): 918 """Starts asynchronous fetch from the server in a parallel thread. 919 920 Arguments: 921 channel: TaskChannel that receives back |digest| when download ends. 922 priority: thread pool task priority for the fetch. 923 digest: hex digest of an item to download. 924 size: expected size of the item (after decompression). 925 sink: function that will be called as sink(generator). 926 """ 927 def fetch(): 928 self._fetch(digest, size, sink) 929 return digest 930 931 # Don't bother with zip_thread_pool for decompression. Decompression is 932 # really fast and most probably IO bound anyway. 933 self.net_thread_pool.add_task_with_channel(channel, priority, fetch) 934 935 936class FetchQueue(object): 937 """Fetches items from Storage and places them into ContentAddressedCache. 938 939 It manages multiple concurrent fetch operations. Acts as a bridge between 940 Storage and ContentAddressedCache so that Storage and ContentAddressedCache 941 don't depend on each other at all. 942 """ 943 944 def __init__(self, storage, cache): 945 self.storage = storage 946 self.cache = cache 947 self._channel = threading_utils.TaskChannel() 948 self._pending = set() 949 self._accessed = set() 950 self._fetched = set(cache) 951 # Pending digests that the caller waits for, see wait_on()/wait(). 952 self._waiting_on = set() 953 # Already fetched digests the caller waits for which are not yet returned by 954 # wait(). 955 self._waiting_on_ready = set() 956 957 def add( 958 self, 959 digest, 960 size=local_caching.UNKNOWN_FILE_SIZE, 961 priority=threading_utils.PRIORITY_MED): 962 """Starts asynchronous fetch of item |digest|.""" 963 # Fetching it now? 964 if digest in self._pending: 965 return 966 967 # Mark this file as in use, verify_all_cached will later ensure it is still 968 # in cache. 969 self._accessed.add(digest) 970 971 # Already fetched? Notify cache to update item's LRU position. 972 if digest in self._fetched: 973 # 'touch' returns True if item is in cache and not corrupted. 974 if self.cache.touch(digest, size): 975 return 976 logging.error('%s is corrupted', digest) 977 self._fetched.remove(digest) 978 979 # TODO(maruel): It should look at the free disk space, the current cache 980 # size and the size of the new item on every new item: 981 # - Trim the cache as more entries are listed when free disk space is low, 982 # otherwise if the amount of data downloaded during the run > free disk 983 # space, it'll crash. 984 # - Make sure there's enough free disk space to fit all dependencies of 985 # this run! If not, abort early. 986 987 # Start fetching. 988 self._pending.add(digest) 989 self.storage.async_fetch( 990 self._channel, priority, digest, size, 991 functools.partial(self.cache.write, digest)) 992 993 def wait_on(self, digest): 994 """Updates digests to be waited on by 'wait'.""" 995 # Calculate once the already fetched items. These will be retrieved first. 996 if digest in self._fetched: 997 self._waiting_on_ready.add(digest) 998 else: 999 self._waiting_on.add(digest) 1000 1001 def wait(self): 1002 """Waits until any of waited-on items is retrieved. 1003 1004 Once this happens, it is remove from the waited-on set and returned. 1005 1006 This function is called in two waves. The first wave it is done for HIGH 1007 priority items, the isolated files themselves. The second wave it is called 1008 for all the files. 1009 1010 If the waited-on set is empty, raises RuntimeError. 1011 """ 1012 # Flush any already fetched items. 1013 if self._waiting_on_ready: 1014 return self._waiting_on_ready.pop() 1015 1016 assert self._waiting_on, 'Needs items to wait on' 1017 1018 # Wait for one waited-on item to be fetched. 1019 while self._pending: 1020 digest = self._channel.next() 1021 self._pending.remove(digest) 1022 self._fetched.add(digest) 1023 if digest in self._waiting_on: 1024 self._waiting_on.remove(digest) 1025 return digest 1026 1027 # Should never reach this point due to assert above. 1028 raise RuntimeError('Impossible state') 1029 1030 @property 1031 def wait_queue_empty(self): 1032 """Returns True if there is no digest left for wait() to return.""" 1033 return not self._waiting_on and not self._waiting_on_ready 1034 1035 def inject_local_file(self, path, algo): 1036 """Adds local file to the cache as if it was fetched from storage.""" 1037 with fs.open(path, 'rb') as f: 1038 data = f.read() 1039 digest = algo(data).hexdigest() 1040 self.cache.write(digest, [data]) 1041 self._fetched.add(digest) 1042 return digest 1043 1044 @property 1045 def pending_count(self): 1046 """Returns number of items to be fetched.""" 1047 return len(self._pending) 1048 1049 def verify_all_cached(self): 1050 """True if all accessed items are in cache.""" 1051 # Not thread safe, but called after all work is done. 1052 return self._accessed.issubset(self.cache) 1053 1054 1055class FetchStreamVerifier(object): 1056 """Verifies that fetched file is valid before passing it to the 1057 ContentAddressedCache. 1058 """ 1059 1060 def __init__(self, stream, hasher, expected_digest, expected_size): 1061 """Initializes the verifier. 1062 1063 Arguments: 1064 * stream: an iterable yielding chunks of content 1065 * hasher: an object from hashlib that supports update() and hexdigest() 1066 (eg, hashlib.sha1). 1067 * expected_digest: if the entire stream is piped through hasher and then 1068 summarized via hexdigest(), this should be the result. That is, it 1069 should be a hex string like 'abc123'. 1070 * expected_size: either the expected size of the stream, or 1071 local_caching.UNKNOWN_FILE_SIZE. 1072 """ 1073 assert stream is not None 1074 self.stream = stream 1075 self.expected_digest = expected_digest 1076 self.expected_size = expected_size 1077 self.current_size = 0 1078 self.rolling_hash = hasher() 1079 1080 def run(self): 1081 """Generator that yields same items as |stream|. 1082 1083 Verifies |stream| is complete before yielding a last chunk to consumer. 1084 1085 Also wraps IOError produced by consumer into MappingError exceptions since 1086 otherwise Storage will retry fetch on unrelated local cache errors. 1087 """ 1088 # Read one chunk ahead, keep it in |stored|. 1089 # That way a complete stream can be verified before pushing last chunk 1090 # to consumer. 1091 stored = None 1092 for chunk in self.stream: 1093 assert chunk is not None 1094 if stored is not None: 1095 self._inspect_chunk(stored, is_last=False) 1096 try: 1097 yield stored 1098 except IOError as exc: 1099 raise isolated_format.MappingError( 1100 'Failed to store an item in cache: %s' % exc) 1101 stored = chunk 1102 if stored is not None: 1103 self._inspect_chunk(stored, is_last=True) 1104 try: 1105 yield stored 1106 except IOError as exc: 1107 raise isolated_format.MappingError( 1108 'Failed to store an item in cache: %s' % exc) 1109 1110 def _inspect_chunk(self, chunk, is_last): 1111 """Called for each fetched chunk before passing it to consumer.""" 1112 self.current_size += len(chunk) 1113 self.rolling_hash.update(chunk) 1114 if not is_last: 1115 return 1116 1117 if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and 1118 (self.expected_size != self.current_size)): 1119 msg = 'Incorrect file size: want %d, got %d' % ( 1120 self.expected_size, self.current_size) 1121 raise IOError(msg) 1122 1123 actual_digest = self.rolling_hash.hexdigest() 1124 if self.expected_digest != actual_digest: 1125 msg = 'Incorrect digest: want %s, got %s' % ( 1126 self.expected_digest, actual_digest) 1127 raise IOError(msg) 1128 1129 1130class IsolatedBundle(object): 1131 """Fetched and parsed .isolated file with all dependencies.""" 1132 1133 def __init__(self, filter_cb): 1134 """ 1135 filter_cb: callback function to filter downloaded content. 1136 When filter_cb is not None, Isolated file is downloaded iff 1137 filter_cb(filepath) returns True. 1138 """ 1139 1140 self.command = [] 1141 self.files = {} 1142 self.relative_cwd = None 1143 # The main .isolated file, a IsolatedFile instance. 1144 self.root = None 1145 1146 self._filter_cb = filter_cb 1147 1148 def fetch(self, fetch_queue, root_isolated_hash, algo): 1149 """Fetches the .isolated and all the included .isolated. 1150 1151 It enables support for "included" .isolated files. They are processed in 1152 strict order but fetched asynchronously from the cache. This is important so 1153 that a file in an included .isolated file that is overridden by an embedding 1154 .isolated file is not fetched needlessly. The includes are fetched in one 1155 pass and the files are fetched as soon as all the ones on the left-side 1156 of the tree were fetched. 1157 1158 The prioritization is very important here for nested .isolated files. 1159 'includes' have the highest priority and the algorithm is optimized for both 1160 deep and wide trees. A deep one is a long link of .isolated files referenced 1161 one at a time by one item in 'includes'. A wide one has a large number of 1162 'includes' in a single .isolated file. 'left' is defined as an included 1163 .isolated file earlier in the 'includes' list. So the order of the elements 1164 in 'includes' is important. 1165 1166 As a side effect this method starts asynchronous fetch of all data files 1167 by adding them to |fetch_queue|. It doesn't wait for data files to finish 1168 fetching though. 1169 """ 1170 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo) 1171 1172 # Isolated files being retrieved now: hash -> IsolatedFile instance. 1173 pending = {} 1174 # Set of hashes of already retrieved items to refuse recursive includes. 1175 seen = set() 1176 # Set of IsolatedFile's whose data files have already being fetched. 1177 processed = set() 1178 1179 def retrieve_async(isolated_file): 1180 """Retrieves an isolated file included by the root bundle.""" 1181 h = isolated_file.obj_hash 1182 if h in seen: 1183 raise isolated_format.IsolatedError( 1184 'IsolatedFile %s is retrieved recursively' % h) 1185 assert h not in pending 1186 seen.add(h) 1187 pending[h] = isolated_file 1188 # This isolated item is being added dynamically, notify FetchQueue. 1189 fetch_queue.wait_on(h) 1190 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH) 1191 1192 # Start fetching root *.isolated file (single file, not the whole bundle). 1193 retrieve_async(self.root) 1194 1195 while pending: 1196 # Wait until some *.isolated file is fetched, parse it. 1197 item_hash = fetch_queue.wait() 1198 item = pending.pop(item_hash) 1199 with fetch_queue.cache.getfileobj(item_hash) as f: 1200 item.load(f.read()) 1201 1202 # Start fetching included *.isolated files. 1203 for new_child in item.children: 1204 retrieve_async(new_child) 1205 1206 # Always fetch *.isolated files in traversal order, waiting if necessary 1207 # until next to-be-processed node loads. "Waiting" is done by yielding 1208 # back to the outer loop, that waits until some *.isolated is loaded. 1209 for node in isolated_format.walk_includes(self.root): 1210 if node not in processed: 1211 # Not visited, and not yet loaded -> wait for it to load. 1212 if not node.is_loaded: 1213 break 1214 # Not visited and loaded -> process it and continue the traversal. 1215 self._start_fetching_files(node, fetch_queue) 1216 processed.add(node) 1217 1218 # All *.isolated files should be processed by now and only them. 1219 all_isolateds = set(isolated_format.walk_includes(self.root)) 1220 assert all_isolateds == processed, (all_isolateds, processed) 1221 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied' 1222 1223 # Extract 'command' and other bundle properties. 1224 for node in isolated_format.walk_includes(self.root): 1225 self._update_self(node) 1226 self.relative_cwd = self.relative_cwd or '' 1227 1228 def _start_fetching_files(self, isolated, fetch_queue): 1229 """Starts fetching files from |isolated| that are not yet being fetched. 1230 1231 Modifies self.files. 1232 """ 1233 files = isolated.data.get('files', {}) 1234 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files)) 1235 for filepath, properties in files.items(): 1236 if self._filter_cb and not self._filter_cb(filepath): 1237 continue 1238 1239 # Root isolated has priority on the files being mapped. In particular, 1240 # overridden files must not be fetched. 1241 if filepath not in self.files: 1242 self.files[filepath] = properties 1243 1244 # Preemptively request hashed files. 1245 if 'h' in properties: 1246 fetch_queue.add( 1247 properties['h'], properties['s'], threading_utils.PRIORITY_MED) 1248 1249 def _update_self(self, node): 1250 """Extracts bundle global parameters from loaded *.isolated file. 1251 1252 Will be called with each loaded *.isolated file in order of traversal of 1253 isolated include graph (see isolated_format.walk_includes). 1254 """ 1255 # Grabs properties. 1256 if not self.command and node.data.get('command'): 1257 # Ensure paths are correctly separated on windows. 1258 self.command = node.data['command'] 1259 if self.command: 1260 self.command[0] = self.command[0].replace('/', os.path.sep) 1261 if (self.relative_cwd is None and 1262 node.data.get('relative_cwd') is not None): 1263 self.relative_cwd = node.data['relative_cwd'] 1264 1265 1266def get_storage(server_ref): 1267 """Returns Storage class that can upload and download from |namespace|. 1268 1269 Arguments: 1270 server_ref: isolate_storage.ServerRef instance. 1271 1272 Returns: 1273 Instance of Storage. 1274 """ 1275 # Handle the specific internal use case. 1276 assert (isinstance(server_ref, isolate_storage.ServerRef) or 1277 type(server_ref).__name__ == 'ServerRef'), repr(server_ref) 1278 return Storage(isolate_storage.get_storage_api(server_ref)) 1279 1280 1281def _map_file(dst, digest, props, cache, use_symlinks): 1282 """Put downloaded file to destination path. This function is used for multi 1283 threaded file putting. 1284 """ 1285 with tools.Profiler("_map_file for %s" % dst): 1286 with cache.getfileobj(digest) as srcfileobj: 1287 filetype = props.get('t', 'basic') 1288 1289 if filetype == 'basic': 1290 # Ignore all bits apart from the user. 1291 file_mode = (props.get('m') or 0o500) & 0o700 1292 putfile(srcfileobj, dst, file_mode, use_symlink=use_symlinks) 1293 1294 elif filetype == 'tar': 1295 basedir = os.path.dirname(dst) 1296 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t: 1297 ensured_dirs = set() 1298 for ti in t: 1299 if not ti.isfile(): 1300 logging.warning('Path(%r) is nonfile (%s), skipped', ti.name, 1301 ti.type) 1302 continue 1303 # Handle files created on Windows fetched on POSIX and the 1304 # reverse. 1305 other_sep = '/' if os.path.sep == '\\' else '\\' 1306 name = ti.name.replace(other_sep, os.path.sep) 1307 fp = os.path.normpath(os.path.join(basedir, name)) 1308 if not fp.startswith(basedir): 1309 logging.error('Path(%r) is outside root directory', fp) 1310 ifd = t.extractfile(ti) 1311 fp_dir = os.path.dirname(fp) 1312 if fp_dir not in ensured_dirs: 1313 file_path.ensure_tree(fp_dir) 1314 ensured_dirs.add(fp_dir) 1315 file_mode = ti.mode & 0o700 1316 putfile(ifd, fp, file_mode, ti.size) 1317 1318 else: 1319 raise isolated_format.IsolatedError('Unknown file type %r' % filetype) 1320 1321 1322def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks, 1323 filter_cb=None): 1324 """Aggressively downloads the .isolated file(s), then download all the files. 1325 1326 Arguments: 1327 isolated_hash: hash of the root *.isolated file. 1328 storage: Storage class that communicates with isolate storage. 1329 cache: ContentAddressedCache class that knows how to store and map files 1330 locally. 1331 outdir: Output directory to map file tree to. 1332 use_symlinks: Use symlinks instead of hardlinks when True. 1333 filter_cb: filter that works as allowlist for downloaded files. 1334 1335 Returns: 1336 IsolatedBundle object that holds details about loaded *.isolated file. 1337 """ 1338 logging.debug( 1339 'fetch_isolated(%s, %s, %s, %s, %s)', 1340 isolated_hash, storage, cache, outdir, use_symlinks) 1341 # Hash algorithm to use, defined by namespace |storage| is using. 1342 algo = storage.server_ref.hash_algo 1343 fetch_queue = FetchQueue(storage, cache) 1344 bundle = IsolatedBundle(filter_cb) 1345 1346 with tools.Profiler('GetIsolateds'): 1347 # Optionally support local files by manually adding them to cache. 1348 if not isolated_format.is_valid_hash(isolated_hash, algo): 1349 logging.debug( 1350 '%s is not a valid hash, assuming a file ' 1351 '(algo was %s, hash size was %d)', isolated_hash, algo(), 1352 algo().digest_size) 1353 path = six.text_type(os.path.abspath(isolated_hash)) 1354 try: 1355 isolated_hash = fetch_queue.inject_local_file(path, algo) 1356 except IOError as e: 1357 raise isolated_format.MappingError( 1358 '%s doesn\'t seem to be a valid file. Did you intent to pass a ' 1359 'valid hash (error: %s)?' % (isolated_hash, e)) 1360 1361 # Load all *.isolated and start loading rest of the files. 1362 bundle.fetch(fetch_queue, isolated_hash, algo) 1363 1364 with tools.Profiler('GetRest'): 1365 # Create file system hierarchy. 1366 file_path.ensure_tree(outdir) 1367 create_directories(outdir, bundle.files) 1368 _create_symlinks(outdir, bundle.files.items()) 1369 1370 # Ensure working directory exists. 1371 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd)) 1372 file_path.ensure_tree(cwd) 1373 1374 # Multimap: digest -> list of pairs (path, props). 1375 remaining = {} 1376 for filepath, props in bundle.files.items(): 1377 if 'h' in props: 1378 remaining.setdefault(props['h'], []).append((filepath, props)) 1379 fetch_queue.wait_on(props['h']) 1380 1381 # Now block on the remaining files to be downloaded and mapped. 1382 logging.info('Retrieving remaining files (%d of them)...', 1383 fetch_queue.pending_count) 1384 last_update = time.time() 1385 1386 with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool: 1387 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector: 1388 while remaining: 1389 detector.ping() 1390 1391 # Wait for any item to finish fetching to cache. 1392 digest = fetch_queue.wait() 1393 1394 # Create the files in the destination using item in cache as the 1395 # source. 1396 for filepath, props in remaining.pop(digest): 1397 fullpath = os.path.join(outdir, filepath) 1398 1399 putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH, 1400 _map_file, fullpath, digest, props, 1401 cache, use_symlinks) 1402 1403 # Report progress. 1404 duration = time.time() - last_update 1405 if duration > DELAY_BETWEEN_UPDATES_IN_SECS: 1406 msg = '%d files remaining...' % len(remaining) 1407 sys.stdout.write(msg + '\n') 1408 sys.stdout.flush() 1409 logging.info(msg) 1410 last_update = time.time() 1411 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied' 1412 putfile_thread_pool.join() 1413 1414 # Save the cache right away to not loose the state of the new objects. 1415 cache.save() 1416 # Cache could evict some items we just tried to fetch, it's a fatal error. 1417 if not fetch_queue.verify_all_cached(): 1418 free_disk = file_path.get_free_space(cache.cache_dir) 1419 msg = ( 1420 'Cache is too small to hold all requested files.\n' 1421 ' %s\n cache=%dbytes, %d items; %sb free_space') % ( 1422 cache.policies, cache.total_size, len(cache), free_disk) 1423 raise isolated_format.MappingError(msg) 1424 return bundle 1425 1426 1427def _directory_to_metadata(root, algo, denylist): 1428 """Yields every file and/or symlink found. 1429 1430 Yields: 1431 tuple(FileItem, relpath, metadata) 1432 For a symlink, FileItem is None. 1433 """ 1434 # Current tar file bundle, if any. 1435 root = file_path.get_native_path_case(root) 1436 bundle = TarBundle(root, algo) 1437 for relpath, issymlink in isolated_format.expand_directory_and_symlink( 1438 root, 1439 u'.' + os.path.sep, 1440 denylist, 1441 follow_symlinks=(sys.platform != 'win32')): 1442 1443 filepath = os.path.join(root, relpath) 1444 if issymlink: 1445 # TODO(maruel): Do not call this. 1446 meta = isolated_format.file_to_metadata(filepath, False) 1447 yield None, relpath, meta 1448 continue 1449 1450 prio = relpath.endswith('.isolated') 1451 if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)): 1452 # The file was added to the current pending tarball and won't be archived 1453 # individually. 1454 continue 1455 1456 # Flush and reset the bundle. 1457 for i, p, m in bundle.yield_item_path_meta(): 1458 yield i, p, m 1459 bundle = TarBundle(root, algo) 1460 1461 # Yield the file individually. 1462 item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio) 1463 yield item, relpath, item.meta 1464 1465 for i, p, m in bundle.yield_item_path_meta(): 1466 yield i, p, m 1467 1468 1469def _print_upload_stats(items, missing): 1470 """Prints upload stats.""" 1471 total = len(items) 1472 total_size = sum(f.size for f in items) 1473 logging.info( 1474 'Total: %6d, %9.1fkiB', total, total_size / 1024.) 1475 cache_hit = set(items).difference(missing) 1476 cache_hit_size = sum(f.size for f in cache_hit) 1477 logging.info( 1478 'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size', 1479 len(cache_hit), 1480 cache_hit_size / 1024., 1481 len(cache_hit) * 100. / total, 1482 cache_hit_size * 100. / total_size if total_size else 0) 1483 cache_miss = missing 1484 cache_miss_size = sum(f.size for f in cache_miss) 1485 logging.info('cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size', 1486 len(cache_miss), cache_miss_size / 1024., 1487 len(cache_miss) * 100. / total, 1488 cache_miss_size * 100. / total_size if total_size else 0) 1489 1490 1491def _enqueue_dir(dirpath, denylist, hash_algo, hash_algo_name): 1492 """Called by archive_files_to_storage for a directory. 1493 1494 Create an .isolated file. 1495 1496 Yields: 1497 FileItem for every file found, plus one for the .isolated file itself. 1498 """ 1499 files = {} 1500 for item, relpath, meta in _directory_to_metadata(dirpath, hash_algo, 1501 denylist): 1502 # item is None for a symlink. 1503 files[relpath] = meta 1504 if item: 1505 yield item 1506 1507 # TODO(maruel): If there' not file, don't yield an .isolated file. 1508 data = { 1509 'algo': hash_algo_name, 1510 'files': files, 1511 'version': isolated_format.ISOLATED_FILE_VERSION, 1512 } 1513 # Keep the file in memory. This is fine because .isolated files are relatively 1514 # small. 1515 yield BufferItem( 1516 tools.format_json(data, True).encode(), 1517 algo=hash_algo, 1518 high_priority=True) 1519 1520 1521def _archive_files_to_storage_internal(storage, 1522 files, 1523 denylist, 1524 verify_push=False): 1525 """Stores every entry into remote storage and returns stats. 1526 1527 Arguments: 1528 storage: a Storage object that communicates with the remote object store. 1529 files: iterable of files to upload. If a directory is specified (with a 1530 trailing slash), a .isolated file is created and its hash is returned. 1531 Duplicates are skipped. 1532 denylist: function that returns True if a file should be omitted. 1533 verify_push: verify files are uploaded correctly by fetching from server. 1534 1535 Returns: 1536 tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)). 1537 The first file in the first item is always the .isolated file. 1538 1539 Raises: 1540 Re-raises the exception in upload_items(), if there is any. 1541 """ 1542 # Dict of path to hash. 1543 results = collections.OrderedDict() 1544 hash_algo = storage.server_ref.hash_algo 1545 hash_algo_name = storage.server_ref.hash_algo_name 1546 # Generator of FileItem to pass to upload_items() concurrent operation. 1547 channel = threading_utils.TaskChannel() 1548 exc_channel = threading_utils.TaskChannel() 1549 uploaded_digests = set() 1550 1551 def _upload_items(): 1552 try: 1553 results = storage.upload_items(channel, verify_push) 1554 uploaded_digests.update(f.digest for f in results) 1555 except Exception: 1556 exc_channel.send_exception() 1557 1558 t = threading.Thread(target=_upload_items) 1559 t.start() 1560 1561 # Keep track locally of the items to determine cold and hot items. 1562 items_found = [] 1563 try: 1564 for f in files: 1565 assert isinstance(f, six.text_type), repr(f) 1566 if f in results: 1567 # Duplicate 1568 continue 1569 try: 1570 filepath = os.path.abspath(f) 1571 if fs.isdir(filepath): 1572 # Uploading a whole directory. 1573 item = None 1574 for item in _enqueue_dir(filepath, denylist, hash_algo, 1575 hash_algo_name): 1576 channel.send_result(item) 1577 items_found.append(item) 1578 # The very last item will be the .isolated file. 1579 if not item: 1580 # There was no file in the directory. 1581 continue 1582 elif fs.isfile(filepath): 1583 item = FileItem( 1584 path=filepath, 1585 algo=hash_algo, 1586 size=None, 1587 high_priority=f.endswith('.isolated')) 1588 channel.send_result(item) 1589 items_found.append(item) 1590 else: 1591 raise Error('%s is neither a file or directory.' % f) 1592 results[f] = item.digest 1593 except OSError: 1594 raise Error('Failed to process %s.' % f) 1595 finally: 1596 # Stops the generator, so _upload_items() can exit. 1597 channel.send_done() 1598 t.join() 1599 exc_channel.send_done() 1600 1601 try: 1602 for _ in exc_channel: 1603 pass 1604 except Exception: 1605 # log items when failed to upload files. 1606 for item in items_found: 1607 if isinstance(item, FileItem): 1608 logging.error('FileItem path: %s, digest:%s, re-calculated digest:%s', 1609 item.path, item.digest, 1610 isolated_format.hash_file(item.path, item.algo)) 1611 continue 1612 1613 logging.error('Item digest:%s', item.digest) 1614 1615 raise 1616 1617 cold = [] 1618 hot = [] 1619 for i in items_found: 1620 # Note that multiple FileItem may have the same .digest. 1621 if i.digest in uploaded_digests: 1622 cold.append(i) 1623 else: 1624 hot.append(i) 1625 return results, cold, hot 1626 1627 1628# TODO(crbug.com/1073832): 1629# remove this if process leak in coverage build was fixed. 1630def archive_files_to_storage(storage, files, denylist, verify_push=False): 1631 """Calls _archive_files_to_storage_internal with retry. 1632 1633 Arguments: 1634 See Arguments section in _archive_files_to_storage_internal 1635 1636 Returns: 1637 See Returns section in _archive_files_to_storage_internal 1638 1639 Raises: 1640 Re-raises the exception in _archive_files_to_storage_internal if all retry 1641 failed. 1642 """ 1643 1644 # Will do exponential backoff. 1645 # e.g. 10, 20, 40, 80 1646 backoff = 10 1647 1648 while True: 1649 try: 1650 return _archive_files_to_storage_internal(storage, files, denylist, 1651 verify_push) 1652 except Exception: 1653 if backoff > 100: 1654 raise 1655 1656 on_error.report('error before %d second backoff' % backoff) 1657 1658 logging.exception( 1659 'failed to run _archive_files_to_storage_internal,' 1660 ' will retry after %d seconds', backoff) 1661 time.sleep(backoff) 1662 backoff *= 2 1663 1664 1665@subcommand.usage('<file1..fileN> or - to read from stdin') 1666def CMDarchive(parser, args): 1667 """Archives data to the server. 1668 1669 If a directory is specified, a .isolated file is created the whole directory 1670 is uploaded. Then this .isolated file can be included in another one to run 1671 commands. 1672 1673 The commands output each file that was processed with its content hash. For 1674 directories, the .isolated generated for the directory is listed as the 1675 directory entry itself. 1676 """ 1677 add_isolate_server_options(parser) 1678 add_archive_options(parser) 1679 options, files = parser.parse_args(args) 1680 process_isolate_server_options(parser, options, True) 1681 server_ref = isolate_storage.ServerRef( 1682 options.isolate_server, options.namespace) 1683 if files == ['-']: 1684 files = (l.rstrip('\n\r') for l in sys.stdin) 1685 if not files: 1686 parser.error('Nothing to upload') 1687 files = (six.ensure_text(f) for f in files) 1688 denylist = tools.gen_denylist(options.blacklist) 1689 try: 1690 with get_storage(server_ref) as storage: 1691 results, _cold, _hot = archive_files_to_storage(storage, files, denylist) 1692 except (Error, local_caching.NoMoreSpace) as e: 1693 parser.error(e.args[0]) 1694 print('\n'.join('%s %s' % (h, f) for f, h in results.items())) 1695 return 0 1696 1697 1698def CMDdownload(parser, args): 1699 """Download data from the server. 1700 1701 It can either download individual files or a complete tree from a .isolated 1702 file. 1703 """ 1704 add_isolate_server_options(parser) 1705 parser.add_option( 1706 '-s', '--isolated', metavar='HASH', 1707 help='hash of an isolated file, .isolated file content is discarded, use ' 1708 '--file if you need it') 1709 parser.add_option( 1710 '-f', 1711 '--file', 1712 metavar='HASH DEST', 1713 default=[], 1714 action='append', 1715 nargs=2, 1716 help='hash and destination of a file, can be used multiple times') 1717 parser.add_option( 1718 '-t', 1719 '--target', 1720 metavar='DIR', 1721 default='download', 1722 help='destination directory') 1723 parser.add_option( 1724 '--use-symlinks', 1725 action='store_true', 1726 help='Use symlinks instead of hardlinks') 1727 add_cache_options(parser) 1728 options, args = parser.parse_args(args) 1729 if args: 1730 parser.error('Unsupported arguments: %s' % args) 1731 if not file_path.enable_symlink(): 1732 logging.warning('Symlink support is not enabled') 1733 1734 process_isolate_server_options(parser, options, True) 1735 if bool(options.isolated) == bool(options.file): 1736 parser.error('Use one of --isolated or --file, and only one.') 1737 if not options.cache and options.use_symlinks: 1738 parser.error('--use-symlinks require the use of a cache with --cache') 1739 1740 cache = process_cache_options(options, trim=True) 1741 cache.cleanup() 1742 options.target = six.text_type(os.path.abspath(options.target)) 1743 if options.isolated: 1744 if (fs.isfile(options.target) or 1745 (fs.isdir(options.target) and fs.listdir(options.target))): 1746 parser.error( 1747 '--target \'%s\' exists, please use another target' % options.target) 1748 server_ref = isolate_storage.ServerRef( 1749 options.isolate_server, options.namespace) 1750 with get_storage(server_ref) as storage: 1751 # Fetching individual files. 1752 if options.file: 1753 # TODO(maruel): Enable cache in this case too. 1754 channel = threading_utils.TaskChannel() 1755 pending = {} 1756 for digest, dest in options.file: 1757 dest = six.text_type(dest) 1758 pending[digest] = dest 1759 storage.async_fetch( 1760 channel, threading_utils.PRIORITY_MED, digest, 1761 local_caching.UNKNOWN_FILE_SIZE, 1762 functools.partial(local_caching.file_write, 1763 os.path.join(options.target, dest))) 1764 while pending: 1765 fetched = channel.next() 1766 dest = pending.pop(fetched) 1767 logging.info('%s: %s', fetched, dest) 1768 1769 # Fetching whole isolated tree. 1770 if options.isolated: 1771 bundle = fetch_isolated( 1772 isolated_hash=options.isolated, 1773 storage=storage, 1774 cache=cache, 1775 outdir=options.target, 1776 use_symlinks=options.use_symlinks) 1777 cache.trim() 1778 if bundle.command: 1779 rel = os.path.join(options.target, bundle.relative_cwd) 1780 print('To run this test please run from the directory %s:' % 1781 os.path.join(options.target, rel)) 1782 print(' ' + ' '.join(bundle.command)) 1783 1784 return 0 1785 1786 1787def add_archive_options(parser): 1788 parser.add_option( 1789 '--blacklist', 1790 action='append', 1791 default=list(DEFAULT_DENYLIST), 1792 help='List of regexp to use as denylist filter when uploading ' 1793 'directories') 1794 1795 1796def add_isolate_server_options(parser): 1797 """Adds --isolate-server and --namespace options to parser.""" 1798 parser.add_option( 1799 '-I', '--isolate-server', 1800 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''), 1801 help='URL of the Isolate Server to use. Defaults to the environment ' 1802 'variable ISOLATE_SERVER if set. No need to specify https://, this ' 1803 'is assumed.') 1804 parser.add_option( 1805 '--namespace', 1806 default='default-gzip', 1807 help='The namespace to use on the Isolate Server, default: %default') 1808 1809 1810def process_isolate_server_options(parser, options, required): 1811 """Processes the --isolate-server option. 1812 1813 Returns the identity as determined by the server. 1814 """ 1815 if not options.isolate_server: 1816 if required: 1817 parser.error('--isolate-server is required.') 1818 return 1819 1820 try: 1821 options.isolate_server = net.fix_url(options.isolate_server) 1822 except ValueError as e: 1823 parser.error('--isolate-server %s' % e) 1824 1825 try: 1826 return auth.ensure_logged_in(options.isolate_server) 1827 except ValueError as e: 1828 parser.error(str(e)) 1829 return None 1830 1831 1832def add_cache_options(parser): 1833 cache_group = optparse.OptionGroup(parser, 'Isolated cache management') 1834 cache_group.add_option( 1835 '--cache', metavar='DIR', default='cache', 1836 help='Directory to keep a local cache of the files. Accelerates download ' 1837 'by reusing already downloaded files. Default=%default') 1838 cache_group.add_option( 1839 '--max-cache-size', 1840 type='int', 1841 metavar='NNN', 1842 default=50*1024*1024*1024, 1843 help='Trim if the cache gets larger than this value, default=%default') 1844 cache_group.add_option( 1845 '--min-free-space', 1846 type='int', 1847 metavar='NNN', 1848 default=2*1024*1024*1024, 1849 help='Trim if disk free space becomes lower than this value, ' 1850 'default=%default') 1851 cache_group.add_option( 1852 '--max-items', 1853 type='int', 1854 metavar='NNN', 1855 default=100000, 1856 help='Trim if more than this number of items are in the cache ' 1857 'default=%default') 1858 parser.add_option_group(cache_group) 1859 1860 1861def process_cache_options(options, trim, **kwargs): 1862 if options.cache: 1863 policies = local_caching.CachePolicies( 1864 options.max_cache_size, 1865 options.min_free_space, 1866 options.max_items, 1867 # 3 weeks. 1868 max_age_secs=21 * 24 * 60 * 60) 1869 1870 # |options.cache| path may not exist until DiskContentAddressedCache() 1871 # instance is created. 1872 return local_caching.DiskContentAddressedCache( 1873 six.text_type(os.path.abspath(options.cache)), policies, trim, **kwargs) 1874 return local_caching.MemoryContentAddressedCache() 1875 1876 1877class OptionParserIsolateServer(logging_utils.OptionParserWithLogging): 1878 1879 def __init__(self, **kwargs): 1880 logging_utils.OptionParserWithLogging.__init__( 1881 self, 1882 version=__version__, 1883 prog=os.path.basename(sys.modules[__name__].__file__), 1884 **kwargs) 1885 auth.add_auth_options(self) 1886 1887 def parse_args(self, *args, **kwargs): 1888 options, args = logging_utils.OptionParserWithLogging.parse_args( 1889 self, *args, **kwargs) 1890 auth.process_auth_options(self, options) 1891 return options, args 1892 1893 1894def main(args): 1895 dispatcher = subcommand.CommandDispatcher(__name__) 1896 return dispatcher.execute(OptionParserIsolateServer(), args) 1897 1898 1899if __name__ == '__main__': 1900 subprocess42.inhibit_os_error_reporting() 1901 fix_encoding.fix_encoding() 1902 tools.disable_buffering() 1903 colorama.init() 1904 net.set_user_agent('isolateserver.py/' + __version__) 1905 sys.exit(main(sys.argv[1:])) 1906