import configparser
import os
import shutil
import stat
from binascii import unhexlify
from collections import namedtuple
from time import perf_counter

from .logger import create_logger

logger = create_logger()

files_cache_logger = create_logger('borg.debug.files_cache')

from .constants import CACHE_README, DEFAULT_FILES_CACHE_MODE
from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
from .helpers import Location
from .helpers import Error
from .helpers import Manifest
from .helpers import get_cache_dir, get_security_dir
from .helpers import int_to_bigint, bigint_to_int, bin_to_hex, parse_stringified_list
from .helpers import format_file_size
from .helpers import safe_ns
from .helpers import yes, hostname_is_unique
from .helpers import remove_surrogates
from .helpers import ProgressIndicatorPercent, ProgressIndicatorMessage
from .helpers import set_ec, EXIT_WARNING
from .helpers import truncate_and_unlink
from .helpers import msgpack
from .item import ArchiveItem, ChunkListEntry
from .crypto.key import PlaintextKey
from .crypto.file_integrity import IntegrityCheckedFile, DetachedIntegrityCheckedFile, FileIntegrityError
from .locking import Lock
from .platform import SaveFile
from .remote import cache_if_remote
from .repository import LIST_SCAN_LIMIT

# note: cmtime might be either a ctime or a mtime timestamp
FileCacheEntry = namedtuple('FileCacheEntry', 'age inode size cmtime chunk_ids')


class SecurityManager:
    """
    Tracks repositories. Ensures that nothing bad happens (repository swaps,
    replay attacks, unknown repositories etc.).

    This is complicated by the Cache being initially used for this, while
    only some commands actually use the Cache, which meant that other commands
    did not perform these checks.

    Further complications were created by the Cache being a cache, so it
    could be legitimately deleted, which is annoying because Borg didn't
    recognize repositories after that.

    Therefore a second location, the security database (see get_security_dir),
    was introduced which stores this information. However, this means that
    the code has to deal with a cache existing but no security DB entry,
    or inconsistencies between the security DB and the cache which have to
    be reconciled, and also with no cache existing but a security DB entry.
60 """ 61 62 def __init__(self, repository): 63 self.repository = repository 64 self.dir = get_security_dir(repository.id_str) 65 self.cache_dir = cache_dir(repository) 66 self.key_type_file = os.path.join(self.dir, 'key-type') 67 self.location_file = os.path.join(self.dir, 'location') 68 self.manifest_ts_file = os.path.join(self.dir, 'manifest-timestamp') 69 70 @staticmethod 71 def destroy(repository, path=None): 72 """destroy the security dir for ``repository`` or at ``path``""" 73 path = path or get_security_dir(repository.id_str) 74 if os.path.exists(path): 75 shutil.rmtree(path) 76 77 def known(self): 78 return all(os.path.exists(f) 79 for f in (self.key_type_file, self.location_file, self.manifest_ts_file)) 80 81 def key_matches(self, key): 82 if not self.known(): 83 return False 84 try: 85 with open(self.key_type_file, 'r') as fd: 86 type = fd.read() 87 return type == str(key.TYPE) 88 except OSError as exc: 89 logger.warning('Could not read/parse key type file: %s', exc) 90 91 def save(self, manifest, key): 92 logger.debug('security: saving state for %s to %s', self.repository.id_str, self.dir) 93 current_location = self.repository._location.canonical_path() 94 logger.debug('security: current location %s', current_location) 95 logger.debug('security: key type %s', str(key.TYPE)) 96 logger.debug('security: manifest timestamp %s', manifest.timestamp) 97 with SaveFile(self.location_file) as fd: 98 fd.write(current_location) 99 with SaveFile(self.key_type_file) as fd: 100 fd.write(str(key.TYPE)) 101 with SaveFile(self.manifest_ts_file) as fd: 102 fd.write(manifest.timestamp) 103 104 def assert_location_matches(self, cache_config=None): 105 # Warn user before sending data to a relocated repository 106 try: 107 with open(self.location_file) as fd: 108 previous_location = fd.read() 109 logger.debug('security: read previous location %r', previous_location) 110 except FileNotFoundError: 111 logger.debug('security: previous location file %s not found', self.location_file) 112 previous_location = None 113 except OSError as exc: 114 logger.warning('Could not read previous location file: %s', exc) 115 previous_location = None 116 if cache_config and cache_config.previous_location and previous_location != cache_config.previous_location: 117 # Reconcile cache and security dir; we take the cache location. 118 previous_location = cache_config.previous_location 119 logger.debug('security: using previous_location of cache: %r', previous_location) 120 121 repository_location = self.repository._location.canonical_path() 122 if previous_location and previous_location != repository_location: 123 msg = ("Warning: The repository at location {} was previously located at {}\n".format( 124 repository_location, previous_location) + 125 "Do you want to continue? 
[yN] ") 126 if not yes(msg, false_msg="Aborting.", invalid_msg="Invalid answer, aborting.", 127 retry=False, env_var_override='BORG_RELOCATED_REPO_ACCESS_IS_OK'): 128 raise Cache.RepositoryAccessAborted() 129 # adapt on-disk config immediately if the new location was accepted 130 logger.debug('security: updating location stored in cache and security dir') 131 with SaveFile(self.location_file) as fd: 132 fd.write(repository_location) 133 if cache_config: 134 cache_config.save() 135 136 def assert_no_manifest_replay(self, manifest, key, cache_config=None): 137 try: 138 with open(self.manifest_ts_file) as fd: 139 timestamp = fd.read() 140 logger.debug('security: read manifest timestamp %r', timestamp) 141 except FileNotFoundError: 142 logger.debug('security: manifest timestamp file %s not found', self.manifest_ts_file) 143 timestamp = '' 144 except OSError as exc: 145 logger.warning('Could not read previous location file: %s', exc) 146 timestamp = '' 147 if cache_config: 148 timestamp = max(timestamp, cache_config.timestamp or '') 149 logger.debug('security: determined newest manifest timestamp as %s', timestamp) 150 # If repository is older than the cache or security dir something fishy is going on 151 if timestamp and timestamp > manifest.timestamp: 152 if isinstance(key, PlaintextKey): 153 raise Cache.RepositoryIDNotUnique() 154 else: 155 raise Cache.RepositoryReplay() 156 157 def assert_key_type(self, key, cache_config=None): 158 # Make sure an encrypted repository has not been swapped for an unencrypted repository 159 if cache_config and cache_config.key_type is not None and cache_config.key_type != str(key.TYPE): 160 raise Cache.EncryptionMethodMismatch() 161 if self.known() and not self.key_matches(key): 162 raise Cache.EncryptionMethodMismatch() 163 164 def assert_secure(self, manifest, key, *, cache_config=None, warn_if_unencrypted=True, lock_wait=None): 165 # warn_if_unencrypted=False is only used for initializing a new repository. 166 # Thus, avoiding asking about a repository that's currently initializing. 167 self.assert_access_unknown(warn_if_unencrypted, manifest, key) 168 if cache_config: 169 self._assert_secure(manifest, key, cache_config) 170 else: 171 cache_config = CacheConfig(self.repository, lock_wait=lock_wait) 172 if cache_config.exists(): 173 with cache_config: 174 self._assert_secure(manifest, key, cache_config) 175 else: 176 self._assert_secure(manifest, key) 177 logger.debug('security: repository checks ok, allowing access') 178 179 def _assert_secure(self, manifest, key, cache_config=None): 180 self.assert_location_matches(cache_config) 181 self.assert_key_type(key, cache_config) 182 self.assert_no_manifest_replay(manifest, key, cache_config) 183 if not self.known(): 184 logger.debug('security: remembering previously unknown repository') 185 self.save(manifest, key) 186 187 def assert_access_unknown(self, warn_if_unencrypted, manifest, key): 188 # warn_if_unencrypted=False is only used for initializing a new repository. 189 # Thus, avoiding asking about a repository that's currently initializing. 190 if not key.logically_encrypted and not self.known(): 191 msg = ("Warning: Attempting to access a previously unknown unencrypted repository!\n" + 192 "Do you want to continue? 
[yN] ") 193 allow_access = not warn_if_unencrypted or yes(msg, false_msg="Aborting.", 194 invalid_msg="Invalid answer, aborting.", 195 retry=False, env_var_override='BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK') 196 if allow_access: 197 if warn_if_unencrypted: 198 logger.debug('security: remembering unknown unencrypted repository (explicitly allowed)') 199 else: 200 logger.debug('security: initializing unencrypted repository') 201 self.save(manifest, key) 202 else: 203 raise Cache.CacheInitAbortedError() 204 205 206def assert_secure(repository, manifest, lock_wait): 207 sm = SecurityManager(repository) 208 sm.assert_secure(manifest, manifest.key, lock_wait=lock_wait) 209 210 211def recanonicalize_relative_location(cache_location, repository): 212 # borg < 1.0.8rc1 had different canonicalization for the repo location (see #1655 and #1741). 213 repo_location = repository._location.canonical_path() 214 rl = Location(repo_location) 215 cl = Location(cache_location) 216 if cl.proto == rl.proto and cl.user == rl.user and cl.host == rl.host and cl.port == rl.port \ 217 and \ 218 cl.path and rl.path and \ 219 cl.path.startswith('/~/') and rl.path.startswith('/./') and cl.path[3:] == rl.path[3:]: 220 # everything is same except the expected change in relative path canonicalization, 221 # update previous_location to avoid warning / user query about changed location: 222 return repo_location 223 else: 224 return cache_location 225 226 227def cache_dir(repository, path=None): 228 return path or os.path.join(get_cache_dir(), repository.id_str) 229 230 231def files_cache_name(): 232 suffix = os.environ.get('BORG_FILES_CACHE_SUFFIX', '') 233 return 'files.' + suffix if suffix else 'files' 234 235 236class CacheConfig: 237 def __init__(self, repository, path=None, lock_wait=None): 238 self.repository = repository 239 self.path = cache_dir(repository, path) 240 self.config_path = os.path.join(self.path, 'config') 241 self.lock = None 242 self.lock_wait = lock_wait 243 244 def __enter__(self): 245 self.open() 246 return self 247 248 def __exit__(self, exc_type, exc_val, exc_tb): 249 self.close() 250 251 def exists(self): 252 return os.path.exists(self.config_path) 253 254 def create(self): 255 assert not self.exists() 256 config = configparser.ConfigParser(interpolation=None) 257 config.add_section('cache') 258 config.set('cache', 'version', '1') 259 config.set('cache', 'repository', self.repository.id_str) 260 config.set('cache', 'manifest', '') 261 config.add_section('integrity') 262 config.set('integrity', 'manifest', '') 263 with SaveFile(self.config_path) as fd: 264 config.write(fd) 265 266 def open(self): 267 self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=self.lock_wait, 268 kill_stale_locks=hostname_is_unique()).acquire() 269 self.load() 270 271 def load(self): 272 self._config = configparser.ConfigParser(interpolation=None) 273 with open(self.config_path) as fd: 274 self._config.read_file(fd) 275 self._check_upgrade(self.config_path) 276 self.id = self._config.get('cache', 'repository') 277 self.manifest_id = unhexlify(self._config.get('cache', 'manifest')) 278 self.timestamp = self._config.get('cache', 'timestamp', fallback=None) 279 self.key_type = self._config.get('cache', 'key_type', fallback=None) 280 self.ignored_features = set(parse_stringified_list(self._config.get('cache', 'ignored_features', fallback=''))) 281 self.mandatory_features = set(parse_stringified_list(self._config.get('cache', 'mandatory_features', fallback=''))) 282 try: 283 self.integrity = 
            if self._config.get('cache', 'manifest') != self.integrity.pop('manifest'):
                # The cache config file is updated (parsed with ConfigParser, the state of the
                # ConfigParser is modified and then written out), not re-created.
                # Thus, older versions will leave our [integrity] section alone, making the section's data invalid.
                # Therefore, we also add the manifest ID to this section and
                # can discern whether an older version interfered by comparing the manifest IDs of this section
                # and the main [cache] section.
                self.integrity = {}
                logger.warning('Cache integrity data not available: old Borg version modified the cache.')
        except configparser.NoSectionError:
            logger.debug('Cache integrity: No integrity data found (files, chunks). Cache is from old version.')
            self.integrity = {}
        previous_location = self._config.get('cache', 'previous_location', fallback=None)
        if previous_location:
            self.previous_location = recanonicalize_relative_location(previous_location, self.repository)
        else:
            self.previous_location = None
        self._config.set('cache', 'previous_location', self.repository._location.canonical_path())

    def save(self, manifest=None, key=None):
        if manifest:
            self._config.set('cache', 'manifest', manifest.id_str)
            self._config.set('cache', 'timestamp', manifest.timestamp)
            self._config.set('cache', 'ignored_features', ','.join(self.ignored_features))
            self._config.set('cache', 'mandatory_features', ','.join(self.mandatory_features))
            if not self._config.has_section('integrity'):
                self._config.add_section('integrity')
            for file, integrity_data in self.integrity.items():
                self._config.set('integrity', file, integrity_data)
            self._config.set('integrity', 'manifest', manifest.id_str)
        if key:
            self._config.set('cache', 'key_type', str(key.TYPE))
        with SaveFile(self.config_path) as fd:
            self._config.write(fd)

    def close(self):
        if self.lock is not None:
            self.lock.release()
            self.lock = None

    def _check_upgrade(self, config_path):
        try:
            cache_version = self._config.getint('cache', 'version')
            wanted_version = 1
            if cache_version != wanted_version:
                self.close()
                raise Exception('%s has unexpected cache version %d (wanted: %d).' %
                                (config_path, cache_version, wanted_version))
        except configparser.NoSectionError:
            self.close()
            raise Exception('%s does not look like a Borg cache.' % config_path) from None
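
# For orientation, the cache config written by CacheConfig.create()/save() is a
# plain INI file. An illustrative sketch (ids, timestamp and values below are
# made up, only the keys are taken from the code above):
#
#     [cache]
#     version = 1
#     repository = <64 hex digits: repository id>
#     manifest = <64 hex digits: manifest id>
#     timestamp = 2023-01-01T00:00:00.000000
#     key_type = 2
#     previous_location = ssh://user@host:22/./path/to/repo
#     ignored_features =
#     mandatory_features =
#
#     [integrity]
#     manifest = <same manifest id as in [cache], see load() above>
#     chunks = <integrity data for the chunks index file>
#     files = <integrity data for the files cache file>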


class Cache:
    """Client-side cache
    """
    class RepositoryIDNotUnique(Error):
        """Cache is newer than repository - do you have multiple, independently updated repos with same ID?"""

    class RepositoryReplay(Error):
        """Cache, or information obtained from the security directory is newer than repository - this is either an attack or unsafe (multiple repos with same ID)"""

    class CacheInitAbortedError(Error):
        """Cache initialization aborted"""

    class RepositoryAccessAborted(Error):
        """Repository access aborted"""

    class EncryptionMethodMismatch(Error):
        """Repository encryption method changed since last access, refusing to continue"""

    @staticmethod
    def break_lock(repository, path=None):
        path = cache_dir(repository, path)
        Lock(os.path.join(path, 'lock'), exclusive=True).break_lock()

    @staticmethod
    def destroy(repository, path=None):
        """destroy the cache for ``repository`` or at ``path``"""
        path = path or os.path.join(get_cache_dir(), repository.id_str)
        config = os.path.join(path, 'config')
        if os.path.exists(config):
            os.remove(config)  # kill config first
            shutil.rmtree(path)

    def __new__(cls, repository, key, manifest, path=None, sync=True, do_files=False, warn_if_unencrypted=True,
                progress=False, lock_wait=None, permit_adhoc_cache=False, cache_mode=DEFAULT_FILES_CACHE_MODE,
                ignore_inode=False):

        if not do_files and 'd' not in cache_mode:
            cache_mode = 'd'
        elif ignore_inode and 'i' in cache_mode:
            cache_mode = ''.join(set(cache_mode) - set('i'))

        def local():
            return LocalCache(repository=repository, key=key, manifest=manifest, path=path, sync=sync,
                              warn_if_unencrypted=warn_if_unencrypted, progress=progress,
                              lock_wait=lock_wait, cache_mode=cache_mode)

        def adhoc():
            return AdHocCache(repository=repository, key=key, manifest=manifest, lock_wait=lock_wait)

        if not permit_adhoc_cache:
            return local()

        # ad-hoc cache may be permitted, but if the local cache is in sync it'd be stupid to invalidate
        # it by needlessly using the ad-hoc cache.
        # Check if the local cache exists and is in sync.
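        # This is a cheap peek: the cache config is opened just long enough to
        # compare its stored manifest id against the current manifest, and its
        # lock is released again before the chosen cache takes its own lock.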

        cache_config = CacheConfig(repository, path, lock_wait)
        if cache_config.exists():
            with cache_config:
                cache_in_sync = cache_config.manifest_id == manifest.id
            # Don't nest cache locks
            if cache_in_sync:
                # Local cache is in sync, use it
                logger.debug('Cache: choosing local cache (in sync)')
                return local()
        logger.debug('Cache: choosing ad-hoc cache (local cache does not exist or is not in sync)')
        return adhoc()


class CacheStatsMixin:
    str_format = """\
All archives:   {0.total_size:>20s} {0.total_csize:>20s} {0.unique_csize:>20s}

                       Unique chunks         Total chunks
Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""

    def __str__(self):
        return self.str_format.format(self.format_tuple())

    Summary = namedtuple('Summary', ['total_size', 'total_csize', 'unique_size', 'unique_csize', 'total_unique_chunks',
                                     'total_chunks'])

    def stats(self):
        # XXX: this should really be moved down to `hashindex.pyx`
        stats = self.Summary(*self.chunks.summarize())._asdict()
        return stats

    def format_tuple(self):
        stats = self.stats()
        for field in ['total_size', 'total_csize', 'unique_csize']:
            stats[field] = format_file_size(stats[field])
        return self.Summary(**stats)

    def chunks_stored_size(self):
        return self.stats()['unique_csize']


class LocalCache(CacheStatsMixin):
    """
    Persistent, local (client-side) cache.
    """

    def __init__(self, repository, key, manifest, path=None, sync=True, warn_if_unencrypted=True,
                 progress=False, lock_wait=None, cache_mode=DEFAULT_FILES_CACHE_MODE):
        """
        :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
        :param lock_wait: timeout for lock acquisition (int [s] or None [wait forever])
        :param sync: do :meth:`.sync`
        :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
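                           (see :meth:`.file_known_and_unchanged`): 'd' disables the files cache
                           entirely, 'r' forces rechunking, and any combination of 's' (size),
                           'i' (inode number) and 'c' (ctime) or 'm' (mtime) selects which stat
                           fields must match for a file to count as unchanged.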
        """
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self.progress = progress
        self.cache_mode = cache_mode
        self.timestamp = None
        self.txn_active = False

        self.path = cache_dir(repository, path)
        self.security_manager = SecurityManager(repository)
        self.cache_config = CacheConfig(self.repository, self.path, lock_wait)

        # Warn user before sending data to a never seen before unencrypted repository
        if not os.path.exists(self.path):
            self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, key)
            self.create()

        self.open()
        try:
            self.security_manager.assert_secure(manifest, key, cache_config=self.cache_config)

            if not self.check_cache_compatibility():
                self.wipe_cache()

            self.update_compatibility()

            if sync and self.manifest.id != self.cache_config.manifest_id:
                self.sync()
                self.commit()
        except:
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def create(self):
        """Create a new empty cache at `self.path`
        """
        os.makedirs(self.path)
        with open(os.path.join(self.path, 'README'), 'w') as fd:
            fd.write(CACHE_README)
        self.cache_config.create()
        ChunkIndex().write(os.path.join(self.path, 'chunks'))
        os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
        with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
            pass  # empty file

    def _do_open(self):
        self.cache_config.load()
        with IntegrityCheckedFile(path=os.path.join(self.path, 'chunks'), write=False,
                                  integrity_data=self.cache_config.integrity.get('chunks')) as fd:
            self.chunks = ChunkIndex.read(fd)
        if 'd' in self.cache_mode:  # d(isabled)
            self.files = None
        else:
            self._read_files()

    def open(self):
        if not os.path.isdir(self.path):
            raise Exception('%s does not look like a Borg cache' % self.path)
        self.cache_config.open()
        self.rollback()

    def close(self):
        if self.cache_config is not None:
            self.cache_config.close()
            self.cache_config = None

    def _read_files(self):
        self.files = {}
        self._newest_cmtime = None
        logger.debug('Reading files cache ...')
        files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
        msg = None
        try:
            with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=False,
                                      integrity_data=self.cache_config.integrity.get(files_cache_name())) as fd:
                u = msgpack.Unpacker(use_list=True)
                while True:
                    data = fd.read(64 * 1024)
                    if not data:
                        break
                    u.feed(data)
                    try:
                        for path_hash, item in u:
                            entry = FileCacheEntry(*item)
                            # in the end, this takes about 240 Bytes per file
                            self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
                    except (TypeError, ValueError) as exc:
                        msg = "The files cache seems invalid. [%s]" % str(exc)
                        break
        except OSError as exc:
            msg = "The files cache can't be read. [%s]" % str(exc)
        except FileIntegrityError as fie:
            msg = "The files cache is corrupted. [%s]" % str(fie)
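        # msg is only set when loading failed in some way; in that case we fall
        # back to an empty files cache below and carry on with lower performance.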
[%s]" % str(fie) 546 if msg is not None: 547 logger.warning(msg) 548 logger.warning('Continuing without files cache - expect lower performance.') 549 self.files = {} 550 files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files)) 551 552 def begin_txn(self): 553 # Initialize transaction snapshot 554 pi = ProgressIndicatorMessage(msgid='cache.begin_transaction') 555 txn_dir = os.path.join(self.path, 'txn.tmp') 556 os.mkdir(txn_dir) 557 pi.output('Initializing cache transaction: Reading config') 558 shutil.copy(os.path.join(self.path, 'config'), txn_dir) 559 pi.output('Initializing cache transaction: Reading chunks') 560 shutil.copy(os.path.join(self.path, 'chunks'), txn_dir) 561 pi.output('Initializing cache transaction: Reading files') 562 try: 563 shutil.copy(os.path.join(self.path, files_cache_name()), txn_dir) 564 except FileNotFoundError: 565 with SaveFile(os.path.join(txn_dir, files_cache_name()), binary=True): 566 pass # empty file 567 os.rename(os.path.join(self.path, 'txn.tmp'), 568 os.path.join(self.path, 'txn.active')) 569 self.txn_active = True 570 pi.finish() 571 572 def commit(self): 573 """Commit transaction 574 """ 575 if not self.txn_active: 576 return 577 self.security_manager.save(self.manifest, self.key) 578 pi = ProgressIndicatorMessage(msgid='cache.commit') 579 if self.files is not None: 580 if self._newest_cmtime is None: 581 # was never set because no files were modified/added 582 self._newest_cmtime = 2 ** 63 - 1 # nanoseconds, good until y2262 583 ttl = int(os.environ.get('BORG_FILES_CACHE_TTL', 20)) 584 pi.output('Saving files cache') 585 files_cache_logger.debug("FILES-CACHE-SAVE: starting...") 586 with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd: 587 entry_count = 0 588 for path_hash, item in self.files.items(): 589 # Only keep files seen in this backup that are older than newest cmtime seen in this backup - 590 # this is to avoid issues with filesystem snapshots and cmtime granularity. 591 # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet. 
                    entry = FileCacheEntry(*msgpack.unpackb(item))
                    if entry.age == 0 and bigint_to_int(entry.cmtime) < self._newest_cmtime or \
                       entry.age > 0 and entry.age < ttl:
                        msgpack.pack((path_hash, entry), fd)
                        entry_count += 1
                files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
                files_cache_logger.debug("FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime)
                files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
            self.cache_config.integrity[files_cache_name()] = fd.integrity_data
        pi.output('Saving chunks cache')
        with IntegrityCheckedFile(path=os.path.join(self.path, 'chunks'), write=True) as fd:
            self.chunks.write(fd)
        self.cache_config.integrity['chunks'] = fd.integrity_data
        pi.output('Saving cache config')
        self.cache_config.save(self.manifest, self.key)
        os.rename(os.path.join(self.path, 'txn.active'),
                  os.path.join(self.path, 'txn.tmp'))
        shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False
        pi.finish()

    def rollback(self):
        """Roll back partial and aborted transactions
        """
        # Remove partial transaction
        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        # Roll back active transaction
        txn_dir = os.path.join(self.path, 'txn.active')
        if os.path.exists(txn_dir):
            shutil.copy(os.path.join(txn_dir, 'config'), self.path)
            shutil.copy(os.path.join(txn_dir, 'chunks'), self.path)
            shutil.copy(os.path.join(txn_dir, files_cache_name()), self.path)
            os.rename(txn_dir, os.path.join(self.path, 'txn.tmp'))
        if os.path.exists(os.path.join(self.path, 'txn.tmp')):
            shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
        self.txn_active = False
        self._do_open()

    def sync(self):
        """Re-synchronize chunks cache with repository.

        Maintains a directory with known backup archive indexes, so it only
        needs to fetch infos from repo and build a chunk index once per backup
        archive.
        If out of sync, missing archive indexes get added, outdated indexes
        get removed and a new master chunks index is built by merging all
        archive indexes.
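
        The cached archive indexes live in ``chunks.archive.d/`` inside the
        cache directory, one file per archive named by the archive id (hex),
        with a ``.compact`` suffix once an index has been compacted
        (see ``cached_archives()`` and ``write_archive_index()`` below).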
640 """ 641 archive_path = os.path.join(self.path, 'chunks.archive.d') 642 # An index of chunks whose size had to be fetched 643 chunks_fetched_size_index = ChunkIndex() 644 # Instrumentation 645 processed_item_metadata_bytes = 0 646 processed_item_metadata_chunks = 0 647 compact_chunks_archive_saved_space = 0 648 fetched_chunks_for_csize = 0 649 fetched_bytes_for_csize = 0 650 651 def mkpath(id, suffix=''): 652 id_hex = bin_to_hex(id) 653 path = os.path.join(archive_path, id_hex + suffix) 654 return path 655 656 def cached_archives(): 657 if self.do_cache: 658 fns = os.listdir(archive_path) 659 # filenames with 64 hex digits == 256bit, 660 # or compact indices which are 64 hex digits + ".compact" 661 return set(unhexlify(fn) for fn in fns if len(fn) == 64) | \ 662 set(unhexlify(fn[:64]) for fn in fns if len(fn) == 72 and fn.endswith('.compact')) 663 else: 664 return set() 665 666 def repo_archives(): 667 return set(info.id for info in self.manifest.archives.list()) 668 669 def cleanup_outdated(ids): 670 for id in ids: 671 cleanup_cached_archive(id) 672 673 def cleanup_cached_archive(id, cleanup_compact=True): 674 try: 675 os.unlink(mkpath(id)) 676 os.unlink(mkpath(id) + '.integrity') 677 except FileNotFoundError: 678 pass 679 if not cleanup_compact: 680 return 681 try: 682 os.unlink(mkpath(id, suffix='.compact')) 683 os.unlink(mkpath(id, suffix='.compact') + '.integrity') 684 except FileNotFoundError: 685 pass 686 687 def fetch_missing_csize(chunk_idx): 688 """ 689 Archives created with AdHocCache will have csize=0 in all chunk list entries whose 690 chunks were already in the repository. 691 692 Scan *chunk_idx* for entries where csize=0 and fill in the correct information. 693 """ 694 nonlocal fetched_chunks_for_csize 695 nonlocal fetched_bytes_for_csize 696 697 all_missing_ids = chunk_idx.zero_csize_ids() 698 fetch_ids = [] 699 if len(chunks_fetched_size_index): 700 for id_ in all_missing_ids: 701 already_fetched_entry = chunks_fetched_size_index.get(id_) 702 if already_fetched_entry: 703 entry = chunk_idx[id_]._replace(csize=already_fetched_entry.csize) 704 assert entry.size == already_fetched_entry.size, 'Chunk size mismatch' 705 chunk_idx[id_] = entry 706 else: 707 fetch_ids.append(id_) 708 else: 709 fetch_ids = all_missing_ids 710 711 # This is potentially a rather expensive operation, but it's hard to tell at this point 712 # if it's a problem in practice (hence the experimental status of --no-cache-sync). 
            for id_, data in zip(fetch_ids, decrypted_repository.repository.get_many(fetch_ids)):
                entry = chunk_idx[id_]._replace(csize=len(data))
                chunk_idx[id_] = entry
                chunks_fetched_size_index[id_] = entry
                fetched_chunks_for_csize += 1
                fetched_bytes_for_csize += len(data)

        def fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx):
            nonlocal processed_item_metadata_bytes
            nonlocal processed_item_metadata_chunks
            csize, data = decrypted_repository.get(archive_id)
            chunk_idx.add(archive_id, 1, len(data), csize)
            archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
            if archive.version != 1:
                raise Exception('Unknown archive metadata version')
            sync = CacheSynchronizer(chunk_idx)
            for item_id, (csize, data) in zip(archive.items, decrypted_repository.get_many(archive.items)):
                chunk_idx.add(item_id, 1, len(data), csize)
                processed_item_metadata_bytes += len(data)
                processed_item_metadata_chunks += 1
                sync.feed(data)
            if self.do_cache:
                fetch_missing_csize(chunk_idx)
                write_archive_index(archive_id, chunk_idx)

        def write_archive_index(archive_id, chunk_idx):
            nonlocal compact_chunks_archive_saved_space
            compact_chunks_archive_saved_space += chunk_idx.compact()
            fn = mkpath(archive_id, suffix='.compact')
            fn_tmp = mkpath(archive_id, suffix='.tmp')
            try:
                with DetachedIntegrityCheckedFile(path=fn_tmp, write=True,
                                                  filename=bin_to_hex(archive_id) + '.compact') as fd:
                    chunk_idx.write(fd)
            except Exception:
                truncate_and_unlink(fn_tmp)
            else:
                os.rename(fn_tmp, fn)

        def read_archive_index(archive_id, archive_name):
            archive_chunk_idx_path = mkpath(archive_id)
            logger.info("Reading cached archive chunk index for %s ...", archive_name)
            try:
                try:
                    # Attempt to load compact index first
                    with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path + '.compact', write=False) as fd:
                        archive_chunk_idx = ChunkIndex.read(fd, permit_compact=True)
                    # In case a non-compact index exists, delete it.
                    cleanup_cached_archive(archive_id, cleanup_compact=False)
                    # Compact index read - return index, no conversion necessary (below).
                    return archive_chunk_idx
                except FileNotFoundError:
                    # No compact index found, load non-compact index, and convert below.
                    with DetachedIntegrityCheckedFile(path=archive_chunk_idx_path, write=False) as fd:
                        archive_chunk_idx = ChunkIndex.read(fd)
            except FileIntegrityError as fie:
                logger.error('Cached archive chunk index of %s is corrupted: %s', archive_name, fie)
                # Delete corrupted index, set warning. A new index must be built.
                cleanup_cached_archive(archive_id)
                set_ec(EXIT_WARNING)
                return None

            # Convert to compact index. Delete the existing index first.
            logger.debug('Found non-compact index for %s, converting to compact.', archive_name)
            cleanup_cached_archive(archive_id)
            write_archive_index(archive_id, archive_chunk_idx)
            return archive_chunk_idx

        def get_archive_ids_to_names(archive_ids):
            # Pass once over all archives and build a mapping from ids to names.
            # The easier approach, doing a similar loop for each archive, has
            # quadratic complexity and does about a dozen million function calls
            # with 1100 archives (which takes 30 CPU seconds _alone_).
            archive_names = {}
            for info in self.manifest.archives.list():
                if info.id in archive_ids:
                    archive_names[info.id] = info.name
            assert len(archive_names) == len(archive_ids)
            return archive_names

        def create_master_idx(chunk_idx):
            logger.info('Synchronizing chunks cache...')
            cached_ids = cached_archives()
            archive_ids = repo_archives()
            logger.info('Archives: %d, w/ cached Idx: %d, w/ outdated Idx: %d, w/o cached Idx: %d.',
                        len(archive_ids), len(cached_ids),
                        len(cached_ids - archive_ids), len(archive_ids - cached_ids))
            # deallocates old hashindex, creates empty hashindex:
            chunk_idx.clear()
            cleanup_outdated(cached_ids - archive_ids)
            # Explicitly set the initial hash table capacity to avoid performance issues
            # due to hash table "resonance".
            master_index_capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR)
            if archive_ids:
                chunk_idx = None if not self.do_cache else ChunkIndex(master_index_capacity)
                pi = ProgressIndicatorPercent(total=len(archive_ids), step=0.1,
                                              msg='%3.0f%% Syncing chunks cache. Processing archive %s',
                                              msgid='cache.sync')
                archive_ids_to_names = get_archive_ids_to_names(archive_ids)
                for archive_id, archive_name in archive_ids_to_names.items():
                    pi.show(info=[remove_surrogates(archive_name)])
                    if self.do_cache:
                        if archive_id in cached_ids:
                            archive_chunk_idx = read_archive_index(archive_id, archive_name)
                            if archive_chunk_idx is None:
                                cached_ids.remove(archive_id)
                        if archive_id not in cached_ids:
                            # Do not make this an else branch; the FileIntegrityError exception handler
                            # above can remove *archive_id* from *cached_ids*.
                            logger.info('Fetching and building archive index for %s ...', archive_name)
                            archive_chunk_idx = ChunkIndex()
                            fetch_and_build_idx(archive_id, decrypted_repository, archive_chunk_idx)
                        logger.info("Merging into master chunks index ...")
                        chunk_idx.merge(archive_chunk_idx)
                    else:
                        chunk_idx = chunk_idx or ChunkIndex(master_index_capacity)
                        logger.info('Fetching archive index for %s ...', archive_name)
                        fetch_and_build_idx(archive_id, decrypted_repository, chunk_idx)
                if not self.do_cache:
                    fetch_missing_csize(chunk_idx)
                pi.finish()
                logger.debug('Cache sync: had to fetch %s (%d chunks) because no archive had a csize set for them '
                             '(due to --no-cache-sync)',
                             format_file_size(fetched_bytes_for_csize), fetched_chunks_for_csize)
                logger.debug('Cache sync: processed %s (%d chunks) of metadata',
                             format_file_size(processed_item_metadata_bytes), processed_item_metadata_chunks)
                logger.debug('Cache sync: compact chunks.archive.d storage saved %s bytes',
                             format_file_size(compact_chunks_archive_saved_space))
            logger.info('Done.')
            return chunk_idx

        def legacy_cleanup():
            """bring old cache dirs into the desired state (cleanup and adapt)"""
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive'))
            except:
                pass
            try:
                os.unlink(os.path.join(self.path, 'chunks.archive.tmp'))
            except:
                pass
            try:
                os.mkdir(archive_path)
            except:
                pass

        # The cache can be used by a command that e.g. only checks against Manifest.Operation.WRITE,
        # which does not have to include all flags from Manifest.Operation.READ.
        # Since the sync will attempt to read archives, check compatibility with Manifest.Operation.READ.
        self.manifest.check_repository_compatibility((Manifest.Operation.READ, ))

        self.begin_txn()
        with cache_if_remote(self.repository, decrypted_cache=self.key) as decrypted_repository:
            legacy_cleanup()
            # TEMPORARY HACK: to avoid archive index caching, create a FILE named ~/.cache/borg/REPOID/chunks.archive.d -
            # this is only recommended if you have a fast, low latency connection to your repo (e.g. if repo is local disk)
            self.do_cache = os.path.isdir(archive_path)
            self.chunks = create_master_idx(self.chunks)

    def check_cache_compatibility(self):
        my_features = Manifest.SUPPORTED_REPO_FEATURES
        if self.cache_config.ignored_features & my_features:
            # The cache might not contain references of chunks that need a feature that is mandatory for some operation
            # and which this version supports. To avoid corruption while executing that operation force rebuild.
            return False
        if not self.cache_config.mandatory_features <= my_features:
            # The cache was built with consideration to at least one feature that this version does not understand.
            # This client might misinterpret the cache. Thus force a rebuild.
            return False
        return True

    def wipe_cache(self):
        logger.warning("Discarding incompatible cache and forcing a cache rebuild")
        archive_path = os.path.join(self.path, 'chunks.archive.d')
        if os.path.isdir(archive_path):
            shutil.rmtree(os.path.join(self.path, 'chunks.archive.d'))
            os.makedirs(os.path.join(self.path, 'chunks.archive.d'))
        self.chunks = ChunkIndex()
        with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
            pass  # empty file
        self.cache_config.manifest_id = ''
        self.cache_config._config.set('cache', 'manifest', '')

        self.cache_config.ignored_features = set()
        self.cache_config.mandatory_features = set()

    def update_compatibility(self):
        operation_to_features_map = self.manifest.get_all_mandatory_features()
        my_features = Manifest.SUPPORTED_REPO_FEATURES
        repo_features = set()
        for operation, features in operation_to_features_map.items():
            repo_features.update(features)

        self.cache_config.ignored_features.update(repo_features - my_features)
        self.cache_config.mandatory_features.update(repo_features & my_features)

    def add_chunk(self, id, chunk, stats, overwrite=False, wait=True):
        if not self.txn_active:
            self.begin_txn()
        size = len(chunk)
        refcount = self.seen_chunk(id, size)
        if refcount and not overwrite:
            return self.chunk_incref(id, stats)
        data = self.key.encrypt(chunk)
        csize = len(data)
        self.repository.put(id, data, wait=wait)
        self.chunks.add(id, 1, size, csize)
        stats.update(size, csize, not refcount)
        return ChunkListEntry(id, size, csize)

    def seen_chunk(self, id, size=None):
        refcount, stored_size, _ = self.chunks.get(id, ChunkIndexEntry(0, None, None))
        if size is not None and stored_size is not None and size != stored_size:
            # we already have a chunk with that id, but different size.
            # this is either a hash collision (unlikely) or corruption or a bug.
            raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % (
                id, stored_size, size))
        return refcount

    def chunk_incref(self, id, stats, size=None):
        if not self.txn_active:
            self.begin_txn()
        count, _size, csize = self.chunks.incref(id)
        stats.update(_size, csize, False)
        return ChunkListEntry(id, _size, csize)

    def chunk_decref(self, id, stats, wait=True):
        if not self.txn_active:
            self.begin_txn()
        count, size, csize = self.chunks.decref(id)
        if count == 0:
            del self.chunks[id]
            self.repository.delete(id, wait=wait)
            stats.update(-size, -csize, True)
        else:
            stats.update(-size, -csize, False)

    def file_known_and_unchanged(self, hashed_path, path_hash, st):
        """
        Check if we know the file that has this path_hash (know == it is in our files cache) and
        whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).

        :param hashed_path: the file's path as we gave it to hash(hashed_path)
        :param path_hash: hash(hashed_path), to save some memory in the files cache
        :param st: the file's stat() result
        :return: known, ids (known is True if we have infos about this file in the cache,
                             ids is the list of chunk ids IF the file has not changed, otherwise None).
        """
        if not stat.S_ISREG(st.st_mode):
            return False, None
        cache_mode = self.cache_mode
        if 'd' in cache_mode:  # d(isabled)
            files_cache_logger.debug('UNKNOWN: files cache disabled')
            return False, None
        # note: r(echunk) does not need the files cache in this method, but the files cache will
        # be updated and saved to disk to memorize the files. To preserve previous generations in
        # the cache, this means that it also needs to get loaded from disk first.
        if 'r' in cache_mode:  # r(echunk)
            files_cache_logger.debug('UNKNOWN: rechunking enforced')
            return False, None
        entry = self.files.get(path_hash)
        if not entry:
            files_cache_logger.debug('UNKNOWN: no file metadata in cache for: %r', hashed_path)
            return False, None
        # we know the file!
        entry = FileCacheEntry(*msgpack.unpackb(entry))
        if 's' in cache_mode and entry.size != st.st_size:
            files_cache_logger.debug('KNOWN-CHANGED: file size has changed: %r', hashed_path)
            return True, None
        if 'i' in cache_mode and entry.inode != st.st_ino:
            files_cache_logger.debug('KNOWN-CHANGED: file inode number has changed: %r', hashed_path)
            return True, None
        if 'c' in cache_mode and bigint_to_int(entry.cmtime) != st.st_ctime_ns:
            files_cache_logger.debug('KNOWN-CHANGED: file ctime has changed: %r', hashed_path)
            return True, None
        elif 'm' in cache_mode and bigint_to_int(entry.cmtime) != st.st_mtime_ns:
            files_cache_logger.debug('KNOWN-CHANGED: file mtime has changed: %r', hashed_path)
            return True, None
        # we ignored the inode number in the comparison above or it is still the same.
        # if it is still the same, replacing it in the tuple doesn't change it.
        # if we ignored it, a reason for doing that is that files were moved to a new
        # disk / new fs (so a one-time change of inode number is expected) and we wanted
        # to avoid everything getting chunked again. to be able to re-enable the inode
        # number comparison in a future backup run (and avoid chunking everything
        # again at that time), we need to update the inode number in the cache with what
        # we see in the filesystem.
        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
        return True, entry.chunk_ids

    def memorize_file(self, hashed_path, path_hash, st, ids):
        if not stat.S_ISREG(st.st_mode):
            return
        cache_mode = self.cache_mode
        # note: r(echunk) modes will update the files cache, d(isabled) mode won't
        if 'd' in cache_mode:
            files_cache_logger.debug('FILES-CACHE-NOUPDATE: files cache disabled')
            return
        if 'c' in cache_mode:
            cmtime_type = 'ctime'
            cmtime_ns = safe_ns(st.st_ctime_ns)
        elif 'm' in cache_mode:
            cmtime_type = 'mtime'
            cmtime_ns = safe_ns(st.st_mtime_ns)
        entry = FileCacheEntry(age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_bigint(cmtime_ns), chunk_ids=ids)
        self.files[path_hash] = msgpack.packb(entry)
        self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
        files_cache_logger.debug('FILES-CACHE-UPDATE: put %r [has %s] <- %r',
                                 entry._replace(chunk_ids='[%d entries]' % len(entry.chunk_ids)),
                                 cmtime_type, hashed_path)


class AdHocCache(CacheStatsMixin):
    """
    Ad-hoc, non-persistent cache.

    Compared to the standard LocalCache the AdHocCache does not maintain accurate reference count,
    nor does it provide a files cache (which would require persistence). Chunks that were not added
    during the current AdHocCache lifetime won't have correct size/csize set (0 bytes) and will
    have an infinite reference count (MAX_VALUE).
    """

    str_format = """\
All archives:                unknown              unknown              unknown

                       Unique chunks         Total chunks
Chunk index:    {0.total_unique_chunks:20d}              unknown"""

    def __init__(self, repository, key, manifest, warn_if_unencrypted=True, lock_wait=None):
        self.repository = repository
        self.key = key
        self.manifest = manifest
        self._txn_active = False

        self.security_manager = SecurityManager(repository)
        self.security_manager.assert_secure(manifest, key, lock_wait=lock_wait)

        logger.warning('Note: --no-cache-sync is an experimental feature.')

    # Public API

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    files = None
    cache_mode = 'd'

    def file_known_and_unchanged(self, hashed_path, path_hash, st):
        files_cache_logger.debug("UNKNOWN: files cache not implemented")
        return False, None

    def memorize_file(self, hashed_path, path_hash, st, ids):
        pass

    def add_chunk(self, id, chunk, stats, overwrite=False, wait=True):
        assert not overwrite, 'AdHocCache does not permit overwrites - trying to use it for recreate?'
        if not self._txn_active:
            self._begin_txn()
        size = len(chunk)
        refcount = self.seen_chunk(id, size)
        if refcount:
            return self.chunk_incref(id, stats, size=size)
        data = self.key.encrypt(chunk)
        csize = len(data)
        self.repository.put(id, data, wait=wait)
        self.chunks.add(id, 1, size, csize)
        stats.update(size, csize, not refcount)
        return ChunkListEntry(id, size, csize)

    def seen_chunk(self, id, size=None):
        if not self._txn_active:
            self._begin_txn()
        entry = self.chunks.get(id, ChunkIndexEntry(0, None, None))
        if entry.refcount and size and not entry.size:
            # The LocalCache has existing size information and uses *size* to make an effort at detecting collisions.
            # This is of course not possible for the AdHocCache.
            # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
            self.chunks[id] = entry._replace(size=size)
        return entry.refcount

    def chunk_incref(self, id, stats, size=None):
        if not self._txn_active:
            self._begin_txn()
        count, _size, csize = self.chunks.incref(id)
        # When _size is 0 and size is not given, then this chunk has not been locally visited yet (seen_chunk with
        # size or add_chunk); we can't add references to those (size=0 is invalid) and generally don't try to.
        size = _size or size
        assert size
        stats.update(size, csize, False)
        return ChunkListEntry(id, size, csize)

    def chunk_decref(self, id, stats, wait=True):
        if not self._txn_active:
            self._begin_txn()
        count, size, csize = self.chunks.decref(id)
        if count == 0:
            del self.chunks[id]
            self.repository.delete(id, wait=wait)
            stats.update(-size, -csize, True)
        else:
            stats.update(-size, -csize, False)

    def commit(self):
        if not self._txn_active:
            return
        self.security_manager.save(self.manifest, self.key)
        self._txn_active = False

    def rollback(self):
        self._txn_active = False
        del self.chunks

    # Private API

    def _begin_txn(self):
        self._txn_active = True
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance".
        # Since we're creating an archive, add 10 % from the start.
        num_chunks = len(self.repository)
        capacity = int(num_chunks / ChunkIndex.MAX_LOAD_FACTOR * 1.1)
        self.chunks = ChunkIndex(capacity)
        pi = ProgressIndicatorPercent(total=num_chunks, msg='Downloading chunk list... %3.0f%%',
                                      msgid='cache.download_chunks')
        t0 = perf_counter()
        num_requests = 0
        marker = None
        while True:
            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
            num_requests += 1
            if not result:
                break
            pi.show(increase=len(result))
            marker = result[-1]
            # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
            # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
            # (e.g. checkpoint archives) are tracked correctly.
            init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0, csize=0)
            for id_ in result:
                self.chunks[id_] = init_entry
        assert len(self.chunks) == num_chunks
        # LocalCache does not contain the manifest, either.
        del self.chunks[self.manifest.MANIFEST_ID]
        duration = perf_counter() - t0 or 0.01
        pi.finish()
        logger.debug('AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s',
                     num_chunks, duration, num_requests, format_file_size(num_chunks * 34 / duration))
        # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
        # Protocol overhead is neglected in this calculation.
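
# Illustrative usage sketch (names like chunk_id/data/stats are hypothetical;
# Repository, key and manifest setup happens elsewhere in Borg, e.g. in the
# archiver code). Cache(...) dispatches via Cache.__new__ to a LocalCache, or,
# with permit_adhoc_cache=True and an out-of-sync local cache, to an AdHocCache:
#
#     with Cache(repository, key, manifest, lock_wait=1.0) as cache:
#         entry = cache.add_chunk(chunk_id, data, stats)  # deduplicating store
#         cache.commit()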