1#!/usr/bin/env python
2# Copyright 2013 The LUCI Authors. All rights reserved.
3# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
5
6"""Archives a set of files or directories to an Isolate Server."""
7
8from __future__ import print_function
9
10import collections
11import errno
12import functools
13import logging
14import optparse
15import os
16import re
17import signal
18import stat
19import sys
20import tarfile
21import threading
22import time
23import zlib
24
25from utils import net
26from utils import tools
27tools.force_local_third_party()
28
29# third_party/
30import colorama
31from depot_tools import fix_encoding
32from depot_tools import subcommand
33import six
34from six.moves import queue as Queue
35
36# pylint: disable=ungrouped-imports
37import auth
38import isolated_format
39import isolate_storage
40import local_caching
41from utils import file_path
42from utils import fs
43from utils import logging_utils
44from utils import net
45from utils import on_error
46from utils import subprocess42
47from utils import threading_utils
48
49
50__version__ = '0.9.0'
51
52# Version of isolate protocol passed to the server in /handshake request.
53ISOLATE_PROTOCOL_VERSION = '1.0'
54
55
56# Maximum expected delay (in seconds) between successive file fetches or uploads
57# in Storage. If it takes longer than that, a deadlock might be happening
58# and all stack frames for all threads are dumped to log.
59DEADLOCK_TIMEOUT = 5 * 60
60
61
# The number of files to check the isolate server for per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off:
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
73ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
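# For example, assuming items arrive quickly enough that no 3 second timeout
# flush happens in between, 250 queued lookups are batched as 20, 20, 50, 50
# and 50 items, with the remaining 60 flushed once the input is exhausted.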
74
75
76# A list of already compressed extension types that should not receive any
77# compression before being uploaded.
78ALREADY_COMPRESSED_TYPES = [
79    '7z',
80    'avi',
81    'cur',
82    'gif',
83    'h264',
84    'jar',
85    'jpeg',
86    'jpg',
87    'mp4',
88    'pdf',
89    'png',
90    'wav',
91    'zip',
92]
93
94# The delay (in seconds) to wait between logging statements when retrieving the
95# required files. This is intended to let the user know that the program is
96# still running.
97DELAY_BETWEEN_UPDATES_IN_SECS = 30
98
99
100DEFAULT_DENYLIST = (
101    # Temporary vim or python files.
102    r'^.+\.(?:pyc|swp)$',
103    # .git or .svn directory.
104    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
105)
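# For example, on POSIX 'foo.pyc', 'src/main.swp' and 'src/.git' are denied,
# while 'src/main.py' and '.gitignore' are kept.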
106
107
108class Error(Exception):
109  """Generic runtime error."""
110  pass
111
112
113class Aborted(Error):
114  """Operation aborted."""
115  pass
116
117
118class AlreadyExists(Error):
119  """File already exists."""
120
121
122def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
123  """Yields file content in chunks of |chunk_size| starting from |offset|."""
124  with fs.open(path, 'rb') as f:
125    if offset:
126      f.seek(offset)
127    while True:
128      data = f.read(chunk_size)
129      if not data:
130        break
131      yield data
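
# A minimal usage sketch for file_read() (the path below is an illustrative
# assumption):
#
#   total = 0
#   for chunk in file_read(u'/tmp/payload.bin', chunk_size=64 * 1024):
#     total += len(chunk)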
132
133
134def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.
136
137  The returned path is guaranteed to exist and can be passed to file system
138  operations like copy.
139  """
140  name = getattr(fileobj, 'name', None)
141  if name is None:
142    return None
143
  # If the file-like object was created by something outside our control (e.g.
  # the standard library's open("test.txt")), name will end up being a str. We
  # want all our paths to be unicode objects, so we decode it.
148  if not isinstance(name, six.text_type):
149    # We incorrectly assume that UTF-8 is used everywhere.
150    name = name.decode('utf-8')
151
152  # fs.exists requires an absolute path, otherwise it will fail with an
153  # assertion error.
154  if not os.path.isabs(name):
155    return None
156
157  if fs.exists(name):
158    return name
159  return None
160
161
162# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
163# wrappers have been created.
164def fileobj_copy(
165    dstfileobj, srcfileobj, size=-1,
166    chunk_size=isolated_format.DISK_FILE_CHUNK):
167  """Copy data from srcfileobj to dstfileobj.
168
  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is raised). Otherwise all data up to
  the EOF marker will be copied.
172  """
173  if size == -1 and hasattr(srcfileobj, 'tell'):
174    if srcfileobj.tell() != 0:
175      raise IOError('partial file but not using size')
176
177  written = 0
178  while written != size:
179    readsize = chunk_size
180    if size > 0:
181      readsize = min(readsize, size-written)
182    data = srcfileobj.read(readsize)
183    if not data:
184      if size == -1:
185        break
186      raise IOError('partial file, got %s, wanted %s' % (written, size))
187    dstfileobj.write(data)
188    written += len(data)
189
190
191def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
192  """Put srcfileobj at the given dstpath with given mode.
193
  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.
196
197  Creating a tree of hardlinks has a few drawbacks:
198  - tmpfs cannot be used for the scratch space. The tree has to be on the same
199    partition as the cache.
  - involves a write to the inode, which advances ctime and causes a metadata
    writeback (and thus disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure that
    all hardlinks share the same owner.
  - Anecdotal reports suggest ext2 can be faulty under a high rate of hardlink
    creation.
210
211  Creating a tree of symlinks has a few drawbacks:
212  - Tasks running the equivalent of os.path.realpath() will get the naked path
213    and may fail.
214  - Windows:
215    - Symlinks are reparse points:
216      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
217      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
218    - Symbolic links are Win32 paths, not NT paths.
219      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
220    - Symbolic links are supported on Windows 7 and later only.
221    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
222      default.
223    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
224      RID is present in the token;
225      https://msdn.microsoft.com/en-us/library/bb530410.aspx
226  """
227  srcpath = fileobj_path(srcfileobj)
228  if srcpath and size == -1:
    readonly = file_mode is None or not file_mode & (
        stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
231
232    if readonly:
233      # If the file is read only we can link the file
234      if use_symlink:
235        link_mode = file_path.SYMLINK_WITH_FALLBACK
236      else:
237        link_mode = file_path.HARDLINK_WITH_FALLBACK
238    else:
239      # If not read only, we must copy the file
240      link_mode = file_path.COPY
241
242    file_path.link_file(dstpath, srcpath, link_mode)
243    assert fs.exists(dstpath)
244  else:
245    # Need to write out the file
246    with fs.open(dstpath, 'wb') as dstfileobj:
247      fileobj_copy(dstfileobj, srcfileobj, size)
248
249    if sys.platform == 'win32' and file_mode and file_mode & stat.S_IWRITE:
250      # On windows, mode other than removing stat.S_IWRITE is ignored. Returns
251      # early to skip slow/unnecessary chmod call.
252      return
253
254  # file_mode of 0 is actually valid, so need explicit check.
255  if file_mode is not None:
256    fs.chmod(dstpath, file_mode)
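
# A minimal usage sketch for putfile() (the paths below are illustrative
# assumptions). Since the source is a real read-only file on disk and no size
# is forced, putfile() links rather than copies when the platform allows it:
#
#   with fs.open(u'/tmp/src.bin', 'rb') as src:
#     putfile(src, u'/tmp/dst.bin', file_mode=0o500)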
257
258
259def zip_compress(content_generator, level=7):
260  """Reads chunks from |content_generator| and yields zip compressed chunks."""
261  compressor = zlib.compressobj(level)
262  for chunk in content_generator:
263    compressed = compressor.compress(chunk)
264    if compressed:
265      yield compressed
266  tail = compressor.flush(zlib.Z_FINISH)
267  if tail:
268    yield tail
269
270
271def zip_decompress(
272    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
273  """Reads zipped data from |content_generator| and yields decompressed data.
274
  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.
277
278  Raises IOError if data is corrupted or incomplete.
279  """
280  decompressor = zlib.decompressobj()
281  compressed_size = 0
282  try:
283    for chunk in content_generator:
284      compressed_size += len(chunk)
285      data = decompressor.decompress(chunk, chunk_size)
286      if data:
287        yield data
288      while decompressor.unconsumed_tail:
289        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
290        if data:
291          yield data
292    tail = decompressor.flush()
293    if tail:
294      yield tail
295  except zlib.error as e:
296    raise IOError(
297        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
298  # Ensure all data was read and decompressed.
299  if decompressor.unused_data or decompressor.unconsumed_tail:
300    raise IOError('Not all data was decompressed')
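
# A minimal round-trip sketch for zip_compress() / zip_decompress() with
# in-memory data:
#
#   chunks = [b'hello ', b'world']
#   packed = list(zip_compress(iter(chunks)))
#   assert b''.join(zip_decompress(iter(packed))) == b'hello world'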
301
302
303def _get_zip_compression_level(filename):
  """Given a filename, returns the ideal zip compression level to use."""
305  file_ext = os.path.splitext(filename)[1].lower()
306  # TODO(csharp): Profile to find what compression level works best.
307  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
308
309
310def create_directories(base_directory, files):
311  """Creates the directory structure needed by the given list of files."""
312  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Builds the set of directories that need to be created.
314  directories = set(os.path.dirname(f) for f in files)
315  for item in list(directories):
316    while item:
317      directories.add(item)
318      item = os.path.dirname(item)
319  for d in sorted(directories):
320    if d:
321      abs_d = os.path.join(base_directory, d)
322      if not fs.isdir(abs_d):
323        fs.mkdir(abs_d)
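
# A minimal usage sketch for create_directories() (paths are illustrative
# assumptions):
#
#   create_directories(u'/tmp/out', [u'a/b/c.txt', u'a/d.txt'])
#   # Creates /tmp/out/a, then /tmp/out/a/b.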
324
325
326def _create_symlinks(base_directory, files):
327  """Creates any symlinks needed by the given set of files."""
328  for filepath, properties in files:
329    if 'l' not in properties:
330      continue
331    if sys.platform == 'win32':
332      # TODO(maruel): Create symlink via the win32 api.
333      logging.warning('Ignoring symlink %s', filepath)
334      continue
335    outfile = os.path.join(base_directory, filepath)
336    try:
337      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
338    except OSError as e:
339      if e.errno == errno.EEXIST:
340        raise AlreadyExists('File %s already exists.' % outfile)
341      raise
342
343
344class _ThreadFile(object):
345  """Multithreaded fake file. Used by TarBundle."""
346  def __init__(self):
347    self._data = threading_utils.TaskChannel()
348    self._offset = 0
349
350  def __iter__(self):
351    return self._data
352
353  def tell(self):
354    return self._offset
355
356  def write(self, b):
357    self._data.send_result(b)
358    self._offset += len(b)
359
360  def close(self):
361    self._data.send_done()
362
363
364class FileItem(isolate_storage.Item):
365  """A file to push to Storage.
366
367  Its digest and size may be provided in advance, if known. Otherwise they will
368  be derived from the file content.
369  """
370
371  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
372    super(FileItem, self).__init__(
373        digest,
374        size if size is not None else fs.stat(path).st_size,
375        high_priority,
376        compression_level=_get_zip_compression_level(path))
377    self._path = path
378    self._algo = algo
379    self._meta = None
380
381  @property
382  def path(self):
383    return self._path
384
385  @property
386  def algo(self):
387    return self._algo
388
389  @property
390  def digest(self):
391    if not self._digest:
392      self._digest = isolated_format.hash_file(self._path, self._algo)
393    return self._digest
394
395  @property
396  def meta(self):
397    if not self._meta:
398      # TODO(maruel): Inline.
399      self._meta = isolated_format.file_to_metadata(self.path, False)
400      # We need to hash right away.
401      self._meta['h'] = self.digest
402    return self._meta
403
404  def content(self):
405    return file_read(self.path)
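
# A minimal usage sketch for FileItem (the path and hash algorithm are
# illustrative assumptions; in practice the algorithm comes from the server
# namespace):
#
#   import hashlib
#   item = FileItem(path=u'/tmp/payload.bin', algo=hashlib.sha1)
#   item.size    # Taken from fs.stat() since no size was provided.
#   item.digest  # Hashed lazily on first access.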
406
407
408class TarBundle(isolate_storage.Item):
409  """Tarfile to push to Storage.
410
411  Its digest is the digest of all the files it contains. It is generated on the
412  fly.
413  """
414
415  def __init__(self, root, algo):
    # Two trailing 512-byte headers.
417    super(TarBundle, self).__init__(size=1024)
418    self._items = []
419    self._meta = None
420    self._algo = algo
421    self._root_len = len(root) + 1
422    # Same value as for Go.
423    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
424    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
425    self._archive_max_size = int(10e6)
426
427  @property
428  def digest(self):
429    if not self._digest:
430      self._prepare()
431    return self._digest
432
433  @property
434  def size(self):
435    if self._size is None:
436      self._prepare()
437    return self._size
438
439  def try_add(self, item):
440    """Try to add this file to the bundle.
441
442    It is extremely naive but this should be just enough for
443    https://crbug.com/825418.
444
445    Future improvements should be in the Go code, and the Swarming bot should be
446    migrated to use the Go code instead.
447    """
448    if not item.size:
449      return False
450    # pylint: disable=unreachable
451    rounded = (item.size + 512) & ~511
452    if rounded + self._size > self._archive_max_size:
453      return False
454    # https://crbug.com/825418
455    return False
456    self._size += rounded
457    self._items.append(item)
458    return True
459
460  def yield_item_path_meta(self):
    """Yields tuples of (Item, filepath, meta_dict).

    If the bundle contains fewer than 5 items, the individual items are yielded
    instead of the bundle itself.
464    """
465    if len(self._items) < 5:
466      # The tarball is too small, yield individual items, if any.
467      for item in self._items:
468        yield item, item.path[self._root_len:], item.meta
469    else:
470      # This ensures self._meta is set.
471      p = self.digest + '.tar'
472      # Yield itself as a tarball.
473      yield self, p, self._meta
474
475  def content(self):
476    """Generates the tarfile content on the fly."""
477    obj = _ThreadFile()
478    def _tar_thread():
479      try:
480        t = tarfile.open(
481            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
482        for item in self._items:
483          logging.info(' tarring %s', item.path)
484          t.add(item.path)
485        t.close()
486      except Exception:
487        logging.exception('Internal failure')
488      finally:
489        obj.close()
490
491    t = threading.Thread(target=_tar_thread)
492    t.start()
493    try:
494      for data in obj:
495        yield data
496    finally:
497      t.join()
498
499  def _prepare(self):
500    h = self._algo()
501    total = 0
502    for chunk in self.content():
503      h.update(chunk)
504      total += len(chunk)
505    # pylint: disable=attribute-defined-outside-init
506    # This is not true, they are defined in Item.__init__().
507    self._digest = h.hexdigest()
508    self._size = total
509    self._meta = {
510      'h': self.digest,
511      's': self.size,
512      't': u'tar',
513    }
514
515
516class BufferItem(isolate_storage.Item):
517  """A byte buffer to push to Storage."""
518
519  def __init__(self, buf, algo, high_priority=False):
520    super(BufferItem, self).__init__(
521        digest=algo(buf).hexdigest(),
522        size=len(buf),
523        high_priority=high_priority)
524    self._buffer = buf
525
526  def content(self):
527    return [self._buffer]
528
529
530class Storage(object):
531  """Efficiently downloads or uploads large set of files via StorageApi.
532
533  Implements compression support, parallel 'contains' checks, parallel uploads
534  and more.
535
  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).
538
539  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
540  signal handlers table to handle Ctrl+C.
541  """
542
543  def __init__(self, storage_api):
544    self._storage_api = storage_api
545    self._cpu_thread_pool = None
546    self._net_thread_pool = None
547    self._aborted = False
548    self._prev_sig_handlers = {}
549
550  @property
551  def server_ref(self):
552    """Shortcut to get the server_ref from storage_api.
553
554    This can be used to get the underlying hash_algo.
555    """
556    return self._storage_api.server_ref
557
558  @property
559  def cpu_thread_pool(self):
560    """ThreadPool for CPU-bound tasks like zipping."""
561    if self._cpu_thread_pool is None:
562      threads = max(threading_utils.num_processors(), 2)
563      max_size = long(2)**32 if sys.version_info.major == 2 else 2**32
564      if sys.maxsize <= max_size:
565        # On 32 bits userland, do not try to use more than 16 threads.
566        threads = min(threads, 16)
567      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
568    return self._cpu_thread_pool
569
570  @property
571  def net_thread_pool(self):
572    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
573    if self._net_thread_pool is None:
574      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
575    return self._net_thread_pool
576
577  def close(self):
578    """Waits for all pending tasks to finish."""
579    logging.info('Waiting for all threads to die...')
580    if self._cpu_thread_pool:
581      self._cpu_thread_pool.join()
582      self._cpu_thread_pool.close()
583      self._cpu_thread_pool = None
584    if self._net_thread_pool:
585      self._net_thread_pool.join()
586      self._net_thread_pool.close()
587      self._net_thread_pool = None
588    logging.info('Done.')
589
590  def abort(self):
591    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (which is true for python) and thus no locking is used.
596    if not self._aborted:
597      logging.warning('Aborting... It can take a while.')
598      self._aborted = True
599
600  def __enter__(self):
601    """Context manager interface."""
602    assert not self._prev_sig_handlers, self._prev_sig_handlers
603    for s in (signal.SIGINT, signal.SIGTERM):
604      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
605    return self
606
607  def __exit__(self, _exc_type, _exc_value, _traceback):
608    """Context manager interface."""
609    self.close()
610    while self._prev_sig_handlers:
611      s, h = self._prev_sig_handlers.popitem()
612      signal.signal(s, h)
613    return False
614
615  def upload_items(self, items, verify_push=False):
616    """Uploads a generator of Item to the isolate server.
617
618    It figures out what items are missing from the server and uploads only them.
619
620    It uses 3 threads internally:
621    - One to create batches based on a timeout
622    - One to dispatch the /contains RPC and field the missing entries
623    - One to field the /push RPC
624
    The main thread enumerates 'items' and pushes them to the first thread.
    Then it join()s all the threads, waiting for them to complete.
627
628        (enumerate items of Item, this can be slow as disk is traversed)
629              |
630              v
631    _create_items_batches_thread       Thread #1
632        (generates list(Item), every 3s or 20~100 items)
633              |
634              v
635      _do_lookups_thread               Thread #2
636         |         |
637         v         v
638     (missing)  (was on server)
639         |
640         v
641    _handle_missing_thread             Thread #3
642          |
643          v
644      (upload Item, append to uploaded)
645
646    Arguments:
647      items: list of isolate_storage.Item instances that represents data to
648             upload.
649      verify_push: verify files are uploaded correctly by fetching from server.
650
651    Returns:
652      List of items that were uploaded. All other items are already there.
653
654    Raises:
655      The first exception being raised in the worker threads.
656    """
657    incoming = Queue.Queue()
658    batches_to_lookup = Queue.Queue()
659    missing = Queue.Queue()
660    uploaded = []
661    exc_channel = threading_utils.TaskChannel()
662
663    def _create_items_batches_thread():
664      """Creates batches for /contains RPC lookup from individual items.
665
666      Input: incoming
667      Output: batches_to_lookup
668      """
669      try:
670        batch_size_index = 0
671        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
672        batch = []
673        while not self._aborted:
674          try:
675            item = incoming.get(True, timeout=3)
676            if item:
677              batch.append(item)
678          except Queue.Empty:
679            item = False
680          if len(batch) == batch_size or (not item and batch):
681            if len(batch) == batch_size:
682              batch_size_index += 1
683              batch_size = ITEMS_PER_CONTAINS_QUERIES[
684                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
685            batches_to_lookup.put(batch)
686            batch = []
687          if item is None:
688            break
689      except Exception:
690        exc_channel.send_exception()
691      finally:
692        # Unblock the next pipeline.
693        batches_to_lookup.put(None)
694
695    def _do_lookups_thread():
696      """Enqueues all the /contains RPCs and emits the missing items.
697
698      Input: batches_to_lookup
699      Output: missing, to_upload
700      """
701      try:
702        channel = threading_utils.TaskChannel()
703        def _contains(b):
704          if self._aborted:
705            raise Aborted()
706          return self._storage_api.contains(b)
707
708        pending_contains = 0
709        while not self._aborted:
710          batch = batches_to_lookup.get()
711          if batch is None:
712            break
713          self.net_thread_pool.add_task_with_channel(
714              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
715          pending_contains += 1
716          while pending_contains and not self._aborted:
717            try:
718              v = channel.next(timeout=0)
719            except threading_utils.TaskChannel.Timeout:
720              break
721            pending_contains -= 1
722            for missing_item, push_state in v.items():
723              missing.put((missing_item, push_state))
724        while pending_contains and not self._aborted:
725          for missing_item, push_state in channel.next().items():
726            missing.put((missing_item, push_state))
727          pending_contains -= 1
728      except Exception:
729        exc_channel.send_exception()
730      finally:
731        # Unblock the next pipeline.
732        missing.put((None, None))
733
734    def _handle_missing_thread():
735      """Sends the missing items to the uploader.
736
737      Input: missing
738      Output: uploaded
739      """
740      try:
741        with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
742          channel = threading_utils.TaskChannel()
743          pending_upload = 0
744          while not self._aborted:
745            try:
746              missing_item, push_state = missing.get(True, timeout=5)
747              if missing_item is None:
748                break
749              self._async_push(channel, missing_item, push_state, verify_push)
750              pending_upload += 1
751            except Queue.Empty:
752              pass
753            detector.ping()
754            while not self._aborted and pending_upload:
755              try:
756                item = channel.next(timeout=0)
757              except threading_utils.TaskChannel.Timeout:
758                break
759              uploaded.append(item)
760              pending_upload -= 1
761              logging.debug('Uploaded %d; %d pending: %s (%d)', len(uploaded),
762                            pending_upload, item.digest, item.size)
763          while not self._aborted and pending_upload:
764            item = channel.next()
765            uploaded.append(item)
766            pending_upload -= 1
767            logging.debug(
768                'Uploaded %d; %d pending: %s (%d)',
769                len(uploaded), pending_upload, item.digest, item.size)
770      except Exception:
771        exc_channel.send_exception()
772
773    threads = [
774        threading.Thread(target=_create_items_batches_thread),
775        threading.Thread(target=_do_lookups_thread),
776        threading.Thread(target=_handle_missing_thread),
777    ]
778    for t in threads:
779      t.start()
780
781    try:
      # For each digest, keep only the first isolate_storage.Item that matches
      # it. All other items are just indistinguishable copies from the point of
      # view of the isolate server (it doesn't care about paths at all, only
      # content and digests).
786      seen = {}
787      try:
788        # TODO(maruel): Reorder the items as a priority queue, with larger items
789        # being processed first. This is, before hashing the data.
790        # This must be done in the primary thread since items can be a
791        # generator.
792        for item in items:
793          if seen.setdefault(item.digest, item) is item:
794            incoming.put(item)
795      finally:
796        incoming.put(None)
797    finally:
798      for t in threads:
799        t.join()
800      exc_channel.send_done()
801      for _ in exc_channel:
802        # If there is no exception, this loop does nothing. Otherwise, it raises
803        # the first exception put onto |exc_channel|.
804        pass
805
806    logging.info('All %s files are uploaded', len(uploaded))
807    if seen:
808      _print_upload_stats(seen.values(), uploaded)
809    return uploaded
810
811  def _async_push(self, channel, item, push_state, verify_push=False):
812    """Starts asynchronous push to the server in a parallel thread.
813
814    Can be used only after |item| was checked for presence on a server with a
815    /contains RPC.
816
817    Arguments:
818      channel: TaskChannel that receives back |item| when upload ends.
819      item: item to upload as instance of isolate_storage.Item class.
820      push_state: push state returned by storage_api.contains(). It contains
821          storage specific information describing how to upload the item (for
822          example in case of cloud storage, it is signed upload URLs).
823      verify_push: verify files are uploaded correctly by fetching from server.
824
825    Returns:
826      None, but |channel| later receives back |item| when upload ends.
827    """
828    # Thread pool task priority.
829    priority = (
830        threading_utils.PRIORITY_HIGH if item.high_priority
831        else threading_utils.PRIORITY_MED)
832
833    def _push(content):
834      """Pushes an isolate_storage.Item and returns it to |channel|."""
835      if self._aborted:
836        raise Aborted()
837      self._storage_api.push(item, push_state, content)
838      if verify_push:
839        try:
840          self._fetch(
841              item.digest,
842              item.size,
843              # this consumes all elements from given generator.
844              lambda gen: collections.deque(gen, maxlen=0))
845        except Exception:
846          # reset push_state if failed to verify.
847          push_state.finalized = False
848          push_state.uploaded = False
849          raise
850
851      return item
852
853    # If zipping is not required, just start a push task. Don't pass 'content'
854    # so that it can create a new generator when it retries on failures.
855    if not self.server_ref.is_with_compression:
856      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
857      return
858
859    # If zipping is enabled, zip in a separate thread.
860    def zip_and_push():
861      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
862      # content right here. It will block until all file is zipped.
863      try:
864        if self._aborted:
865          raise Aborted()
866        stream = zip_compress(item.content(), item.compression_level)
867        # In Python3, zlib.compress returns a byte object instead of str.
868        data = six.b('').join(stream)
869      except Exception as exc:
870        logging.error('Failed to zip \'%s\': %s', item, exc)
871        channel.send_exception()
872        return
      # Pass '[data]' explicitly because the compressed data is not the same as
      # the data provided by 'item'. Since '[data]' is a list, it can safely be
      # reused during retries.
876      self.net_thread_pool.add_task_with_channel(
877          channel, priority, _push, [data])
878    self.cpu_thread_pool.add_task(priority, zip_and_push)
879
880  def push(self, item, push_state):
881    """Synchronously pushes a single item to the server.
882
    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.
885
886    Arguments:
887      item: item to upload as instance of isolate_storage.Item class.
888      push_state: push state returned by storage_api.contains(). It contains
889          storage specific information describing how to upload the item (for
890          example in case of cloud storage, it is signed upload URLs).
891
892    Returns:
893      Pushed item (same object as |item|).
894    """
895    channel = threading_utils.TaskChannel()
896    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
897      self._async_push(channel, item, push_state)
898      pushed = channel.next()
899      assert pushed is item
900    return item
901
902  def _fetch(self, digest, size, sink):
903    try:
904      # Prepare reading pipeline.
905      stream = self._storage_api.fetch(digest, size, 0)
906      if self.server_ref.is_with_compression:
907        stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
908      # Run |stream| through verifier that will assert its size.
909      verifier = FetchStreamVerifier(stream, self.server_ref.hash_algo, digest,
910                                     size)
911      # Verified stream goes to |sink|.
912      sink(verifier.run())
913    except Exception:
914      logging.exception('Failed to fetch %s', digest)
915      raise
916
917  def async_fetch(self, channel, priority, digest, size, sink):
918    """Starts asynchronous fetch from the server in a parallel thread.
919
920    Arguments:
921      channel: TaskChannel that receives back |digest| when download ends.
922      priority: thread pool task priority for the fetch.
923      digest: hex digest of an item to download.
924      size: expected size of the item (after decompression).
925      sink: function that will be called as sink(generator).
926    """
927    def fetch():
928      self._fetch(digest, size, sink)
929      return digest
930
931    # Don't bother with zip_thread_pool for decompression. Decompression is
932    # really fast and most probably IO bound anyway.
933    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
934
935
936class FetchQueue(object):
937  """Fetches items from Storage and places them into ContentAddressedCache.
938
939  It manages multiple concurrent fetch operations. Acts as a bridge between
940  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
941  don't depend on each other at all.
942  """
943
944  def __init__(self, storage, cache):
945    self.storage = storage
946    self.cache = cache
947    self._channel = threading_utils.TaskChannel()
948    self._pending = set()
949    self._accessed = set()
950    self._fetched = set(cache)
951    # Pending digests that the caller waits for, see wait_on()/wait().
952    self._waiting_on = set()
953    # Already fetched digests the caller waits for which are not yet returned by
954    # wait().
955    self._waiting_on_ready = set()
956
957  def add(
958      self,
959      digest,
960      size=local_caching.UNKNOWN_FILE_SIZE,
961      priority=threading_utils.PRIORITY_MED):
962    """Starts asynchronous fetch of item |digest|."""
963    # Fetching it now?
964    if digest in self._pending:
965      return
966
967    # Mark this file as in use, verify_all_cached will later ensure it is still
968    # in cache.
969    self._accessed.add(digest)
970
971    # Already fetched? Notify cache to update item's LRU position.
972    if digest in self._fetched:
973      # 'touch' returns True if item is in cache and not corrupted.
974      if self.cache.touch(digest, size):
975        return
976      logging.error('%s is corrupted', digest)
977      self._fetched.remove(digest)
978
979    # TODO(maruel): It should look at the free disk space, the current cache
980    # size and the size of the new item on every new item:
981    # - Trim the cache as more entries are listed when free disk space is low,
982    #   otherwise if the amount of data downloaded during the run > free disk
983    #   space, it'll crash.
984    # - Make sure there's enough free disk space to fit all dependencies of
985    #   this run! If not, abort early.
986
987    # Start fetching.
988    self._pending.add(digest)
989    self.storage.async_fetch(
990        self._channel, priority, digest, size,
991        functools.partial(self.cache.write, digest))
992
993  def wait_on(self, digest):
994    """Updates digests to be waited on by 'wait'."""
995    # Calculate once the already fetched items. These will be retrieved first.
996    if digest in self._fetched:
997      self._waiting_on_ready.add(digest)
998    else:
999      self._waiting_on.add(digest)
1000
1001  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, it is removed from the waited-on set and returned.

    This function is called in two waves. In the first wave it is called for
    the HIGH priority items (the .isolated files themselves); in the second
    wave it is called for all the remaining files.

    If the waited-on set is empty, raises RuntimeError.
1011    """
1012    # Flush any already fetched items.
1013    if self._waiting_on_ready:
1014      return self._waiting_on_ready.pop()
1015
1016    assert self._waiting_on, 'Needs items to wait on'
1017
1018    # Wait for one waited-on item to be fetched.
1019    while self._pending:
1020      digest = self._channel.next()
1021      self._pending.remove(digest)
1022      self._fetched.add(digest)
1023      if digest in self._waiting_on:
1024        self._waiting_on.remove(digest)
1025        return digest
1026
1027    # Should never reach this point due to assert above.
1028    raise RuntimeError('Impossible state')
1029
1030  @property
1031  def wait_queue_empty(self):
1032    """Returns True if there is no digest left for wait() to return."""
1033    return not self._waiting_on and not self._waiting_on_ready
1034
1035  def inject_local_file(self, path, algo):
1036    """Adds local file to the cache as if it was fetched from storage."""
1037    with fs.open(path, 'rb') as f:
1038      data = f.read()
1039    digest = algo(data).hexdigest()
1040    self.cache.write(digest, [data])
1041    self._fetched.add(digest)
1042    return digest
1043
1044  @property
1045  def pending_count(self):
1046    """Returns number of items to be fetched."""
1047    return len(self._pending)
1048
1049  def verify_all_cached(self):
1050    """True if all accessed items are in cache."""
1051    # Not thread safe, but called after all work is done.
1052    return self._accessed.issubset(self.cache)
1053
1054
1055class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
1057  ContentAddressedCache.
1058  """
1059
1060  def __init__(self, stream, hasher, expected_digest, expected_size):
1061    """Initializes the verifier.
1062
1063    Arguments:
1064    * stream: an iterable yielding chunks of content
1065    * hasher: an object from hashlib that supports update() and hexdigest()
1066      (eg, hashlib.sha1).
1067    * expected_digest: if the entire stream is piped through hasher and then
1068      summarized via hexdigest(), this should be the result. That is, it
1069      should be a hex string like 'abc123'.
1070    * expected_size: either the expected size of the stream, or
1071      local_caching.UNKNOWN_FILE_SIZE.
1072    """
1073    assert stream is not None
1074    self.stream = stream
1075    self.expected_digest = expected_digest
1076    self.expected_size = expected_size
1077    self.current_size = 0
1078    self.rolling_hash = hasher()
1079
1080  def run(self):
1081    """Generator that yields same items as |stream|.
1082
1083    Verifies |stream| is complete before yielding a last chunk to consumer.
1084
1085    Also wraps IOError produced by consumer into MappingError exceptions since
1086    otherwise Storage will retry fetch on unrelated local cache errors.
1087    """
1088    # Read one chunk ahead, keep it in |stored|.
1089    # That way a complete stream can be verified before pushing last chunk
1090    # to consumer.
1091    stored = None
1092    for chunk in self.stream:
1093      assert chunk is not None
1094      if stored is not None:
1095        self._inspect_chunk(stored, is_last=False)
1096        try:
1097          yield stored
1098        except IOError as exc:
1099          raise isolated_format.MappingError(
1100              'Failed to store an item in cache: %s' % exc)
1101      stored = chunk
1102    if stored is not None:
1103      self._inspect_chunk(stored, is_last=True)
1104      try:
1105        yield stored
1106      except IOError as exc:
1107        raise isolated_format.MappingError(
1108            'Failed to store an item in cache: %s' % exc)
1109
1110  def _inspect_chunk(self, chunk, is_last):
1111    """Called for each fetched chunk before passing it to consumer."""
1112    self.current_size += len(chunk)
1113    self.rolling_hash.update(chunk)
1114    if not is_last:
1115      return
1116
1117    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
1118        (self.expected_size != self.current_size)):
1119      msg = 'Incorrect file size: want %d, got %d' % (
1120          self.expected_size, self.current_size)
1121      raise IOError(msg)
1122
1123    actual_digest = self.rolling_hash.hexdigest()
1124    if self.expected_digest != actual_digest:
1125      msg = 'Incorrect digest: want %s, got %s' % (
1126          self.expected_digest, actual_digest)
1127      raise IOError(msg)
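
# A minimal usage sketch for FetchStreamVerifier with in-memory data (the
# hashlib.sha1 hasher is an illustrative assumption):
#
#   import hashlib
#   payload = b'hello'
#   verifier = FetchStreamVerifier(
#       iter([payload]), hashlib.sha1, hashlib.sha1(payload).hexdigest(),
#       len(payload))
#   assert b''.join(verifier.run()) == payload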
1128
1129
1130class IsolatedBundle(object):
1131  """Fetched and parsed .isolated file with all dependencies."""
1132
1133  def __init__(self, filter_cb):
1134    """
    filter_cb: callback function to filter downloaded content.
               When filter_cb is not None, a file is downloaded only if
               filter_cb(filepath) returns True.
1138    """
1139
1140    self.command = []
1141    self.files = {}
1142    self.relative_cwd = None
1143    # The main .isolated file, a IsolatedFile instance.
1144    self.root = None
1145
1146    self._filter_cb = filter_cb
1147
1148  def fetch(self, fetch_queue, root_isolated_hash, algo):
1149    """Fetches the .isolated and all the included .isolated.
1150
1151    It enables support for "included" .isolated files. They are processed in
1152    strict order but fetched asynchronously from the cache. This is important so
1153    that a file in an included .isolated file that is overridden by an embedding
1154    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left side
    of the tree have been fetched.
1157
1158    The prioritization is very important here for nested .isolated files.
1159    'includes' have the highest priority and the algorithm is optimized for both
1160    deep and wide trees. A deep one is a long link of .isolated files referenced
1161    one at a time by one item in 'includes'. A wide one has a large number of
1162    'includes' in a single .isolated file. 'left' is defined as an included
1163    .isolated file earlier in the 'includes' list. So the order of the elements
1164    in 'includes' is important.
1165
1166    As a side effect this method starts asynchronous fetch of all data files
1167    by adding them to |fetch_queue|. It doesn't wait for data files to finish
1168    fetching though.
1169    """
1170    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1171
1172    # Isolated files being retrieved now: hash -> IsolatedFile instance.
1173    pending = {}
1174    # Set of hashes of already retrieved items to refuse recursive includes.
1175    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
1177    processed = set()
1178
1179    def retrieve_async(isolated_file):
1180      """Retrieves an isolated file included by the root bundle."""
1181      h = isolated_file.obj_hash
1182      if h in seen:
1183        raise isolated_format.IsolatedError(
1184            'IsolatedFile %s is retrieved recursively' % h)
1185      assert h not in pending
1186      seen.add(h)
1187      pending[h] = isolated_file
1188      # This isolated item is being added dynamically, notify FetchQueue.
1189      fetch_queue.wait_on(h)
1190      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1191
1192    # Start fetching root *.isolated file (single file, not the whole bundle).
1193    retrieve_async(self.root)
1194
1195    while pending:
1196      # Wait until some *.isolated file is fetched, parse it.
1197      item_hash = fetch_queue.wait()
1198      item = pending.pop(item_hash)
1199      with fetch_queue.cache.getfileobj(item_hash) as f:
1200        item.load(f.read())
1201
1202      # Start fetching included *.isolated files.
1203      for new_child in item.children:
1204        retrieve_async(new_child)
1205
1206      # Always fetch *.isolated files in traversal order, waiting if necessary
1207      # until next to-be-processed node loads. "Waiting" is done by yielding
1208      # back to the outer loop, that waits until some *.isolated is loaded.
1209      for node in isolated_format.walk_includes(self.root):
1210        if node not in processed:
1211          # Not visited, and not yet loaded -> wait for it to load.
1212          if not node.is_loaded:
1213            break
1214          # Not visited and loaded -> process it and continue the traversal.
1215          self._start_fetching_files(node, fetch_queue)
1216          processed.add(node)
1217
1218    # All *.isolated files should be processed by now and only them.
1219    all_isolateds = set(isolated_format.walk_includes(self.root))
1220    assert all_isolateds == processed, (all_isolateds, processed)
1221    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
1222
1223    # Extract 'command' and other bundle properties.
1224    for node in isolated_format.walk_includes(self.root):
1225      self._update_self(node)
1226    self.relative_cwd = self.relative_cwd or ''
1227
1228  def _start_fetching_files(self, isolated, fetch_queue):
1229    """Starts fetching files from |isolated| that are not yet being fetched.
1230
1231    Modifies self.files.
1232    """
1233    files = isolated.data.get('files', {})
1234    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
1235    for filepath, properties in files.items():
1236      if self._filter_cb and not self._filter_cb(filepath):
1237        continue
1238
      # The root .isolated has priority over the files being mapped. In
      # particular, overridden files must not be fetched.
1241      if filepath not in self.files:
1242        self.files[filepath] = properties
1243
1244        # Preemptively request hashed files.
1245        if 'h' in properties:
1246          fetch_queue.add(
1247              properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1248
1249  def _update_self(self, node):
1250    """Extracts bundle global parameters from loaded *.isolated file.
1251
1252    Will be called with each loaded *.isolated file in order of traversal of
1253    isolated include graph (see isolated_format.walk_includes).
1254    """
1255    # Grabs properties.
1256    if not self.command and node.data.get('command'):
1257      # Ensure paths are correctly separated on windows.
1258      self.command = node.data['command']
1259      if self.command:
1260        self.command[0] = self.command[0].replace('/', os.path.sep)
1261    if (self.relative_cwd is None and
1262        node.data.get('relative_cwd') is not None):
1263      self.relative_cwd = node.data['relative_cwd']
1264
1265
1266def get_storage(server_ref):
  """Returns a Storage instance that can upload and download from |server_ref|.
1268
1269  Arguments:
1270    server_ref: isolate_storage.ServerRef instance.
1271
1272  Returns:
1273    Instance of Storage.
1274  """
1275  # Handle the specific internal use case.
1276  assert (isinstance(server_ref, isolate_storage.ServerRef) or
1277          type(server_ref).__name__ == 'ServerRef'), repr(server_ref)
1278  return Storage(isolate_storage.get_storage_api(server_ref))
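
# A minimal usage sketch for get_storage() and Storage.upload_items() (the
# server URL and namespace below are illustrative assumptions):
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     item = BufferItem(b'hello world', algo=server_ref.hash_algo)
#     storage.upload_items([item])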
1279
1280
1281def _map_file(dst, digest, props, cache, use_symlinks):
  """Puts a downloaded file at the destination path.

  This function is used for multi-threaded file placement.
  """
1285  with tools.Profiler("_map_file for %s" % dst):
1286    with cache.getfileobj(digest) as srcfileobj:
1287      filetype = props.get('t', 'basic')
1288
1289      if filetype == 'basic':
1290        # Ignore all bits apart from the user.
1291        file_mode = (props.get('m') or 0o500) & 0o700
1292        putfile(srcfileobj, dst, file_mode, use_symlink=use_symlinks)
1293
1294      elif filetype == 'tar':
1295        basedir = os.path.dirname(dst)
1296        with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1297          ensured_dirs = set()
1298          for ti in t:
1299            if not ti.isfile():
1300              logging.warning('Path(%r) is nonfile (%s), skipped', ti.name,
1301                              ti.type)
1302              continue
1303            # Handle files created on Windows fetched on POSIX and the
1304            # reverse.
1305            other_sep = '/' if os.path.sep == '\\' else '\\'
1306            name = ti.name.replace(other_sep, os.path.sep)
1307            fp = os.path.normpath(os.path.join(basedir, name))
1308            if not fp.startswith(basedir):
1309              logging.error('Path(%r) is outside root directory', fp)
1310            ifd = t.extractfile(ti)
1311            fp_dir = os.path.dirname(fp)
1312            if fp_dir not in ensured_dirs:
1313              file_path.ensure_tree(fp_dir)
1314              ensured_dirs.add(fp_dir)
1315            file_mode = ti.mode & 0o700
1316            putfile(ifd, fp, file_mode, ti.size)
1317
1318      else:
1319        raise isolated_format.IsolatedError('Unknown file type %r' % filetype)
1320
1321
1322def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1323                   filter_cb=None):
  """Aggressively downloads the .isolated file(s), then downloads all the files.
1325
1326  Arguments:
1327    isolated_hash: hash of the root *.isolated file.
1328    storage: Storage class that communicates with isolate storage.
1329    cache: ContentAddressedCache class that knows how to store and map files
1330           locally.
1331    outdir: Output directory to map file tree to.
1332    use_symlinks: Use symlinks instead of hardlinks when True.
1333    filter_cb: filter that works as allowlist for downloaded files.
1334
1335  Returns:
1336    IsolatedBundle object that holds details about loaded *.isolated file.
1337  """
1338  logging.debug(
1339      'fetch_isolated(%s, %s, %s, %s, %s)',
1340      isolated_hash, storage, cache, outdir, use_symlinks)
1341  # Hash algorithm to use, defined by namespace |storage| is using.
1342  algo = storage.server_ref.hash_algo
1343  fetch_queue = FetchQueue(storage, cache)
1344  bundle = IsolatedBundle(filter_cb)
1345
1346  with tools.Profiler('GetIsolateds'):
1347    # Optionally support local files by manually adding them to cache.
1348    if not isolated_format.is_valid_hash(isolated_hash, algo):
1349      logging.debug(
1350          '%s is not a valid hash, assuming a file '
1351          '(algo was %s, hash size was %d)', isolated_hash, algo(),
1352          algo().digest_size)
1353      path = six.text_type(os.path.abspath(isolated_hash))
1354      try:
1355        isolated_hash = fetch_queue.inject_local_file(path, algo)
1356      except IOError as e:
1357        raise isolated_format.MappingError(
            '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1359            'valid hash (error: %s)?' % (isolated_hash, e))
1360
1361    # Load all *.isolated and start loading rest of the files.
1362    bundle.fetch(fetch_queue, isolated_hash, algo)
1363
1364  with tools.Profiler('GetRest'):
1365    # Create file system hierarchy.
1366    file_path.ensure_tree(outdir)
1367    create_directories(outdir, bundle.files)
1368    _create_symlinks(outdir, bundle.files.items())
1369
1370    # Ensure working directory exists.
1371    cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
1372    file_path.ensure_tree(cwd)
1373
1374    # Multimap: digest -> list of pairs (path, props).
1375    remaining = {}
1376    for filepath, props in bundle.files.items():
1377      if 'h' in props:
1378        remaining.setdefault(props['h'], []).append((filepath, props))
1379        fetch_queue.wait_on(props['h'])
1380
1381    # Now block on the remaining files to be downloaded and mapped.
1382    logging.info('Retrieving remaining files (%d of them)...',
1383        fetch_queue.pending_count)
1384    last_update = time.time()
1385
1386    with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
1387      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1388        while remaining:
1389          detector.ping()
1390
1391          # Wait for any item to finish fetching to cache.
1392          digest = fetch_queue.wait()
1393
1394          # Create the files in the destination using item in cache as the
1395          # source.
1396          for filepath, props in remaining.pop(digest):
1397            fullpath = os.path.join(outdir, filepath)
1398
1399            putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
1400                                         _map_file, fullpath, digest, props,
1401                                         cache, use_symlinks)
1402
1403          # Report progress.
1404          duration = time.time() - last_update
1405          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1406            msg = '%d files remaining...' % len(remaining)
1407            sys.stdout.write(msg + '\n')
1408            sys.stdout.flush()
1409            logging.info(msg)
1410            last_update = time.time()
1411      assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
1412      putfile_thread_pool.join()
1413
  # Save the cache right away to not lose the state of the new objects.
1415  cache.save()
  # The cache could have evicted some items we just tried to fetch; that is a
  # fatal error.
1417  if not fetch_queue.verify_all_cached():
1418    free_disk = file_path.get_free_space(cache.cache_dir)
1419    msg = (
1420        'Cache is too small to hold all requested files.\n'
1421        '  %s\n  cache=%dbytes, %d items; %sb free_space') % (
1422          cache.policies, cache.total_size, len(cache), free_disk)
1423    raise isolated_format.MappingError(msg)
1424  return bundle
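
# A minimal usage sketch for fetch_isolated(). The output directory is an
# illustrative assumption; 'cache' is any local_caching ContentAddressedCache
# implementation and 'root_hash' is the hex digest of the root .isolated file:
#
#   with get_storage(server_ref) as storage:
#     bundle = fetch_isolated(
#         root_hash, storage, cache, u'/tmp/out', use_symlinks=False)
#     print(bundle.command, bundle.relative_cwd)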
1425
1426
1427def _directory_to_metadata(root, algo, denylist):
1428  """Yields every file and/or symlink found.
1429
1430  Yields:
1431    tuple(FileItem, relpath, metadata)
1432    For a symlink, FileItem is None.
1433  """
1434  # Current tar file bundle, if any.
1435  root = file_path.get_native_path_case(root)
1436  bundle = TarBundle(root, algo)
1437  for relpath, issymlink in isolated_format.expand_directory_and_symlink(
1438      root,
1439      u'.' + os.path.sep,
1440      denylist,
1441      follow_symlinks=(sys.platform != 'win32')):
1442
1443    filepath = os.path.join(root, relpath)
1444    if issymlink:
1445      # TODO(maruel): Do not call this.
1446      meta = isolated_format.file_to_metadata(filepath, False)
1447      yield None, relpath, meta
1448      continue
1449
1450    prio = relpath.endswith('.isolated')
1451    if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
1452      # The file was added to the current pending tarball and won't be archived
1453      # individually.
1454      continue
1455
1456    # Flush and reset the bundle.
1457    for i, p, m in bundle.yield_item_path_meta():
1458      yield i, p, m
1459    bundle = TarBundle(root, algo)
1460
1461    # Yield the file individually.
1462    item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
1463    yield item, relpath, item.meta
1464
1465  for i, p, m in bundle.yield_item_path_meta():
1466    yield i, p, m
1467
1468
1469def _print_upload_stats(items, missing):
1470  """Prints upload stats."""
1471  total = len(items)
1472  total_size = sum(f.size for f in items)
1473  logging.info(
1474      'Total:      %6d, %9.1fkiB', total, total_size / 1024.)
1475  cache_hit = set(items).difference(missing)
1476  cache_hit_size = sum(f.size for f in cache_hit)
1477  logging.info(
1478      'cache hit:  %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1479      len(cache_hit),
1480      cache_hit_size / 1024.,
1481      len(cache_hit) * 100. / total,
1482      cache_hit_size * 100. / total_size if total_size else 0)
1483  cache_miss = missing
1484  cache_miss_size = sum(f.size for f in cache_miss)
1485  logging.info('cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1486               len(cache_miss), cache_miss_size / 1024.,
1487               len(cache_miss) * 100. / total,
1488               cache_miss_size * 100. / total_size if total_size else 0)
1489
1490
1491def _enqueue_dir(dirpath, denylist, hash_algo, hash_algo_name):
1492  """Called by archive_files_to_storage for a directory.
1493
  Creates an .isolated file.
1495
1496  Yields:
1497    FileItem for every file found, plus one for the .isolated file itself.
1498  """
1499  files = {}
1500  for item, relpath, meta in _directory_to_metadata(dirpath, hash_algo,
1501                                                    denylist):
1502    # item is None for a symlink.
1503    files[relpath] = meta
1504    if item:
1505      yield item
1506
  # TODO(maruel): If there is no file, don't yield an .isolated file.
1508  data = {
1509    'algo': hash_algo_name,
1510    'files': files,
1511    'version': isolated_format.ISOLATED_FILE_VERSION,
1512  }
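  # Illustrative shape of the serialized data (a sketch; the per-file values
  # are the metadata dicts collected above):
  #   {
  #     "algo": "<hash_algo_name>",
  #     "files": {"a/b.txt": {...}, "c": {...}},
  #     "version": ISOLATED_FILE_VERSION
  #   }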
1513  # Keep the file in memory. This is fine because .isolated files are relatively
1514  # small.
1515  yield BufferItem(
1516      tools.format_json(data, True).encode(),
1517      algo=hash_algo,
1518      high_priority=True)
1519
1520
1521def _archive_files_to_storage_internal(storage,
1522                                       files,
1523                                       denylist,
1524                                       verify_push=False):
1525  """Stores every entry into remote storage and returns stats.
1526
1527  Arguments:
1528    storage: a Storage object that communicates with the remote object store.
1529    files: iterable of files to upload. If a directory is specified, a
1530          .isolated file is created for it and its hash is returned. Duplicates
1531          are skipped.
1532    denylist: function that returns True if a file should be omitted.
1533    verify_push: verify files are uploaded correctly by fetching from server.
1534
1535  Returns:
1536    tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
1537    The first file in the first item is always the .isolated file.
1538
1539  Raises:
1540    Re-raises the exception in upload_items(), if there is any.
1541  """
1542  # Dict of path to hash.
1543  results = collections.OrderedDict()
1544  hash_algo = storage.server_ref.hash_algo
1545  hash_algo_name = storage.server_ref.hash_algo_name
1546  # Channel of FileItem fed to the concurrent upload_items() operation.
1547  channel = threading_utils.TaskChannel()
1548  exc_channel = threading_utils.TaskChannel()
1549  uploaded_digests = set()
1550
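  # Runs in a helper thread: consumes |channel| via storage.upload_items() and
  # forwards any exception to the main thread through |exc_channel|.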
1551  def _upload_items():
1552    try:
1553      uploaded = storage.upload_items(channel, verify_push)
1554      uploaded_digests.update(f.digest for f in uploaded)
1555    except Exception:
1556      exc_channel.send_exception()
1557
1558  t = threading.Thread(target=_upload_items)
1559  t.start()
1560
1561  # Keep track locally of the items to determine cold and hot items.
1562  items_found = []
1563  try:
1564    for f in files:
1565      assert isinstance(f, six.text_type), repr(f)
1566      if f in results:
1567        # Duplicate
1568        continue
1569      try:
1570        filepath = os.path.abspath(f)
1571        if fs.isdir(filepath):
1572          # Uploading a whole directory.
1573          item = None
1574          for item in _enqueue_dir(filepath, denylist, hash_algo,
1575                                   hash_algo_name):
1576            channel.send_result(item)
1577            items_found.append(item)
1578            # The very last item will be the .isolated file.
1579          if not item:
1580            # There was no file in the directory.
1581            continue
1582        elif fs.isfile(filepath):
1583          item = FileItem(
1584              path=filepath,
1585              algo=hash_algo,
1586              size=None,
1587              high_priority=f.endswith('.isolated'))
1588          channel.send_result(item)
1589          items_found.append(item)
1590        else:
1591          raise Error('%s is neither a file nor a directory.' % f)
1592        results[f] = item.digest
1593      except OSError:
1594        raise Error('Failed to process %s.' % f)
1595  finally:
1596    # Stops the generator, so _upload_items() can exit.
1597    channel.send_done()
1598  t.join()
1599  exc_channel.send_done()
1600
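  # Re-raise the first exception (if any) reported by the upload thread.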
1601  try:
1602    for _ in exc_channel:
1603      pass
1604  except Exception:
1605    # Log the items when the upload failed.
1606    for item in items_found:
1607      if isinstance(item, FileItem):
1608        logging.error('FileItem path: %s, digest:%s, re-calculated digest:%s',
1609                      item.path, item.digest,
1610                      isolated_format.hash_file(item.path, item.algo))
1611        continue
1612
1613      logging.error('Item digest:%s', item.digest)
1614
1615    raise
1616
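  # "cold" items were missing from the server and had to be uploaded; "hot"
  # items were already present.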
1617  cold = []
1618  hot = []
1619  for i in items_found:
1620    # Note that multiple FileItems may have the same .digest.
1621    if i.digest in uploaded_digests:
1622      cold.append(i)
1623    else:
1624      hot.append(i)
1625  return results, cold, hot
1626
1627
1628# TODO(crbug.com/1073832):
1629# remove this once the process leak in the coverage build is fixed.
1630def archive_files_to_storage(storage, files, denylist, verify_push=False):
1631  """Calls _archive_files_to_storage_internal with retry.
1632
1633  Arguments:
1634    See Arguments section in _archive_files_to_storage_internal
1635
1636  Returns:
1637    See Returns section in _archive_files_to_storage_internal
1638
1639  Raises:
1640    Re-raises the exception from _archive_files_to_storage_internal if all
1641    retries failed.
1642  """
1643
1644  # Will do exponential backoff.
1645  # e.g. 10, 20, 40, 80
1646  backoff = 10
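  # Together with the >100 cap below, this means at most 5 attempts, waiting
  # 10, 20, 40 and 80 seconds between them.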
1647
1648  while True:
1649    try:
1650      return _archive_files_to_storage_internal(storage, files, denylist,
1651                                                verify_push)
1652    except Exception:
1653      if backoff > 100:
1654        raise
1655
1656      on_error.report('error before %d second backoff' % backoff)
1657
1658      logging.exception(
1659          'failed to run _archive_files_to_storage_internal,'
1660          ' will retry after %d seconds', backoff)
1661      time.sleep(backoff)
1662      backoff *= 2
1663
1664
1665@subcommand.usage('<file1..fileN> or - to read from stdin')
1666def CMDarchive(parser, args):
1667  """Archives data to the server.
1668
1669  If a directory is specified, a .isolated file is created and the whole
1670  directory is uploaded. This .isolated file can then be included in another
1671  one to run commands.
1672
1673  The command outputs each file that was processed along with its content hash.
1674  For directories, the .isolated file generated for the directory is listed as
1675  the directory entry itself.
1676  """
1677  add_isolate_server_options(parser)
1678  add_archive_options(parser)
1679  options, files = parser.parse_args(args)
1680  process_isolate_server_options(parser, options, True)
1681  server_ref = isolate_storage.ServerRef(
1682      options.isolate_server, options.namespace)
1683  if files == ['-']:
1684    files = (l.rstrip('\n\r') for l in sys.stdin)
1685  if not files:
1686    parser.error('Nothing to upload')
1687  files = (six.ensure_text(f) for f in files)
1688  denylist = tools.gen_denylist(options.blacklist)
1689  try:
1690    with get_storage(server_ref) as storage:
1691      results, _cold, _hot = archive_files_to_storage(storage, files, denylist)
1692  except (Error, local_caching.NoMoreSpace) as e:
1693    parser.error(e.args[0])
1694  print('\n'.join('%s %s' % (h, f) for f, h in results.items()))
1695  return 0
1696
1697
1698def CMDdownload(parser, args):
1699  """Download data from the server.
1700
1701  It can either download individual files or a complete tree from a .isolated
1702  file.
1703  """
1704  add_isolate_server_options(parser)
1705  parser.add_option(
1706      '-s', '--isolated', metavar='HASH',
1707      help='hash of an isolated file. The .isolated file content itself is '
1708           'discarded; use --file if you need it')
1709  parser.add_option(
1710      '-f',
1711      '--file',
1712      metavar='HASH DEST',
1713      default=[],
1714      action='append',
1715      nargs=2,
1716      help='hash and destination of a file, can be used multiple times')
1717  parser.add_option(
1718      '-t',
1719      '--target',
1720      metavar='DIR',
1721      default='download',
1722      help='destination directory')
1723  parser.add_option(
1724      '--use-symlinks',
1725      action='store_true',
1726      help='Use symlinks instead of hardlinks')
1727  add_cache_options(parser)
1728  options, args = parser.parse_args(args)
1729  if args:
1730    parser.error('Unsupported arguments: %s' % args)
1731  if not file_path.enable_symlink():
1732    logging.warning('Symlink support is not enabled')
1733
1734  process_isolate_server_options(parser, options, True)
1735  if bool(options.isolated) == bool(options.file):
1736    parser.error('Use one of --isolated or --file, and only one.')
1737  if not options.cache and options.use_symlinks:
1738    parser.error('--use-symlinks requires the use of a cache with --cache')
1739
1740  cache = process_cache_options(options, trim=True)
1741  cache.cleanup()
1742  options.target = six.text_type(os.path.abspath(options.target))
1743  if options.isolated:
1744    if (fs.isfile(options.target) or
1745        (fs.isdir(options.target) and fs.listdir(options.target))):
1746      parser.error(
1747          '--target \'%s\' exists, please use another target' % options.target)
1748  server_ref = isolate_storage.ServerRef(
1749      options.isolate_server, options.namespace)
1750  with get_storage(server_ref) as storage:
1751    # Fetching individual files.
1752    if options.file:
1753      # TODO(maruel): Enable cache in this case too.
1754      channel = threading_utils.TaskChannel()
1755      pending = {}
1756      for digest, dest in options.file:
1757        dest = six.text_type(dest)
1758        pending[digest] = dest
1759        storage.async_fetch(
1760            channel, threading_utils.PRIORITY_MED, digest,
1761            local_caching.UNKNOWN_FILE_SIZE,
1762            functools.partial(local_caching.file_write,
1763                              os.path.join(options.target, dest)))
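      # Each completed fetch reports its digest back on |channel|.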
1764      while pending:
1765        fetched = channel.next()
1766        dest = pending.pop(fetched)
1767        logging.info('%s: %s', fetched, dest)
1768
1769    # Fetching whole isolated tree.
1770    if options.isolated:
1771      bundle = fetch_isolated(
1772          isolated_hash=options.isolated,
1773          storage=storage,
1774          cache=cache,
1775          outdir=options.target,
1776          use_symlinks=options.use_symlinks)
1777      cache.trim()
1778      if bundle.command:
1779        rel = os.path.join(options.target, bundle.relative_cwd)
1780        print('To run this test please run from the directory %s:' % rel)
1782        print('  ' + ' '.join(bundle.command))
1783
1784  return 0
1785
1786
1787def add_archive_options(parser):
1788  parser.add_option(
1789      '--blacklist',
1790      action='append',
1791      default=list(DEFAULT_DENYLIST),
1792      help='List of regexps to use as a denylist filter when uploading '
1793      'directories')
1794
1795
1796def add_isolate_server_options(parser):
1797  """Adds --isolate-server and --namespace options to parser."""
1798  parser.add_option(
1799      '-I', '--isolate-server',
1800      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
1801      help='URL of the Isolate Server to use. Defaults to the environment '
1802           'variable ISOLATE_SERVER if set. No need to specify https://, this '
1803           'is assumed.')
1804  parser.add_option(
1805      '--namespace',
1806      default='default-gzip',
1807      help='The namespace to use on the Isolate Server, default: %default')
1808
1809
1810def process_isolate_server_options(parser, options, required):
1811  """Processes the --isolate-server option.
1812
1813  Returns the identity as determined by the server.
1814  """
1815  if not options.isolate_server:
1816    if required:
1817      parser.error('--isolate-server is required.')
1818    return
1819
1820  try:
1821    options.isolate_server = net.fix_url(options.isolate_server)
1822  except ValueError as e:
1823    parser.error('--isolate-server %s' % e)
1824
1825  try:
1826    return auth.ensure_logged_in(options.isolate_server)
1827  except ValueError as e:
1828    parser.error(str(e))
1829  return None
1830
1831
1832def add_cache_options(parser):
1833  cache_group = optparse.OptionGroup(parser, 'Isolated cache management')
1834  cache_group.add_option(
1835      '--cache', metavar='DIR', default='cache',
1836      help='Directory to keep a local cache of the files. Accelerates download '
1837           'by reusing already downloaded files. Default=%default')
1838  cache_group.add_option(
1839      '--max-cache-size',
1840      type='int',
1841      metavar='NNN',
1842      default=50*1024*1024*1024,
1843      help='Trim if the cache gets larger than this value, default=%default')
1844  cache_group.add_option(
1845      '--min-free-space',
1846      type='int',
1847      metavar='NNN',
1848      default=2*1024*1024*1024,
1849      help='Trim if disk free space becomes lower than this value, '
1850           'default=%default')
1851  cache_group.add_option(
1852      '--max-items',
1853      type='int',
1854      metavar='NNN',
1855      default=100000,
1856      help='Trim if more than this number of items are in the cache, '
1857      'default=%default')
1858  parser.add_option_group(cache_group)
1859
1860
1861def process_cache_options(options, trim, **kwargs):
1862  if options.cache:
1863    policies = local_caching.CachePolicies(
1864        options.max_cache_size,
1865        options.min_free_space,
1866        options.max_items,
1867        # 3 weeks.
1868        max_age_secs=21 * 24 * 60 * 60)
1869
1870    # |options.cache| path may not exist until DiskContentAddressedCache()
1871    # instance is created.
1872    return local_caching.DiskContentAddressedCache(
1873        six.text_type(os.path.abspath(options.cache)), policies, trim, **kwargs)
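  # Without --cache, fall back to an in-memory cache.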
1874  return local_caching.MemoryContentAddressedCache()
1875
1876
1877class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
1878
1879  def __init__(self, **kwargs):
1880    logging_utils.OptionParserWithLogging.__init__(
1881        self,
1882        version=__version__,
1883        prog=os.path.basename(sys.modules[__name__].__file__),
1884        **kwargs)
1885    auth.add_auth_options(self)
1886
1887  def parse_args(self, *args, **kwargs):
1888    options, args = logging_utils.OptionParserWithLogging.parse_args(
1889        self, *args, **kwargs)
1890    auth.process_auth_options(self, options)
1891    return options, args
1892
1893
1894def main(args):
1895  dispatcher = subcommand.CommandDispatcher(__name__)
1896  return dispatcher.execute(OptionParserIsolateServer(), args)
1897
1898
1899if __name__ == '__main__':
1900  subprocess42.inhibit_os_error_reporting()
1901  fix_encoding.fix_encoding()
1902  tools.disable_buffering()
1903  colorama.init()
1904  net.set_user_agent('isolateserver.py/' + __version__)
1905  sys.exit(main(sys.argv[1:]))
1906