import configparser
import os
import shutil
import stat
from binascii import unhexlify
from collections import namedtuple
from time import perf_counter

from .logger import create_logger

logger = create_logger()

files_cache_logger = create_logger('borg.debug.files_cache')

from .constants import CACHE_README, DEFAULT_FILES_CACHE_MODE
from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
from .helpers import Location
from .helpers import Error
from .helpers import Manifest
from .helpers import get_cache_dir, get_security_dir
from .helpers import int_to_bigint, bigint_to_int, bin_to_hex, parse_stringified_list
from .helpers import format_file_size
from .helpers import safe_ns
from .helpers import yes, hostname_is_unique
from .helpers import remove_surrogates
from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage
from .helpers import set_ec, EXIT_WARNING
from .helpers import truncate_and_unlink
from .helpers import msgpack
from .item import ArchiveItem, ChunkListEntry
from .crypto.key import PlaintextKey
from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError
from .locking import Lock
from .platform import SaveFile
from .remote import cache_if_remote
from .repository import LIST_SCAN_LIMIT

# note: cmtime might be either a ctime or a mtime timestamp
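# FileCacheEntry fields (as used further below): 'age' counts how many backups ago the
# entry was last seen (incremented on load, reset to 0 by memorize_file), 'inode', 'size'
# and 'cmtime' are the stat values compared by file_known_and_unchanged, and 'chunk_ids'
# is the list of chunk ids that make up the file's content.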
FileCacheEntry = namedtuple('FileCacheEntry', 'age inode size cmtime chunk_ids')


class SecurityManager:
    """
    Tracks repositories. Ensures that nothing bad happens (repository swaps,
    replay attacks, unknown repositories etc.).

    This is complicated by the Cache being initially used for this, while
    only some commands actually use the Cache, which meant that other commands
    did not perform these checks.

    Further complications were created by the Cache being a cache, so it
    could be legitimately deleted, which is annoying because Borg didn't
    recognize repositories after that.

    Therefore a second location, the security database (see get_security_dir),
    was introduced which stores this information. However, this means that
    the code has to deal with a cache existing but no security DB entry,
    or inconsistencies between the security DB and the cache which have to
    be reconciled, and also with no cache existing but a security DB entry.
    """

    def __init__(self, repository):
        self.repository = repository
        self.dir = get_security_dir(repository.id_str)
        self.cache_dir = cache_dir(repository)
        self.key_type_file = os.path.join(self.dir, 'key-type')
        self.location_file = os.path.join(self.dir, 'location')
        self.manifest_ts_file = os.path.join(self.dir, 'manifest-timestamp')

    @staticmethod
    def destroy(repository, path=None):
        """destroy the security dir for ``repository`` or at ``path``"""
        path = path or get_security_dir(repository.id_str)
        if os.path.exists(path):
            shutil.rmtree(path)

    def known(self):
        return all(os.path.exists(f)
                   for f in (self.key_type_file, self.location_file, self.manifest_ts_file))

    def key_matches(self, key):
        if not self.known():
            return False
        try:
            with open(self.key_type_file, 'r') as fd:
                type = fd.read()
                return type == str(key.TYPE)
        except OSError as exc:
            logger.warning('Could not read/parse key type file: %s', exc)

    def save(self, manifest, key):
        logger.debug('security: saving state for %s to %s', self.repository.id_str, self.dir)
        current_location = self.repository._location.canonical_path()
        logger.debug('security: current location   %s', current_location)
        logger.debug('security: key type           %s', str(key.TYPE))
        logger.debug('security: manifest timestamp %s', manifest.timestamp)
        with SaveFile(self.location_file) as fd:
            fd.write(current_location)
        with SaveFile(self.key_type_file) as fd:
            fd.write(str(key.TYPE))
        with SaveFile(self.manifest_ts_file) as fd:
            fd.write(manifest.timestamp)

    def assert_location_matches(self, cache_config=None):
        # Warn user before sending data to a relocated repository
        try:
            with open(self.location_file) as fd:
                previous_location = fd.read()
            logger.debug('security: read previous location %r', previous_location)
        except FileNotFoundError:
            logger.debug('security: previous location file %s not found', self.location_file)
            previous_location = None
        except OSError as exc:
            logger.warning('Could not read previous location file: %s', exc)
            previous_location = None
        if cache_config and cache_config.previous_location and previous_location != cache_config.previous_location:
            # Reconcile cache and security dir; we take the cache location.
            previous_location = cache_config.previous_location
            logger.debug('security: using previous_location of cache: %r', previous_location)

        repository_location = self.repository._location.canonical_path()
        if previous_location and previous_location != repository_location:
            msg = ("Warning: The repository at location {} was previously located at {}\n".format(
                repository_location, previous_location) +
                "Do you want to continue? [yN] ")
            if not yes(msg, false_msg="Aborting.", invalid_msg="Invalid answer, aborting.",
                       retry=False, env_var_override='BORG_RELOCATED_REPO_ACCESS_IS_OK'):
                raise Cache.RepositoryAccessAborted()
            # adapt on-disk config immediately if the new location was accepted
            logger.debug('security: updating location stored in cache and security dir')
            with SaveFile(self.location_file) as fd:
                fd.write(repository_location)
            if cache_config:
                cache_config.save()

    def assert_no_manifest_replay(self, manifest, key, cache_config=None):
        try:
            with open(self.manifest_ts_file) as fd:
                timestamp = fd.read()
            logger.debug('security: read manifest timestamp %r', timestamp)
        except FileNotFoundError:
            logger.debug('security: manifest timestamp file %s not found', self.manifest_ts_file)
            timestamp = ''
        except OSError as exc:
            logger.warning('Could not read manifest timestamp file: %s', exc)
            timestamp = ''
        if cache_config:
            timestamp = max(timestamp, cache_config.timestamp or '')
        logger.debug('security: determined newest manifest timestamp as %s', timestamp)
        # If repository is older than the cache or security dir something fishy is going on
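        # note: manifest timestamps are ISO 8601 strings, so plain string comparison orders them chronologically.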
        if timestamp and timestamp > manifest.timestamp:
            if isinstance(key, PlaintextKey):
                raise Cache.RepositoryIDNotUnique()
            else:
                raise Cache.RepositoryReplay()

    def assert_key_type(self, key, cache_config=None):
        # Make sure an encrypted repository has not been swapped for an unencrypted repository
        if cache_config and cache_config.key_type is not None and cache_config.key_type != str(key.TYPE):
            raise Cache.EncryptionMethodMismatch()
        if self.known() and not self.key_matches(key):
            raise Cache.EncryptionMethodMismatch()

    def assert_secure(self, manifest, key, *, cache_config=None, warn_if_unencrypted=True, lock_wait=None):
        # warn_if_unencrypted=False is only used for initializing a new repository.
        # Thus, we avoid asking about a repository that is currently being initialized.
        self.assert_access_unknown(warn_if_unencrypted, manifest, key)
        if cache_config:
            self._assert_secure(manifest, key, cache_config)
        else:
            cache_config = CacheConfig(self.repository, lock_wait=lock_wait)
            if cache_config.exists():
                with cache_config:
                    self._assert_secure(manifest, key, cache_config)
            else:
                self._assert_secure(manifest, key)
        logger.debug('security: repository checks ok, allowing access')

    def _assert_secure(self, manifest, key, cache_config=None):
        self.assert_location_matches(cache_config)
        self.assert_key_type(key, cache_config)
        self.assert_no_manifest_replay(manifest, key, cache_config)
        if not self.known():
            logger.debug('security: remembering previously unknown repository')
            self.save(manifest, key)

    def assert_access_unknown(self, warn_if_unencrypted, manifest, key):
        # warn_if_unencrypted=False is only used for initializing a new repository.
        # Thus, we avoid asking about a repository that is currently being initialized.
        if not key.logically_encrypted and not self.known():
            msg = ("Warning: Attempting to access a previously unknown unencrypted repository!\n" +
                   "Do you want to continue? [yN] ")
            allow_access = not warn_if_unencrypted or yes(msg, false_msg="Aborting.",
                invalid_msg="Invalid answer, aborting.",
                retry=False, env_var_override='BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK')
            if allow_access:
                if warn_if_unencrypted:
                    logger.debug('security: remembering unknown unencrypted repository (explicitly allowed)')
                else:
                    logger.debug('security: initializing unencrypted repository')
                self.save(manifest, key)
            else:
                raise Cache.CacheInitAbortedError()


def assert_secure(repository, manifest, lock_wait):
    sm = SecurityManager(repository)
    sm.assert_secure(manifest, manifest.key, lock_wait=lock_wait)


def recanonicalize_relative_location(cache_location, repository):
    # borg < 1.0.8rc1 had different canonicalization for the repo location (see #1655 and #1741).
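    # illustrative example: a cached previous_location of 'ssh://user@host/~/repo' (old canonical form,
    # path '/~/repo') vs. the same repo now canonicalized as 'ssh://user@host/./repo' (path '/./repo').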
    repo_location = repository._location.canonical_path()
    rl = Location(repo_location)
    cl = Location(cache_location)
    if cl.proto == rl.proto and cl.user == rl.user and cl.host == rl.host and cl.port == rl.port \
            and \
            cl.path and rl.path and \
            cl.path.startswith('/~/') and rl.path.startswith('/./') and cl.path[3:] == rl.path[3:]:
        # everything is the same except the expected change in relative path canonicalization,
        # update previous_location to avoid warning / user query about changed location:
        return repo_location
    else:
        return cache_location


def cache_dir(repository, path=None):
    return path or os.path.join(get_cache_dir(), repository.id_str)


def files_cache_name():
    suffix = os.environ.get('BORG_FILES_CACHE_SUFFIX', '')
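    # e.g. BORG_FILES_CACHE_SUFFIX=prod yields a files cache named 'files.prod',
    # so separate backup job sets can keep separate files caches.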
    return 'files.' + suffix if suffix else 'files'


class CacheConfig:
    def __init__(self, repository, path=None, lock_wait=None):
        self.repository = repository
        self.path = cache_dir(repository, path)
        self.config_path = os.path.join(self.path, 'config')
        self.lock = None
        self.lock_wait = lock_wait

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def exists(self):
        return os.path.exists(self.config_path)

    def create(self):
        assert not self.exists()
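        # The resulting config file looks roughly like this (illustrative):
        #   [cache]
        #   version = 1
        #   repository = <repository id as hex>
        #   manifest =
        #
        #   [integrity]
        #   manifest =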
        config = configparser.ConfigParser(interpolation=None)
        config.add_section('cache')
        config.set('cache', 'version', '1')
        config.set('cache', 'repository', self.repository.id_str)
        config.set('cache', 'manifest', '')
        config.add_section('integrity')
        config.set('integrity', 'manifest', '')
        with SaveFile(self.config_path) as fd:
            config.write(fd)

    def open(self):
        self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=self.lock_wait,
                         kill_stale_locks=hostname_is_unique()).acquire()
        self.load()

    def load(self):
        self._config = configparser.ConfigParser(interpolation=None)
        with open(self.config_path) as fd:
            self._config.read_file(fd)
        self._check_upgrade(self.config_path)
        self.id = self._config.get('cache', 'repository')
        self.manifest_id = unhexlify(self._config.get('cache', 'manifest'))
        self.timestamp = self._config.get('cache', 'timestamp', fallback=None)
        self.key_type = self._config.get('cache', 'key_type', fallback=None)
        self.ignored_features = set(parse_stringified_list(self._config.get('cache', 'ignored_features', fallback='')))
        self.mandatory_features = set(parse_stringified_list(self._config.get('cache', 'mandatory_features', fallback='')))
        try:
            self.integrity = dict(self._config.items('integrity'))
            if self._config.get('cache', 'manifest') != self.integrity.pop('manifest'):
                # The cache config file is updated in place (parsed with ConfigParser, modified
                # and then written out again), not re-created.
                # Thus, older versions will leave our [integrity] section alone, making the section's data invalid.
                # Therefore, we also add the manifest ID to this section and
                # can discern whether an older version interfered by comparing the manifest IDs of this section
                # and the main [cache] section.
                self.integrity = {}
                logger.warning('Cache integrity data not available: old Borg version modified the cache.')
        except configparser.NoSectionError:
            logger.debug('Cache integrity: No integrity data found (files, chunks). Cache is from old version.')
            self.integrity = {}
        previous_location = self._config.get('cache', 'previous_location', fallback=None)
        if previous_location:
            self.previous_location = recanonicalize_relative_location(previous_location, self.repository)
        else:
            self.previous_location = None
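        # remember the current location in the in-memory config; it is only persisted by the next save().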
        self._config.set('cache', 'previous_location', self.repository._location.canonical_path())

    def save(self, manifest=None, key=None):
        if manifest:
            self._config.set('cache', 'manifest', manifest.id_str)
            self._config.set('cache', 'timestamp', manifest.timestamp)
            self._config.set('cache', 'ignored_features', ','.join(self.ignored_features))
            self._config.set('cache', 'mandatory_features', ','.join(self.mandatory_features))
            if not self._config.has_section('integrity'):
                self._config.add_section('integrity')
            for file, integrity_data in self.integrity.items():
                self._config.set('integrity', file, integrity_data)
            self._config.set('integrity', 'manifest', manifest.id_str)
        if key:
            self._config.set('cache', 'key_type', str(key.TYPE))
        with SaveFile(self.config_path) as fd:
            self._config.write(fd)

    def close(self):
        if self.lock is not None:
            self.lock.release()
            self.lock = None

    def _check_upgrade(self, config_path):
        try:
            cache_version = self._config.getint('cache', 'version')
            wanted_version = 1
            if cache_version != wanted_version:
                self.close()
                raise Exception('%s has unexpected cache version %d (wanted: %d).' %
                                (config_path, cache_version, wanted_version))
        except configparser.NoSectionError:
            self.close()
            raise Exception('%s does not look like a Borg cache.' % config_path) from None


class Cache:
    """Client-side cache
    """
    class RepositoryIDNotUnique(Error):
        """Cache is newer than repository - do you have multiple, independently updated repos with same ID?"""

    class RepositoryReplay(Error):
        """Cache, or information obtained from the security directory is newer than repository - this is either an attack or unsafe (multiple repos with same ID)"""

    class CacheInitAbortedError(Error):
        """Cache initialization aborted"""

    class RepositoryAccessAborted(Error):
        """Repository access aborted"""

    class EncryptionMethodMismatch(Error):
        """Repository encryption method changed since last access, refusing to continue"""

    @staticmethod
    def break_lock(repository, path=None):
        path = cache_dir(repository, path)
        Lock(os.path.join(path, 'lock'), exclusive=True).break_lock()

    @staticmethod
    def destroy(repository, path=None):
        """destroy the cache for ``repository`` or at ``path``"""
        path = path or os.path.join(get_cache_dir(), repository.id_str)
        config = os.path.join(path, 'config')
        if os.path.exists(config):
            os.remove(config)  # kill config first
            shutil.rmtree(path)

    def __new__(cls, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True,
                progress=False, lock_wait=None, permit_adhoc_cache=False, cache_mode=DEFAULT_FILES_CACHE_MODE,
                ignore_inode=False):

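        # cache_mode flags (see LocalCache.file_known_and_unchanged / memorize_file):
        # 'd' disabled, 'r' rechunk, 's' size, 'i' inode, 'c' ctime, 'm' mtime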
        if not do_files and 'd' not in cache_mode:
            cache_mode = 'd'
        elif ignore_inode and 'i' in cache_mode:
            cache_mode = ''.join(set(cache_mode) - set('i'))

        def local():
            return LocalCache(repository=repository, key=key, manifest=manifest, path=path, sync=sync,
                              warn_if_unencrypted=warn_if_unencrypted, progress=progress,
                              lock_wait=lock_wait, cache_mode=cache_mode)

        def adhoc():
            return AdHocCache(repository=repository, key=key, manifest=manifest, lock_wait=lock_wait)

        if not permit_adhoc_cache:
            return local()

        # ad-hoc cache may be permitted, but if the local cache is in sync it'd be stupid to invalidate
        # it by needlessly using the ad-hoc cache.
        # Check if the local cache exists and is in sync.

        cache_config = CacheConfig(repository, path, lock_wait)
        if cache_config.exists():
            with cache_config:
                cache_in_sync = cache_config.manifest_id == manifest.id
            # Don't nest cache locks
            if cache_in_sync:
                # Local cache is in sync, use it
                logger.debug('Cache: choosing local cache (in sync)')
                return local()
        logger.debug('Cache: choosing ad-hoc cache (local cache does not exist or is not in sync)')
        return adhoc()


class CacheStatsMixin:
    str_format = """\
All archives:   {0.total_size:>20s} {0.total_csize:>20s} {0.unique_csize:>20s}

                       Unique chunks         Total chunks
Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""

    def __str__(self):
        return self.str_format.format(self.format_tuple())

    Summary = namedtuple('Summary', ['total_size', 'total_csize', 'unique_size', 'unique_csize', 'total_unique_chunks',
                                     'total_chunks'])

    def stats(self):
        # XXX: this should really be moved down to `hashindex.pyx`
        stats = self.Summary(*self.chunks.summarize())._asdict()
        return stats

    def format_tuple(self):
        stats = self.stats()
        for field in ['total_size', 'total_csize', 'unique_csize']:
            stats[field] = format_file_size(stats[field])
        return self.Summary(**stats)

    def chunks_stored_size(self):
        return self.stats()['unique_csize']


class LocalCache(CacheStatsMixin):
    """
    Persistent, local (client-side) cache.
    """

    def __init__(self, repository, key, manifest, path=None, sync=True, warn_if_unencrypted=True,
                 progress=False, lock_wait=None, cache_mode=DEFAULT_FILES_CACHE_MODE):
        """
        :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
        :param lock_wait: timeout for lock acquisition (int [s] or None [wait forever])
        :param sync: do :meth:`.sync`
        :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
        """
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.progress = progress
        self.cache_mode = cache_mode
        self.timestamp = None
        self.txn_active = False

        self.path = cache_dir(repository, path)
        self.security_manager = SecurityManager(repository)
        self.cache_config = CacheConfig(self.repository, self.path, lock_wait)

        # Warn user before sending data to a never seen before unencrypted repository
        if not os.path.exists(self.path):
            self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, key)
            self.create()

        self.open()
        try:
            self.security_manager.assert_secure(manifest, key, cache_config=self.cache_config)

            if not self.check_cache_compatibility():
                self.wipe_cache()

            self.update_compatibility()

            if sync and self.manifest.id != self.cache_config.manifest_id:
                self.sync()
                self.commit()
        except:
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def create(self):
        """Create a new empty cache at `self.path`
        """
        os.makedirs(self.path)
        with open(os.path.join(self.path, 'README'), 'w') as fd:
            fd.write(CACHE_README)
        self.cache_config.create()
        ChunkIndex().write(os.path.join(self.path, 'chunks'))
        os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
        with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
            pass  # empty file

    def _do_open(self):
        self.cache_config.load()
        with IntegrityCheckedFile(path=os.path.join(self.path, 'chunks'), write=False,
                                  integrity_data=self.cache_config.integrity.get('chunks')) as fd:
            self.chunks = ChunkIndex.read(fd)
        if 'd' in self.cache_mode:  # d(isabled)
            self.files = None
        else:
            self._read_files()

    def open(self):
        if not os.path.isdir(self.path):
            raise Exception('%s does not look like a Borg cache.' % self.path)
        self.cache_config.open()
        self.rollback()

    def close(self):
        if self.cache_config is not None:
            self.cache_config.close()
            self.cache_config = None

    def _read_files(self):
        self.files = {}
        self._newest_cmtime = None
        logger.debug('Reading files cache ...')
        files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
        msg = None
        try:
            with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=False,
                                      integrity_data=self.cache_config.integrity.get(files_cache_name())) as fd:
                u = msgpack.Unpacker(use_list=True)
                while True:
                    data = fd.read(64 * 1024)
                    if not data:
                        break
                    u.feed(data)
                    try:
                        for path_hash, item in u:
                            entry = FileCacheEntry(*item)
                            # in the end, this takes about 240 Bytes per file
                            self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
                    except (TypeError, ValueError) as exc:
                        msg = "The files cache seems invalid. [%s]" % str(exc)
                        break
        except OSError as exc:
            msg = "The files cache can't be read. [%s]" % str(exc)
        except FileIntegrityError as fie:
            msg = "The files cache is corrupted. [%s]" % str(fie)
        if msg is not None:
            logger.warning(msg)
            logger.warning('Continuing without files cache - expect lower performance.')
            self.files = {}
        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))

    def begin_txn(self):
        # Initialize transaction snapshot
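        # Copy config/chunks/files into txn.tmp, then atomically rename it to txn.active.
        # commit() renames txn.active back to txn.tmp and removes it, rollback() restores
        # the copies from txn.active if that directory still exists.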
        pi = ProgressIndicatorMessage(msgid='cache.begin_transaction')
        txn_dir = os.path.join(self.path, 'txn.tmp')
        os.mkdir(txn_dir)
        pi.output('Initializing cache transaction: Reading config')
        shutil.copy(os.path.join(self.path, 'config'), txn_dir)
        pi.output('Initializing cache transaction: Reading chunks')
        shutil.copy(os.path.join(self.path, 'chunks'), txn_dir)
        pi.output('Initializing cache transaction: Reading files')
        try:
            shutil.copy(os.path.join(self.path, files_cache_name()), txn_dir)
        except FileNotFoundError:
            with SaveFile(os.path.join(txn_dir, files_cache_name()), binary=True):
                pass  # empty file
        os.rename(os.path.join(self.path, 'txn.tmp'),
                  os.path.join(self.path, 'txn.active'))
        self.txn_active = True
        pi.finish()

    def commit(self):
        """Commit transaction
        """
        if not self.txn_active:
            return
        self.security_manager.save(self.manifest, self.key)
        pi = ProgressIndicatorMessage(msgid='cache.commit')
        if self.files is not None:
            if self._newest_cmtime is None:
                # was never set because no files were modified/added
                self._newest_cmtime = 2 ** 63 - 1  # nanoseconds, good until y2262
            ttl = int(os.environ.get('BORG_FILES_CACHE_TTL', 20))
            pi.output('Saving files cache')
            files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
            with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd:
                entry_count = 0
                for path_hash, item in self.files.items():
                    # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
                    # this is to avoid issues with filesystem snapshots and cmtime granularity.
                    # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
                    entry = FileCacheEntry(*msgpack.unpackb(item))
                    if entry.age == 0 and bigint_to_int(entry.cmtime) < self._newest_cmtime or \
                       entry.age > 0 and entry.age < ttl:
                        msgpack.pack((path_hash, entry), fd)
                        entry_count += 1
            files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
            files_cache_logger.debug("FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime)
            files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
            self.cache_config.integrity[files_cache_name()] = fd.integrity_data
        pi.output('Saving chunks cache')
        with IntegrityCheckedFile(path=os.path.join(self.path, 'chunks'), write=True) as fd:
            self.chunks.write(fd)
        self.cache_config.integrity['chunks'] = fd.integrity_data
        pi.output('Saving cache config')
        self.cache_config.save(self.manifest, self.key)
        os.rename(os.path.join(self.path, 'txn.active'),
                  os.path.join(self.path, 'txn.tmp'))
        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False
        pi.finish()

    def rollback(self):
        """Roll back partial and aborted transactions
        """
        # Remove partial transaction
        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        # Roll back active transaction
        txn_dir = os.path.join(self.path, 'txn.active')
        if os.path.exists(txn_dir):
            shutil.copy(os.path.join(txn_dir, 'config'), self.path)
            shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
            shutil.copy(os.path.join(txn_dir, files_cache_name()), self.path)
            os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
            if os.path.exists(os.path.join(self.path, 'txn.tmp')):
                shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False
        self._do_open()

    def sync(self):
        """Re-synchronize chunks cache with repository.

        Maintains a directory with known backup archive indexes, so it only
        needs to fetch infos from repo and build a chunk index once per backup
        archive.
        If out of sync, missing archive indexes get added, outdated indexes
        get removed and a new master chunks index is built by merging all
        archive indexes.
        """
        archive_path = os.path.join(self.path, 'chunks.archive.d')
        # An index of chunks whose size had to be fetched
        chunks_fetched_size_index = ChunkIndex()
        # Instrumentation
        processed_item_metadata_bytes = 0
        processed_item_metadata_chunks = 0
        compact_chunks_archive_saved_space = 0
        fetched_chunks_for_csize = 0
        fetched_bytes_for_csize = 0

        def mkpath(id, suffix=''):
            id_hex = bin_to_hex(id)
            path = os.path.join(archive_path, id_hex + suffix)
            return path

        def cached_archives():
            if self.do_cache:
                fns = os.listdir(archive_path)
                # filenames with 64 hex digits == 256bit,
                # or compact indices which are 64 hex digits + ".compact"
                return set(unhexlify(fn) for fn in fns if len(fn) == 64) | \
                       set(unhexlify(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith('.compact'))
            else:
                return set()

        def repo_archives():
            return set(info.id for info in self.manifest.archives.list())

        def cleanup_outdated(ids):
            for id in ids:
                cleanup_cached_archive(id)

        def cleanup_cached_archive(id, cleanup_compact=True):
            try:
                os.unlink(mkpath(id))
                os.unlink(mkpath(id) + '.integrity')
            except FileNotFoundError:
                pass
            if not cleanup_compact:
                return
            try:
                os.unlink(mkpath(id, suffix='.compact'))
                os.unlink(mkpath(id, suffix='.compact') + '.integrity')
            except FileNotFoundError:
                pass

        def fetch_missing_csize(chunk_idx):
            """
            Archives created with AdHocCache will have csize=0 in all chunk list entries whose
            chunks were already in the repository.

            Scan *chunk_idx* for entries where csize=0 and fill in the correct information.
            """
            nonlocal fetched_chunks_for_csize
            nonlocal fetched_bytes_for_csize

            all_missing_ids = chunk_idx.zero_csize_ids()
            fetch_ids = []
            if len(chunks_fetched_size_index):
                for id_ in all_missing_ids:
                    already_fetched_entry = chunks_fetched_size_index.get(id_)
                    if already_fetched_entry:
                        entry = chunk_idx[id_]._replace(csize=already_fetched_entry.csize)
                        assert entry.size == already_fetched_entry.size, 'Chunk size mismatch'
                        chunk_idx[id_] = entry
                    else:
                        fetch_ids.append(id_)
            else:
                fetch_ids = all_missing_ids

            # This is potentially a rather expensive operation, but it's hard to tell at this point
            # if it's a problem in practice (hence the experimental status of --no-cache-sync).
            for id_, data in zip(fetch_ids, decrypted_repository.repository.get_many(fetch_ids)):
                entry = chunk_idx[id_]._replace(csize=len(data))
                chunk_idx[id_] = entry
                chunks_fetched_size_index[id_] = entry
                fetched_chunks_for_csize += 1
                fetched_bytes_for_csize += len(data)

        def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx):
            nonlocal processed_item_metadata_bytes
            nonlocal processed_item_metadata_chunks
            csize, data = decrypted_repository.get(archive_id)
            chunk_idx.add(archive_id, 1, len(data), csize)
            archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
            if archive.version != 1:
                raise Exception('Unknown archive metadata version')
            sync = CacheSynchronizer(chunk_idx)
            for item_id, (csize, data) in zip(archive.items, decrypted_repository.get_many(archive.items)):
                chunk_idx.add(item_id, 1, len(data), csize)
                processed_item_metadata_bytes += len(data)
                processed_item_metadata_chunks += 1
                sync.feed(data)
            if self.do_cache:
                fetch_missing_csize(chunk_idx)
                write_archive_index(archive_id, chunk_idx)

        def write_archive_index(archive_id, chunk_idx):
            nonlocal compact_chunks_archive_saved_space
            compact_chunks_archive_saved_space += chunk_idx.compact()
            fn = mkpath(archive_id, suffix='.compact')
            fn_tmp = mkpath(archive_id, suffix='.tmp')
            try:
                with DetachedIntegrityCheckedFile(path=fn_tmp, write=True,
                                                  filename=bin_to_hex(archive_id) + '.compact') as fd:
                    chunk_idx.write(fd)
            except Exception:
                truncate_and_unlink(fn_tmp)
            else:
                os.rename(fn_tmp, fn)

        def read_archive_index(archive_id, archive_name):
            archive_chunk_idx_path = mkpath(archive_id)
            logger.info("Reading cached archive chunk index for %s ...", archive_name)
            try:
                try:
                    # Attempt to load compact index first
                    with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path + '.compact', write=False) as fd:
                        archive_chunk_idx = ChunkIndex.read(fd, permit_compact=True)
                    # In case a non-compact index exists, delete it.
                    cleanup_cached_archive(archive_id, cleanup_compact=False)
                    # Compact index read - return index, no conversion necessary (below).
                    return archive_chunk_idx
                except FileNotFoundError:
                    # No compact index found, load non-compact index, and convert below.
                    with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
                        archive_chunk_idx = ChunkIndex.read(fd)
            except FileIntegrityError as fie:
                logger.error('Cached archive chunk index of %s is corrupted: %s', archive_name, fie)
                # Delete corrupted index, set warning. A new index must be built.
                cleanup_cached_archive(archive_id)
                set_ec(EXIT_WARNING)
                return None

            # Convert to compact index. Delete the existing index first.
            logger.debug('Found non-compact index for %s, converting to compact.', archive_name)
            cleanup_cached_archive(archive_id)
            write_archive_index(archive_id, archive_chunk_idx)
            return archive_chunk_idx

        def get_archive_ids_to_names(archive_ids):
            # Pass once over all archives and build a mapping from ids to names.
            # The easier approach, doing a similar loop for each archive, has
            # quadratic complexity and does about a dozen million function calls
            # with 1100 archives (which takes about 30 CPU seconds _alone_).
            archive_names = {}
            for info in self.manifest.archives.list():
                if info.id in archive_ids:
                    archive_names[info.id] = info.name
            assert len(archive_names) == len(archive_ids)
            return archive_names

        def create_master_idx(chunk_idx):
            logger.info('Synchronizing chunks cache...')
            cached_ids = cached_archives()
            archive_ids = repo_archives()
            logger.info('Archives: %d, w/ cached Idx: %d, w/ outdated Idx: %d, w/o cached Idx: %d.',
                len(archive_ids), len(cached_ids),
                len(cached_ids - archive_ids), len(archive_ids - cached_ids))
            # deallocates old hashindex, creates empty hashindex:
            chunk_idx.clear()
            cleanup_outdated(cached_ids - archive_ids)
            # Explicitly set the initial hash table capacity to avoid performance issues
            # due to hash table "resonance".
            master_index_capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR)
            if archive_ids:
                chunk_idx = None if not self.do_cache else ChunkIndex(master_index_capacity)
                pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1,
                                              msg='%3.0f%% Syncing chunks cache. Processing archive %s',
                                              msgid='cache.sync')
                archive_ids_to_names = get_archive_ids_to_names(archive_ids)
                for archive_id, archive_name in archive_ids_to_names.items():
                    pi.show(info=[remove_surrogates(archive_name)])
                    if self.do_cache:
                        if archive_id in cached_ids:
                            archive_chunk_idx = read_archive_index(archive_id, archive_name)
                            if archive_chunk_idx is None:
                                cached_ids.remove(archive_id)
                        if archive_id not in cached_ids:
                            # Do not make this an else branch; the FileIntegrityError exception handler
                            # above can remove *archive_id* from *cached_ids*.
                            logger.info('Fetching and building archive index for %s ...', archive_name)
                            archive_chunk_idx = ChunkIndex()
                            fetch_and_build_idx(archive_id, decrypted_repository, archive_chunk_idx)
                        logger.info("Merging into master chunks index ...")
                        chunk_idx.merge(archive_chunk_idx)
                    else:
                        chunk_idx = chunk_idx or ChunkIndex(master_index_capacity)
                        logger.info('Fetching archive index for %s ...', archive_name)
                        fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx)
                if not self.do_cache:
                    fetch_missing_csize(chunk_idx)
                pi.finish()
                logger.debug('Cache sync: had to fetch %s (%d chunks) because no archive had a csize set for them '
                             '(due to --no-cache-sync)',
                             format_file_size(fetched_bytes_for_csize), fetched_chunks_for_csize)
                logger.debug('Cache sync: processed %s (%d chunks) of metadata',
                             format_file_size(processed_item_metadata_bytes), processed_item_metadata_chunks)
                logger.debug('Cache sync: compact chunks.archive.d storage saved %s bytes',
                             format_file_size(compact_chunks_archive_saved_space))
            logger.info('Done.')
            return chunk_idx

        def legacy_cleanup():
            """bring old cache dirs into the desired state (cleanup and adapt)"""
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive'))
            except:
                pass
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive.tmp'))
            except:
                pass
            try:
                os.mkdir(archive_path)
            except:
                pass

        # The cache can be used by a command that e.g. only checks against Manifest.Operation.WRITE,
        # which does not have to include all flags from Manifest.Operation.READ.
        # Since the sync will attempt to read archives, check compatibility with Manifest.Operation.READ.
        self.manifest.check_repository_compatibility((Manifest.Operation.READ, ))

        self.begin_txn()
        with cache_if_remote(self.repository, decrypted_cache=self.key) as decrypted_repository:
            legacy_cleanup()
            # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
            # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)
            self.do_cache = os.path.isdir(archive_path)
            self.chunks = create_master_idx(self.chunks)

    def check_cache_compatibility(self):
        my_features = Manifest.SUPPORTED_REPO_FEATURES
        if self.cache_config.ignored_features & my_features:
            # The cache might not contain references to chunks that need a feature that is mandatory for some operation
            # and which this version supports. Force a rebuild to avoid corruption while executing that operation.
            return False
        if not self.cache_config.mandatory_features <= my_features:
            # The cache was built taking into account at least one feature that this version does not understand.
            # This client might misinterpret the cache. Thus force a rebuild.
            return False
        return True

    def wipe_cache(self):
        logger.warning("Discarding incompatible cache and forcing a cache rebuild")
        archive_path = os.path.join(self.path, 'chunks.archive.d')
        if os.path.isdir(archive_path):
            shutil.rmtree(os.path.join(self.path, 'chunks.archive.d'))
            os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
        self.chunks = ChunkIndex()
        with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
            pass  # empty file
        self.cache_config.manifest_id = ''
        self.cache_config._config.set('cache', 'manifest', '')

        self.cache_config.ignored_features = set()
        self.cache_config.mandatory_features = set()

    def update_compatibility(self):
        operation_to_features_map = self.manifest.get_all_mandatory_features()
        my_features = Manifest.SUPPORTED_REPO_FEATURES
        repo_features = set()
        for operation, features in operation_to_features_map.items():
            repo_features.update(features)

        self.cache_config.ignored_features.update(repo_features - my_features)
        self.cache_config.mandatory_features.update(repo_features & my_features)

    def add_chunk(self, id, chunk, stats, overwrite=False, wait=True):
        if not self.txn_active:
            self.begin_txn()
        size = len(chunk)
        refcount = self.seen_chunk(id, size)
        if refcount and not overwrite:
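            # chunk is already in the repo and we are not overwriting: just add another reference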
            return self.chunk_incref(id, stats)
        data = self.key.encrypt(chunk)
        csize = len(data)
        self.repository.put(id, data, wait=wait)
        self.chunks.add(id, 1, size, csize)
        stats.update(size, csize, not refcount)
        return ChunkListEntry(id, size, csize)

    def seen_chunk(self, id, size=None):
        refcount, stored_size, _ = self.chunks.get(id, ChunkIndexEntry(0, None, None))
        if size is not None and stored_size is not None and size != stored_size:
            # we already have a chunk with that id, but different size.
            # this is either a hash collision (unlikely) or corruption or a bug.
            raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % (
                            id, stored_size, size))
        return refcount

    def chunk_incref(self, id, stats, size=None):
        if not self.txn_active:
            self.begin_txn()
        count, _size, csize = self.chunks.incref(id)
        stats.update(_size, csize, False)
        return ChunkListEntry(id, _size, csize)

    def chunk_decref(self, id, stats, wait=True):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks.decref(id)
        if count == 0:
            del self.chunks[id]
            self.repository.delete(id, wait=wait)
            stats.update(-size, -csize, True)
        else:
            stats.update(-size, -csize, False)

    def file_known_and_unchanged(self, hashed_path, path_hash, st):
        """
        Check if we know the file that has this path_hash (know == it is in our files cache) and
        whether it is unchanged (the size/inode number/cmtime is the same for stuff we check in this cache_mode).

        :param hashed_path: the file's path as we gave it to hash(hashed_path)
        :param path_hash: hash(hashed_path), to save some memory in the files cache
        :param st: the file's stat() result
        :return: known, ids (known is True if we have infos about this file in the cache,
                             ids is the list of chunk ids IF the file has not changed, otherwise None).
        """
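        # Illustrative caller pattern (not part of this module):
        #   known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
        #   if ids is not None:
        #       reuse the cached chunk ids (referencing them via chunk_incref)
        #   else:
        #       read and chunk the file (again)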
        if not stat.S_ISREG(st.st_mode):
            return False, None
        cache_mode = self.cache_mode
        if 'd' in cache_mode:  # d(isabled)
            files_cache_logger.debug('UNKNOWN: files cache disabled')
            return False, None
        # note: r(echunk) does not need the files cache in this method, but the files cache will
        # be updated and saved to disk to memorize the files. To preserve previous generations in
        # the cache, this means that it also needs to get loaded from disk first.
        if 'r' in cache_mode:  # r(echunk)
            files_cache_logger.debug('UNKNOWN: rechunking enforced')
            return False, None
        entry = self.files.get(path_hash)
        if not entry:
            files_cache_logger.debug('UNKNOWN: no file metadata in cache for: %r', hashed_path)
            return False, None
        # we know the file!
        entry = FileCacheEntry(*msgpack.unpackb(entry))
        if 's' in cache_mode and entry.size != st.st_size:
            files_cache_logger.debug('KNOWN-CHANGED: file size has changed: %r', hashed_path)
            return True, None
        if 'i' in cache_mode and entry.inode != st.st_ino:
            files_cache_logger.debug('KNOWN-CHANGED: file inode number has changed: %r', hashed_path)
            return True, None
        if 'c' in cache_mode and bigint_to_int(entry.cmtime) != st.st_ctime_ns:
            files_cache_logger.debug('KNOWN-CHANGED: file ctime has changed: %r', hashed_path)
            return True, None
        elif 'm' in cache_mode and bigint_to_int(entry.cmtime) != st.st_mtime_ns:
            files_cache_logger.debug('KNOWN-CHANGED: file mtime has changed: %r', hashed_path)
            return True, None
        # we ignored the inode number in the comparison above or it is still the same.
        # if it is still the same, replacing it in the tuple doesn't change it.
        # if we ignored it, a reason for doing that is that files were moved to a new
        # disk / new fs (so a one-time change of inode number is expected) and we wanted
        # to avoid everything getting chunked again. to be able to re-enable the inode
        # number comparison in a future backup run (and avoid chunking everything
        # again at that time), we need to update the inode number in the cache with what
        # we see in the filesystem.
        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
        return True, entry.chunk_ids

    def memorize_file(self, hashed_path, path_hash, st, ids):
        if not stat.S_ISREG(st.st_mode):
            return
        cache_mode = self.cache_mode
        # note: r(echunk) modes will update the files cache, d(isabled) mode won't
        if 'd' in cache_mode:
            files_cache_logger.debug('FILES-CACHE-NOUPDATE: files cache disabled')
            return
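        # note: this assumes that an enabled (non-'d') cache_mode always contains 'c' or 'm',
        # otherwise cmtime_type/cmtime_ns below would be unbound.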
        if 'c' in cache_mode:
            cmtime_type = 'ctime'
            cmtime_ns = safe_ns(st.st_ctime_ns)
        elif 'm' in cache_mode:
            cmtime_type = 'mtime'
            cmtime_ns = safe_ns(st.st_mtime_ns)
        entry = FileCacheEntry(age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_bigint(cmtime_ns), chunk_ids=ids)
        self.files[path_hash] = msgpack.packb(entry)
        self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
        files_cache_logger.debug('FILES-CACHE-UPDATE: put %r [has %s] <- %r',
                                 entry._replace(chunk_ids='[%d entries]' % len(entry.chunk_ids)),
                                 cmtime_type, hashed_path)


class AdHocCache(CacheStatsMixin):
    """
    Ad-hoc, non-persistent cache.

    Compared to the standard LocalCache, the AdHocCache does not maintain accurate reference counts,
    nor does it provide a files cache (which would require persistence). Chunks that were not added
    during the current AdHocCache lifetime won't have correct size/csize set (0 bytes) and will
    have an infinite reference count (MAX_VALUE).
    """

    str_format = """\
All archives:                unknown              unknown              unknown

                       Unique chunks         Total chunks
Chunk index:    {0.total_unique_chunks:20d}             unknown"""

    def __init__(self, repository, key, manifest, warn_if_unencrypted=True, lock_wait=None):
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self._txn_active = False

        self.security_manager = SecurityManager(repository)
        self.security_manager.assert_secure(manifest, key, lock_wait=lock_wait)

        logger.warning('Note: --no-cache-sync is an experimental feature.')

    # Public API

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    files = None
    cache_mode = 'd'

    def file_known_and_unchanged(self, hashed_path, path_hash, st):
        files_cache_logger.debug("UNKNOWN: files cache not implemented")
        return False, None

    def memorize_file(self, hashed_path, path_hash, st, ids):
        pass

    def add_chunk(self, id, chunk, stats, overwrite=False, wait=True):
        assert not overwrite, 'AdHocCache does not permit overwrites — trying to use it for recreate?'
        if not self._txn_active:
            self._begin_txn()
        size = len(chunk)
        refcount = self.seen_chunk(id, size)
        if refcount:
            return self.chunk_incref(id, stats, size=size)
        data = self.key.encrypt(chunk)
        csize = len(data)
        self.repository.put(id, data, wait=wait)
        self.chunks.add(id, 1, size, csize)
        stats.update(size, csize, not refcount)
        return ChunkListEntry(id, size, csize)

    def seen_chunk(self, id, size=None):
        if not self._txn_active:
            self._begin_txn()
        entry = self.chunks.get(id, ChunkIndexEntry(0, None, None))
        if entry.refcount and size and not entry.size:
            # The LocalCache has existing size information and uses *size* to make an effort at detecting collisions.
            # This is of course not possible for the AdHocCache.
            # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
            self.chunks[id] = entry._replace(size=size)
        return entry.refcount

    def chunk_incref(self, id, stats, size=None):
        if not self._txn_active:
            self._begin_txn()
        count, _size, csize = self.chunks.incref(id)
        # When _size is 0 and size is not given, then this chunk has not been locally visited yet (seen_chunk with
        # size or add_chunk); we can't add references to those (size=0 is invalid) and generally don't try to.
        size = _size or size
        assert size
        stats.update(size, csize, False)
        return ChunkListEntry(id, size, csize)

    def chunk_decref(self, id, stats, wait=True):
        if not self._txn_active:
            self._begin_txn()
        count, size, csize = self.chunks.decref(id)
        if count == 0:
            del self.chunks[id]
            self.repository.delete(id, wait=wait)
            stats.update(-size, -csize, True)
        else:
            stats.update(-size, -csize, False)

    def commit(self):
        if not self._txn_active:
            return
        self.security_manager.save(self.manifest, self.key)
        self._txn_active = False

    def rollback(self):
        self._txn_active = False
        del self.chunks

    # Private API

    def _begin_txn(self):
        self._txn_active = True
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance".
        # Since we're creating an archive, add 10 % from the start.
        num_chunks = len(self.repository)
        capacity = int(num_chunks / ChunkIndex.MAX_LOAD_FACTOR * 1.1)
        self.chunks = ChunkIndex(capacity)
        pi = ProgressIndicatorPercent(total=num_chunks, msg='Downloading chunk list... %3.0f%%',
                                      msgid='cache.download_chunks')
        t0 = perf_counter()
        num_requests = 0
        marker = None
        while True:
            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
            num_requests += 1
            if not result:
                break
            pi.show(increase=len(result))
            marker = result[-1]
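            # continue the listing after the last id seen; the repository returns at most LIST_SCAN_LIMIT ids per call.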
            # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
            # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
            # (e.g. checkpoint archives) are tracked correctly.
            init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0, csize=0)
            for id_ in result:
                self.chunks[id_] = init_entry
        assert len(self.chunks) == num_chunks
        # LocalCache does not contain the manifest, either.
        del self.chunks[self.manifest.MANIFEST_ID]
        duration = perf_counter() - t0 or 0.01
        pi.finish()
        logger.debug('AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s',
                     num_chunks, duration, num_requests, format_file_size(num_chunks * 34 / duration))
        # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
        # Protocol overhead is neglected in this calculation.