1import errno 2import json 3import os 4import socket 5import stat 6import sys 7import time 8from contextlib import contextmanager 9from datetime import datetime, timezone, timedelta 10from functools import partial 11from getpass import getuser 12from io import BytesIO 13from itertools import groupby 14from shutil import get_terminal_size 15 16from .logger import create_logger 17 18logger = create_logger() 19 20from . import xattr 21from .chunker import Chunker 22from .cache import ChunkListEntry 23from .crypto.key import key_factory 24from .compress import Compressor, CompressionSpec 25from .constants import * # NOQA 26from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer 27from .helpers import Manifest 28from .helpers import hardlinkable 29from .helpers import ChunkIteratorFileWrapper, open_item 30from .helpers import Error, IntegrityError, set_ec 31from .helpers import uid2user, user2uid, gid2group, group2gid 32from .helpers import parse_timestamp, to_localtime 33from .helpers import OutputTimestamp, format_timedelta, format_file_size, file_status, FileSize 34from .helpers import safe_encode, safe_decode, make_path_safe, remove_surrogates 35from .helpers import StableDict 36from .helpers import bin_to_hex 37from .helpers import safe_ns 38from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi 39from .helpers import msgpack 40from .patterns import PathPrefixPattern, FnmatchPattern, IECommand 41from .item import Item, ArchiveItem 42from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname 43from .remote import cache_if_remote 44from .repository import Repository, LIST_SCAN_LIMIT 45 46has_lchmod = hasattr(os, 'lchmod') 47has_link = hasattr(os, 'link') 48 49flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0) 50flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0) 51 52 53class Statistics: 54 55 def __init__(self, output_json=False): 56 self.output_json = output_json 57 self.osize = self.csize = self.usize = self.nfiles = 0 58 self.last_progress = 0 # timestamp when last progress was shown 59 60 def update(self, size, csize, unique): 61 self.osize += size 62 self.csize += csize 63 if unique: 64 self.usize += csize 65 66 summary = "{label:15} {stats.osize_fmt:>20s} {stats.csize_fmt:>20s} {stats.usize_fmt:>20s}" 67 68 def __str__(self): 69 return self.summary.format(stats=self, label='This archive:') 70 71 def __repr__(self): 72 return "<{cls} object at {hash:#x} ({self.osize}, {self.csize}, {self.usize})>".format( 73 cls=type(self).__name__, hash=id(self), self=self) 74 75 def as_dict(self): 76 return { 77 'original_size': FileSize(self.osize), 78 'compressed_size': FileSize(self.csize), 79 'deduplicated_size': FileSize(self.usize), 80 'nfiles': self.nfiles, 81 } 82 83 @property 84 def osize_fmt(self): 85 return format_file_size(self.osize) 86 87 @property 88 def usize_fmt(self): 89 return format_file_size(self.usize) 90 91 @property 92 def csize_fmt(self): 93 return format_file_size(self.csize) 94 95 def show_progress(self, item=None, final=False, stream=None, dt=None): 96 now = time.monotonic() 97 if dt is None or now - self.last_progress > dt: 98 self.last_progress = now 99 if self.output_json: 100 data = self.as_dict() 101 data.update({ 102 'time': time.time(), 103 'type': 'archive_progress', 104 'path': remove_surrogates(item.path if item else ''), 105 }) 106 msg = json.dumps(data) 107 end = '\n' 108 else: 109 columns, lines = get_terminal_size() 110 if not final: 111 msg = '{0.osize_fmt} O {0.csize_fmt} C {0.usize_fmt} D {0.nfiles} N '.format(self) 112 path = remove_surrogates(item.path) if item else '' 113 space = columns - swidth(msg) 114 if space < 12: 115 msg = '' 116 space = columns - swidth(msg) 117 if space >= 8: 118 msg += ellipsis_truncate(path, space) 119 else: 120 msg = ' ' * columns 121 end = '\r' 122 print(msg, end=end, file=stream or sys.stderr, flush=True) 123 124 125def is_special(mode): 126 # file types that get special treatment in --read-special mode 127 return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode) 128 129 130class BackupError(Exception): 131 """ 132 Exception raised for non-OSError-based exceptions while accessing backup files. 133 """ 134 135 136class BackupOSError(Exception): 137 """ 138 Wrapper for OSError raised while accessing backup files. 139 140 Borg does different kinds of IO, and IO failures have different consequences. 141 This wrapper represents failures of input file or extraction IO. 142 These are non-critical and are only reported (exit code = 1, warning). 143 144 Any unwrapped IO error is critical and aborts execution (for example repository IO failure). 145 """ 146 def __init__(self, op, os_error): 147 self.op = op 148 self.os_error = os_error 149 self.errno = os_error.errno 150 self.strerror = os_error.strerror 151 self.filename = os_error.filename 152 153 def __str__(self): 154 if self.op: 155 return '%s: %s' % (self.op, self.os_error) 156 else: 157 return str(self.os_error) 158 159 160class BackupIO: 161 op = '' 162 163 def __call__(self, op=''): 164 self.op = op 165 return self 166 167 def __enter__(self): 168 pass 169 170 def __exit__(self, exc_type, exc_val, exc_tb): 171 if exc_type and issubclass(exc_type, OSError): 172 raise BackupOSError(self.op, exc_val) from exc_val 173 174 175backup_io = BackupIO() 176 177 178def backup_io_iter(iterator): 179 backup_io.op = 'read' 180 while True: 181 with backup_io: 182 try: 183 item = next(iterator) 184 except StopIteration: 185 return 186 yield item 187 188 189class DownloadPipeline: 190 191 def __init__(self, repository, key): 192 self.repository = repository 193 self.key = key 194 195 def unpack_many(self, ids, filter=None, partial_extract=False, preload=False, hardlink_masters=None): 196 """ 197 Return iterator of items. 198 199 *ids* is a chunk ID list of an item stream. *filter* is a callable 200 to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item. 201 202 Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved, 203 otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak. 204 """ 205 def _preload(chunks): 206 self.repository.preload([c.id for c in chunks]) 207 208 masters_preloaded = set() 209 unpacker = msgpack.Unpacker(use_list=False) 210 for data in self.fetch_many(ids): 211 unpacker.feed(data) 212 items = [Item(internal_dict=item) for item in unpacker] 213 for item in items: 214 if 'chunks' in item: 215 item.chunks = [ChunkListEntry(*e) for e in item.chunks] 216 217 if filter: 218 items = [item for item in items if filter(item)] 219 220 if preload: 221 if filter and partial_extract: 222 # if we do only a partial extraction, it gets a bit 223 # complicated with computing the preload items: if a hardlink master item is not 224 # selected (== not extracted), we will still need to preload its chunks if a 225 # corresponding hardlink slave is selected (== is extracted). 226 # due to a side effect of the filter() call, we now have hardlink_masters dict populated. 227 for item in items: 228 if 'chunks' in item: # regular file, maybe a hardlink master 229 _preload(item.chunks) 230 # if this is a hardlink master, remember that we already preloaded it: 231 if 'source' not in item and hardlinkable(item.mode) and item.get('hardlink_master', True): 232 masters_preloaded.add(item.path) 233 elif 'source' in item and hardlinkable(item.mode): # hardlink slave 234 source = item.source 235 if source not in masters_preloaded: 236 # we only need to preload *once* (for the 1st selected slave) 237 chunks, _ = hardlink_masters[source] 238 if chunks is not None: 239 _preload(chunks) 240 masters_preloaded.add(source) 241 else: 242 # easy: we do not have a filter, thus all items are selected, thus we need to preload all chunks. 243 for item in items: 244 if 'chunks' in item: 245 _preload(item.chunks) 246 247 for item in items: 248 yield item 249 250 def fetch_many(self, ids, is_preloaded=False): 251 for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)): 252 yield self.key.decrypt(id_, data) 253 254 255class ChunkBuffer: 256 BUFFER_SIZE = 8 * 1024 * 1024 257 258 def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS): 259 self.buffer = BytesIO() 260 self.packer = msgpack.Packer(unicode_errors='surrogateescape') 261 self.chunks = [] 262 self.key = key 263 self.chunker = Chunker(self.key.chunk_seed, *chunker_params) 264 265 def add(self, item): 266 self.buffer.write(self.packer.pack(item.as_dict())) 267 if self.is_full(): 268 self.flush() 269 270 def write_chunk(self, chunk): 271 raise NotImplementedError 272 273 def flush(self, flush=False): 274 if self.buffer.tell() == 0: 275 return 276 self.buffer.seek(0) 277 # The chunker returns a memoryview to its internal buffer, 278 # thus a copy is needed before resuming the chunker iterator. 279 chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer)) 280 self.buffer.seek(0) 281 self.buffer.truncate(0) 282 # Leave the last partial chunk in the buffer unless flush is True 283 end = None if flush or len(chunks) == 1 else -1 284 for chunk in chunks[:end]: 285 self.chunks.append(self.write_chunk(chunk)) 286 if end == -1: 287 self.buffer.write(chunks[-1]) 288 289 def is_full(self): 290 return self.buffer.tell() > self.BUFFER_SIZE 291 292 293class CacheChunkBuffer(ChunkBuffer): 294 295 def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS): 296 super().__init__(key, chunker_params) 297 self.cache = cache 298 self.stats = stats 299 300 def write_chunk(self, chunk): 301 id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats, wait=False) 302 self.cache.repository.async_response(wait=False) 303 return id_ 304 305 306class Archive: 307 308 class DoesNotExist(Error): 309 """Archive {} does not exist""" 310 311 class AlreadyExists(Error): 312 """Archive {} already exists""" 313 314 class IncompatibleFilesystemEncodingError(Error): 315 """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable.""" 316 317 def __init__(self, repository, key, manifest, name, cache=None, create=False, 318 checkpoint_interval=1800, numeric_owner=False, noatime=False, noctime=False, nobirthtime=False, 319 nobsdflags=False, noacls=False, noxattrs=False, 320 progress=False, chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None, 321 consider_part_files=False, log_json=False): 322 self.cwd = os.getcwd() 323 self.key = key 324 self.repository = repository 325 self.cache = cache 326 self.manifest = manifest 327 self.hard_links = {} 328 self.stats = Statistics(output_json=log_json) 329 self.show_progress = progress 330 self.name = name # overwritten later with name from archive metadata 331 self.name_in_manifest = name # can differ from .name later (if borg check fixed duplicate archive names) 332 self.comment = None 333 self.checkpoint_interval = checkpoint_interval 334 self.numeric_owner = numeric_owner 335 self.noatime = noatime 336 self.noctime = noctime 337 self.nobirthtime = nobirthtime 338 self.nobsdflags = nobsdflags 339 self.noacls = noacls 340 self.noxattrs = noxattrs 341 assert (start is None) == (start_monotonic is None), 'Logic error: if start is given, start_monotonic must be given as well and vice versa.' 342 if start is None: 343 start = datetime.utcnow() 344 start_monotonic = time.monotonic() 345 self.chunker_params = chunker_params 346 self.start = start 347 self.start_monotonic = start_monotonic 348 if end is None: 349 end = datetime.utcnow() 350 self.end = end 351 self.consider_part_files = consider_part_files 352 self.pipeline = DownloadPipeline(self.repository, self.key) 353 self.create = create 354 if self.create: 355 self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats) 356 self.chunker = Chunker(self.key.chunk_seed, *chunker_params) 357 if name in manifest.archives: 358 raise self.AlreadyExists(name) 359 self.last_checkpoint = time.monotonic() 360 i = 0 361 while True: 362 self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '') 363 if self.checkpoint_name not in manifest.archives: 364 break 365 i += 1 366 else: 367 info = self.manifest.archives.get(name) 368 if info is None: 369 raise self.DoesNotExist(name) 370 self.load(info.id) 371 self.zeros = None 372 373 def _load_meta(self, id): 374 data = self.key.decrypt(id, self.repository.get(id)) 375 metadata = ArchiveItem(internal_dict=msgpack.unpackb(data, unicode_errors='surrogateescape')) 376 if metadata.version != 1: 377 raise Exception('Unknown archive metadata version') 378 return metadata 379 380 def load(self, id): 381 self.id = id 382 self.metadata = self._load_meta(self.id) 383 self.metadata.cmdline = [safe_decode(arg) for arg in self.metadata.cmdline] 384 self.name = self.metadata.name 385 self.comment = self.metadata.get('comment', '') 386 387 @property 388 def ts(self): 389 """Timestamp of archive creation (start) in UTC""" 390 ts = self.metadata.time 391 return parse_timestamp(ts) 392 393 @property 394 def ts_end(self): 395 """Timestamp of archive creation (end) in UTC""" 396 # fall back to time if there is no time_end present in metadata 397 ts = self.metadata.get('time_end') or self.metadata.time 398 return parse_timestamp(ts) 399 400 @property 401 def fpr(self): 402 return bin_to_hex(self.id) 403 404 @property 405 def duration(self): 406 return format_timedelta(self.end - self.start) 407 408 @property 409 def duration_from_meta(self): 410 return format_timedelta(self.ts_end - self.ts) 411 412 def info(self): 413 if self.create: 414 stats = self.stats 415 start = self.start.replace(tzinfo=timezone.utc) 416 end = self.end.replace(tzinfo=timezone.utc) 417 else: 418 stats = self.calc_stats(self.cache) 419 start = self.ts 420 end = self.ts_end 421 info = { 422 'name': self.name, 423 'id': self.fpr, 424 'start': OutputTimestamp(start), 425 'end': OutputTimestamp(end), 426 'duration': (end - start).total_seconds(), 427 'stats': stats.as_dict(), 428 'limits': { 429 'max_archive_size': self.cache.chunks[self.id].csize / MAX_DATA_SIZE, 430 }, 431 } 432 if self.create: 433 info['command_line'] = sys.argv 434 else: 435 info.update({ 436 'command_line': self.metadata.cmdline, 437 'hostname': self.metadata.hostname, 438 'username': self.metadata.username, 439 'comment': self.metadata.get('comment', ''), 440 'chunker_params': self.metadata.get('chunker_params', ''), 441 }) 442 return info 443 444 def __str__(self): 445 return '''\ 446Archive name: {0.name} 447Archive fingerprint: {0.fpr} 448Time (start): {start} 449Time (end): {end} 450Duration: {0.duration} 451Number of files: {0.stats.nfiles} 452Utilization of max. archive size: {csize_max:.0%} 453'''.format( 454 self, 455 start=OutputTimestamp(self.start.replace(tzinfo=timezone.utc)), 456 end=OutputTimestamp(self.end.replace(tzinfo=timezone.utc)), 457 csize_max=self.cache.chunks[self.id].csize / MAX_DATA_SIZE) 458 459 def __repr__(self): 460 return 'Archive(%r)' % self.name 461 462 def item_filter(self, item, filter=None): 463 if not self.consider_part_files and 'part' in item: 464 # this is a part(ial) file, we usually don't want to consider it. 465 return False 466 return filter(item) if filter else True 467 468 def iter_items(self, filter=None, partial_extract=False, preload=False, hardlink_masters=None): 469 # note: when calling this with preload=True, later fetch_many() must be called with 470 # is_preloaded=True or the RemoteRepository code will leak memory! 471 assert not (filter and partial_extract and preload) or hardlink_masters is not None 472 for item in self.pipeline.unpack_many(self.metadata.items, partial_extract=partial_extract, 473 preload=preload, hardlink_masters=hardlink_masters, 474 filter=lambda item: self.item_filter(item, filter)): 475 yield item 476 477 def add_item(self, item, show_progress=True): 478 if show_progress and self.show_progress: 479 self.stats.show_progress(item=item, dt=0.2) 480 self.items_buffer.add(item) 481 482 def write_checkpoint(self): 483 self.save(self.checkpoint_name) 484 del self.manifest.archives[self.checkpoint_name] 485 self.cache.chunk_decref(self.id, self.stats) 486 487 def save(self, name=None, comment=None, timestamp=None, additional_metadata=None): 488 name = name or self.name 489 if name in self.manifest.archives: 490 raise self.AlreadyExists(name) 491 self.items_buffer.flush(flush=True) 492 duration = timedelta(seconds=time.monotonic() - self.start_monotonic) 493 if timestamp is None: 494 end = datetime.utcnow() 495 start = end - duration 496 else: 497 end = timestamp + duration 498 start = timestamp 499 self.start = start 500 self.end = end 501 metadata = { 502 'version': 1, 503 'name': name, 504 'comment': comment or '', 505 'items': self.items_buffer.chunks, 506 'cmdline': sys.argv, 507 'hostname': hostname, 508 'username': getuser(), 509 'time': start.strftime(ISO_FORMAT), 510 'time_end': end.strftime(ISO_FORMAT), 511 'chunker_params': self.chunker_params, 512 } 513 metadata.update(additional_metadata or {}) 514 metadata = ArchiveItem(metadata) 515 data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive') 516 self.id = self.key.id_hash(data) 517 try: 518 self.cache.add_chunk(self.id, data, self.stats) 519 except IntegrityError as err: 520 err_msg = str(err) 521 # hack to avoid changing the RPC protocol by introducing new (more specific) exception class 522 if 'More than allowed put data' in err_msg: 523 raise Error('%s - archive too big (issue #1473)!' % err_msg) 524 else: 525 raise 526 while self.repository.async_response(wait=True) is not None: 527 pass 528 self.manifest.archives[name] = (self.id, metadata.time) 529 self.manifest.write() 530 self.repository.commit() 531 self.cache.commit() 532 533 def calc_stats(self, cache): 534 def add(id): 535 entry = cache.chunks[id] 536 archive_index.add(id, 1, entry.size, entry.csize) 537 538 archive_index = ChunkIndex() 539 sync = CacheSynchronizer(archive_index) 540 add(self.id) 541 pi = ProgressIndicatorPercent(total=len(self.metadata.items), msg='Calculating statistics... %3d%%', msgid='archive.calc_stats') 542 for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)): 543 pi.show(increase=1) 544 add(id) 545 data = self.key.decrypt(id, chunk) 546 sync.feed(data) 547 unique_csize = archive_index.stats_against(cache.chunks)[3] 548 pi.finish() 549 stats = Statistics() 550 stats.nfiles = sync.num_files_totals if self.consider_part_files \ 551 else sync.num_files_totals - sync.num_files_parts 552 stats.osize = sync.size_totals if self.consider_part_files \ 553 else sync.size_totals - sync.size_parts 554 stats.csize = sync.csize_totals if self.consider_part_files \ 555 else sync.csize_totals - sync.csize_parts 556 stats.usize = unique_csize # the part files use same chunks as the full file 557 return stats 558 559 @contextmanager 560 def extract_helper(self, dest, item, path, stripped_components, original_path, hardlink_masters): 561 hardlink_set = False 562 # Hard link? 563 if 'source' in item: 564 source = os.path.join(dest, *item.source.split(os.sep)[stripped_components:]) 565 chunks, link_target = hardlink_masters.get(item.source, (None, source)) 566 if link_target and has_link: 567 # Hard link was extracted previously, just link 568 with backup_io('link'): 569 os.link(link_target, path) 570 hardlink_set = True 571 elif chunks is not None: 572 # assign chunks to this item, since the item which had the chunks was not extracted 573 item.chunks = chunks 574 yield hardlink_set 575 if not hardlink_set and hardlink_masters: 576 if has_link: 577 # Update master entry with extracted item path, so that following hardlinks don't extract twice. 578 # We have hardlinking support, so we will hardlink not extract. 579 hardlink_masters[item.get('source') or original_path] = (None, path) 580 else: 581 # Broken platform with no hardlinking support. 582 # In this case, we *want* to extract twice, because there is no other way. 583 pass 584 585 def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False, 586 hardlink_masters=None, stripped_components=0, original_path=None, pi=None): 587 """ 588 Extract archive item. 589 590 :param item: the item to extract 591 :param restore_attrs: restore file attributes 592 :param dry_run: do not write any data 593 :param stdout: write extracted data to stdout 594 :param sparse: write sparse files (chunk-granularity, independent of the original being sparse) 595 :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly 596 :param stripped_components: stripped leading path components to correct hard link extraction 597 :param original_path: 'path' key as stored in archive 598 :param pi: ProgressIndicatorPercent (or similar) for file extraction progress (in bytes) 599 """ 600 hardlink_masters = hardlink_masters or {} 601 has_damaged_chunks = 'chunks_healthy' in item 602 if dry_run or stdout: 603 if 'chunks' in item: 604 item_chunks_size = 0 605 for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True): 606 if pi: 607 pi.show(increase=len(data), info=[remove_surrogates(item.path)]) 608 if stdout: 609 sys.stdout.buffer.write(data) 610 item_chunks_size += len(data) 611 if stdout: 612 sys.stdout.buffer.flush() 613 if 'size' in item: 614 item_size = item.size 615 if item_size != item_chunks_size: 616 raise BackupError('Size inconsistency detected: size {}, chunks size {}'.format( 617 item_size, item_chunks_size)) 618 if has_damaged_chunks: 619 raise BackupError('File has damaged (all-zero) chunks. Try running borg check --repair.') 620 return 621 622 original_path = original_path or item.path 623 dest = self.cwd 624 if item.path.startswith(('/', '../')): 625 raise Exception('Path should be relative and local') 626 path = os.path.join(dest, item.path) 627 # Attempt to remove existing files, ignore errors on failure 628 try: 629 st = os.stat(path, follow_symlinks=False) 630 if stat.S_ISDIR(st.st_mode): 631 os.rmdir(path) 632 else: 633 os.unlink(path) 634 except UnicodeEncodeError: 635 raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None 636 except OSError: 637 pass 638 639 def make_parent(path): 640 parent_dir = os.path.dirname(path) 641 if not os.path.exists(parent_dir): 642 os.makedirs(parent_dir) 643 644 mode = item.mode 645 if stat.S_ISREG(mode): 646 with backup_io('makedirs'): 647 make_parent(path) 648 with self.extract_helper(dest, item, path, stripped_components, original_path, 649 hardlink_masters) as hardlink_set: 650 if hardlink_set: 651 return 652 if sparse and self.zeros is None: 653 self.zeros = b'\0' * (1 << self.chunker_params[1]) 654 with backup_io('open'): 655 fd = open(path, 'wb') 656 with fd: 657 ids = [c.id for c in item.chunks] 658 for data in self.pipeline.fetch_many(ids, is_preloaded=True): 659 if pi: 660 pi.show(increase=len(data), info=[remove_surrogates(item.path)]) 661 with backup_io('write'): 662 if sparse and self.zeros.startswith(data): 663 # all-zero chunk: create a hole in a sparse file 664 fd.seek(len(data), 1) 665 else: 666 fd.write(data) 667 with backup_io('truncate_and_attrs'): 668 pos = item_chunks_size = fd.tell() 669 fd.truncate(pos) 670 fd.flush() 671 self.restore_attrs(path, item, fd=fd.fileno()) 672 if 'size' in item: 673 item_size = item.size 674 if item_size != item_chunks_size: 675 raise BackupError('Size inconsistency detected: size {}, chunks size {}'.format( 676 item_size, item_chunks_size)) 677 if has_damaged_chunks: 678 raise BackupError('File has damaged (all-zero) chunks. Try running borg check --repair.') 679 return 680 with backup_io: 681 # No repository access beyond this point. 682 if stat.S_ISDIR(mode): 683 make_parent(path) 684 if not os.path.exists(path): 685 os.mkdir(path) 686 if restore_attrs: 687 self.restore_attrs(path, item) 688 elif stat.S_ISLNK(mode): 689 make_parent(path) 690 source = item.source 691 try: 692 os.symlink(source, path) 693 except UnicodeEncodeError: 694 raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None 695 self.restore_attrs(path, item, symlink=True) 696 elif stat.S_ISFIFO(mode): 697 make_parent(path) 698 with self.extract_helper(dest, item, path, stripped_components, original_path, 699 hardlink_masters) as hardlink_set: 700 if hardlink_set: 701 return 702 os.mkfifo(path) 703 self.restore_attrs(path, item) 704 elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode): 705 make_parent(path) 706 with self.extract_helper(dest, item, path, stripped_components, original_path, 707 hardlink_masters) as hardlink_set: 708 if hardlink_set: 709 return 710 os.mknod(path, item.mode, item.rdev) 711 self.restore_attrs(path, item) 712 else: 713 raise Exception('Unknown archive item type %r' % item.mode) 714 715 def restore_attrs(self, path, item, symlink=False, fd=None): 716 """ 717 Restore filesystem attributes on *path* (*fd*) from *item*. 718 719 Does not access the repository. 720 """ 721 backup_io.op = 'attrs' 722 uid = gid = None 723 if not self.numeric_owner: 724 uid = user2uid(item.user) 725 gid = group2gid(item.group) 726 uid = item.uid if uid is None else uid 727 gid = item.gid if gid is None else gid 728 # This code is a bit of a mess due to os specific differences 729 try: 730 if fd: 731 os.fchown(fd, uid, gid) 732 else: 733 os.chown(path, uid, gid, follow_symlinks=False) 734 except OSError: 735 pass 736 if fd: 737 os.fchmod(fd, item.mode) 738 elif not symlink: 739 os.chmod(path, item.mode) 740 elif has_lchmod: # Not available on Linux 741 os.lchmod(path, item.mode) 742 mtime = item.mtime 743 if 'atime' in item: 744 atime = item.atime 745 else: 746 # old archives only had mtime in item metadata 747 atime = mtime 748 if 'birthtime' in item: 749 birthtime = item.birthtime 750 try: 751 # This should work on FreeBSD, NetBSD, and Darwin and be harmless on other platforms. 752 # See utimes(2) on either of the BSDs for details. 753 if fd: 754 os.utime(fd, None, ns=(atime, birthtime)) 755 else: 756 os.utime(path, None, ns=(atime, birthtime), follow_symlinks=False) 757 except OSError: 758 # some systems don't support calling utime on a symlink 759 pass 760 try: 761 if fd: 762 os.utime(fd, None, ns=(atime, mtime)) 763 else: 764 os.utime(path, None, ns=(atime, mtime), follow_symlinks=False) 765 except OSError: 766 # some systems don't support calling utime on a symlink 767 pass 768 if not self.noacls: 769 acl_set(path, item, self.numeric_owner) 770 if not self.noxattrs: 771 # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include 772 # the Linux capabilities in the "security.capability" attribute. 773 xattrs = item.get('xattrs', {}) 774 for k, v in xattrs.items(): 775 try: 776 xattr.setxattr(fd or path, k, v, follow_symlinks=False) 777 except OSError as e: 778 msg_format = '%s: when setting extended attribute %s: %%s' % (path, k.decode()) 779 if e.errno == errno.E2BIG: 780 err_str = 'too big for this filesystem' 781 elif e.errno == errno.ENOTSUP: 782 err_str = 'xattrs not supported on this filesystem' 783 elif e.errno == errno.ENOSPC: 784 # no space left on device while setting this specific xattr 785 # ext4 reports ENOSPC when trying to set an xattr with >4kiB while ext4 can only support 4kiB xattrs 786 # (in this case, this is NOT a "disk full" error, just a ext4 limitation). 787 err_str = 'no space left on device [xattr len = %d]' % (len(v), ) 788 else: 789 # generic handler 790 # EACCES: permission denied to set this specific xattr (this may happen related to security.* keys) 791 # EPERM: operation not permitted 792 err_str = os.strerror(e.errno) 793 logger.warning(msg_format % err_str) 794 set_ec(EXIT_WARNING) 795 # bsdflags include the immutable flag and need to be set last: 796 if not self.nobsdflags and 'bsdflags' in item: 797 try: 798 set_flags(path, item.bsdflags, fd=fd) 799 except OSError: 800 pass 801 802 def set_meta(self, key, value): 803 metadata = self._load_meta(self.id) 804 setattr(metadata, key, value) 805 data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape') 806 new_id = self.key.id_hash(data) 807 self.cache.add_chunk(new_id, data, self.stats) 808 self.manifest.archives[self.name] = (new_id, metadata.time) 809 self.cache.chunk_decref(self.id, self.stats) 810 self.id = new_id 811 812 def rename(self, name): 813 if name in self.manifest.archives: 814 raise self.AlreadyExists(name) 815 oldname = self.name 816 self.name = name 817 self.set_meta('name', name) 818 del self.manifest.archives[oldname] 819 820 def delete(self, stats, progress=False, forced=False): 821 class ChunksIndexError(Error): 822 """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction.""" 823 824 exception_ignored = object() 825 826 def fetch_async_response(wait=True): 827 try: 828 return self.repository.async_response(wait=wait) 829 except Repository.ObjectNotFound: 830 nonlocal error 831 # object not in repo - strange, but we wanted to delete it anyway. 832 if forced == 0: 833 raise 834 error = True 835 return exception_ignored # must not return None here 836 837 def chunk_decref(id, stats): 838 try: 839 self.cache.chunk_decref(id, stats, wait=False) 840 except KeyError: 841 cid = bin_to_hex(id) 842 raise ChunksIndexError(cid) 843 else: 844 fetch_async_response(wait=False) 845 846 error = False 847 try: 848 unpacker = msgpack.Unpacker(use_list=False) 849 items_ids = self.metadata.items 850 pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", msgid='archive.delete') 851 for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))): 852 if progress: 853 pi.show(i) 854 data = self.key.decrypt(items_id, data) 855 unpacker.feed(data) 856 chunk_decref(items_id, stats) 857 try: 858 for item in unpacker: 859 item = Item(internal_dict=item) 860 if 'chunks' in item: 861 for chunk_id, size, csize in item.chunks: 862 chunk_decref(chunk_id, stats) 863 except (TypeError, ValueError): 864 # if items metadata spans multiple chunks and one chunk got dropped somehow, 865 # it could be that unpacker yields bad types 866 if forced == 0: 867 raise 868 error = True 869 if progress: 870 pi.finish() 871 except (msgpack.UnpackException, Repository.ObjectNotFound): 872 # items metadata corrupted 873 if forced == 0: 874 raise 875 error = True 876 # in forced delete mode, we try hard to delete at least the manifest entry, 877 # if possible also the archive superblock, even if processing the items raises 878 # some harmless exception. 879 chunk_decref(self.id, stats) 880 del self.manifest.archives[self.name] 881 while fetch_async_response(wait=True) is not None: 882 # we did async deletes, process outstanding results (== exceptions), 883 # so there is nothing pending when we return and our caller wants to commit. 884 pass 885 if error: 886 logger.warning('forced deletion succeeded, but the deleted archive was corrupted.') 887 logger.warning('borg check --repair is required to free all space.') 888 889 def stat_simple_attrs(self, st): 890 attrs = dict( 891 mode=st.st_mode, 892 uid=st.st_uid, 893 gid=st.st_gid, 894 mtime=safe_ns(st.st_mtime_ns), 895 ) 896 # borg can work with archives only having mtime (older attic archives do not have 897 # atime/ctime). it can be useful to omit atime/ctime, if they change without the 898 # file content changing - e.g. to get better metadata deduplication. 899 if not self.noatime: 900 attrs['atime'] = safe_ns(st.st_atime_ns) 901 if not self.noctime: 902 attrs['ctime'] = safe_ns(st.st_ctime_ns) 903 if not self.nobirthtime and hasattr(st, 'st_birthtime'): 904 # sadly, there's no stat_result.st_birthtime_ns 905 attrs['birthtime'] = safe_ns(int(st.st_birthtime * 10**9)) 906 if self.numeric_owner: 907 attrs['user'] = attrs['group'] = None 908 else: 909 attrs['user'] = uid2user(st.st_uid) 910 attrs['group'] = gid2group(st.st_gid) 911 return attrs 912 913 def stat_ext_attrs(self, st, path): 914 attrs = {} 915 with backup_io('extended stat'): 916 xattrs = {} if self.noxattrs else xattr.get_all(path, follow_symlinks=False) 917 bsdflags = 0 if self.nobsdflags else get_flags(path, st) 918 if not self.noacls: 919 acl_get(path, attrs, st, self.numeric_owner) 920 if xattrs: 921 attrs['xattrs'] = StableDict(xattrs) 922 if bsdflags: 923 attrs['bsdflags'] = bsdflags 924 return attrs 925 926 def stat_attrs(self, st, path): 927 attrs = self.stat_simple_attrs(st) 928 attrs.update(self.stat_ext_attrs(st, path)) 929 return attrs 930 931 @contextmanager 932 def create_helper(self, path, st, status=None, hardlinkable=True): 933 safe_path = make_path_safe(path) 934 item = Item(path=safe_path) 935 hardlink_master = False 936 hardlinked = hardlinkable and st.st_nlink > 1 937 if hardlinked: 938 source = self.hard_links.get((st.st_ino, st.st_dev)) 939 if source is not None: 940 item.source = source 941 status = 'h' # hardlink (to already seen inodes) 942 else: 943 hardlink_master = True 944 yield item, status, hardlinked, hardlink_master 945 # if we get here, "with"-block worked ok without error/exception, the item was processed ok... 946 self.add_item(item) 947 # ... and added to the archive, so we can remember it to refer to it later in the archive: 948 if hardlink_master: 949 self.hard_links[(st.st_ino, st.st_dev)] = safe_path 950 951 def process_dir(self, path, st): 952 with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked, hardlink_master): 953 item.update(self.stat_attrs(st, path)) 954 return status 955 956 def process_fifo(self, path, st): 957 with self.create_helper(path, st, 'f') as (item, status, hardlinked, hardlink_master): # fifo 958 item.update(self.stat_attrs(st, path)) 959 return status 960 961 def process_dev(self, path, st, dev_type): 962 with self.create_helper(path, st, dev_type) as (item, status, hardlinked, hardlink_master): # char/block device 963 item.rdev = st.st_rdev 964 item.update(self.stat_attrs(st, path)) 965 return status 966 967 def process_symlink(self, path, st): 968 # note: using hardlinkable=False because we can not support hardlinked symlinks, 969 # due to the dual-use of item.source, see issue #2343: 970 # hardlinked symlinks will be archived [and extracted] as non-hardlinked symlinks. 971 with self.create_helper(path, st, 's', hardlinkable=False) as (item, status, hardlinked, hardlink_master): 972 with backup_io('readlink'): 973 source = os.readlink(path) 974 item.source = source 975 item.update(self.stat_attrs(st, path)) 976 return status 977 978 def write_part_file(self, item, from_chunk, number): 979 item = Item(internal_dict=item.as_dict()) 980 length = len(item.chunks) 981 # the item should only have the *additional* chunks we processed after the last partial item: 982 item.chunks = item.chunks[from_chunk:] 983 # for borg recreate, we already have a size member in the source item (giving the total file size), 984 # but we consider only a part of the file here, thus we must recompute the size from the chunks: 985 item.get_size(memorize=True, from_chunks=True) 986 item.path += '.borg_part_%d' % number 987 item.part = number 988 number += 1 989 self.add_item(item, show_progress=False) 990 self.write_checkpoint() 991 return length, number 992 993 def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None): 994 if not chunk_processor: 995 def chunk_processor(data): 996 chunk_entry = cache.add_chunk(self.key.id_hash(data), data, stats, wait=False) 997 self.cache.repository.async_response(wait=False) 998 return chunk_entry 999 1000 item.chunks = [] 1001 # if we rechunkify, we'll get a fundamentally different chunks list, thus we need 1002 # to get rid of .chunks_healthy, as it might not correspond to .chunks any more. 1003 if getattr(self, 'recreate_rechunkify', False) and 'chunks_healthy' in item: 1004 del item.chunks_healthy 1005 from_chunk = 0 1006 part_number = 1 1007 for data in chunk_iter: 1008 item.chunks.append(chunk_processor(data)) 1009 if self.show_progress: 1010 self.stats.show_progress(item=item, dt=0.2) 1011 if self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval: 1012 from_chunk, part_number = self.write_part_file(item, from_chunk, part_number) 1013 self.last_checkpoint = time.monotonic() 1014 else: 1015 if part_number > 1: 1016 if item.chunks[from_chunk:]: 1017 # if we already have created a part item inside this file, we want to put the final 1018 # chunks (if any) into a part item also (so all parts can be concatenated to get 1019 # the complete file): 1020 from_chunk, part_number = self.write_part_file(item, from_chunk, part_number) 1021 self.last_checkpoint = time.monotonic() 1022 1023 # if we created part files, we have referenced all chunks from the part files, 1024 # but we also will reference the same chunks also from the final, complete file: 1025 dummy_stats = Statistics() # do not count this data volume twice 1026 for chunk in item.chunks: 1027 cache.chunk_incref(chunk.id, dummy_stats, size=chunk.size) 1028 1029 def process_stdin(self, path, cache, mode, user, group): 1030 uid = user2uid(user) 1031 if uid is None: 1032 raise Error("no such user: %s" % user) 1033 gid = group2gid(group) 1034 if gid is None: 1035 raise Error("no such group: %s" % group) 1036 t = int(time.time()) * 1000000000 1037 item = Item( 1038 path=path, 1039 mode=mode & 0o107777 | 0o100000, # forcing regular file mode 1040 uid=uid, user=user, 1041 gid=gid, group=group, 1042 mtime=t, atime=t, ctime=t, 1043 ) 1044 fd = sys.stdin.buffer # binary 1045 self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd))) 1046 item.get_size(memorize=True) 1047 self.stats.nfiles += 1 1048 self.add_item(item) 1049 return 'i' # stdin 1050 1051 def process_file(self, path, st, cache): 1052 with self.create_helper(path, st, None) as (item, status, hardlinked, hardlink_master): # no status yet 1053 item.update(self.stat_simple_attrs(st)) 1054 is_special_file = is_special(st.st_mode) 1055 if is_special_file: 1056 # we process a special file like a regular file. reflect that in mode, 1057 # so it can be extracted / accessed in FUSE mount like a regular file. 1058 # this needs to be done early, so that part files also get the patched mode. 1059 item.mode = stat.S_IFREG | stat.S_IMODE(item.mode) 1060 if not hardlinked or hardlink_master: 1061 if not is_special_file: 1062 hashed_path = safe_encode(os.path.join(self.cwd, path)) 1063 path_hash = self.key.id_hash(hashed_path) 1064 known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st) 1065 else: 1066 # in --read-special mode, we may be called for special files. 1067 # there should be no information in the cache about special files processed in 1068 # read-special mode, but we better play safe as this was wrong in the past: 1069 hashed_path = path_hash = None 1070 known, ids = False, None 1071 chunks = None 1072 if ids is not None: 1073 # Make sure all ids are available 1074 for id_ in ids: 1075 if not cache.seen_chunk(id_): 1076 status = 'M' # cache said it is unmodified, but we lost a chunk: process file like modified 1077 break 1078 else: 1079 chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids] 1080 status = 'U' # regular file, unchanged 1081 else: 1082 status = 'M' if known else 'A' # regular file, modified or added 1083 item.hardlink_master = hardlinked 1084 # Only chunkify the file if needed 1085 if chunks is not None: 1086 item.chunks = chunks 1087 else: 1088 with backup_io('open'): 1089 fh = Archive._open_rb(path) 1090 with os.fdopen(fh, 'rb') as fd: 1091 self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh))) 1092 if not is_special_file: 1093 # we must not memorize special files, because the contents of e.g. a 1094 # block or char device will change without its mtime/size/inode changing. 1095 cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks]) 1096 self.stats.nfiles += 1 1097 item.update(self.stat_ext_attrs(st, path)) 1098 item.get_size(memorize=True) 1099 return status 1100 1101 @staticmethod 1102 def list_archives(repository, key, manifest, cache=None): 1103 # expensive! see also Manifest.list_archive_infos. 1104 for name in manifest.archives: 1105 yield Archive(repository, key, manifest, name, cache=cache) 1106 1107 @staticmethod 1108 def _open_rb(path): 1109 try: 1110 # if we have O_NOATIME, this likely will succeed if we are root or owner of file: 1111 return os.open(path, flags_noatime) 1112 except PermissionError: 1113 if flags_noatime == flags_normal: 1114 # we do not have O_NOATIME, no need to try again: 1115 raise 1116 # Was this EPERM due to the O_NOATIME flag? Try again without it: 1117 return os.open(path, flags_normal) 1118 1119 1120def valid_msgpacked_dict(d, keys_serialized): 1121 """check if the data <d> looks like a msgpacked dict""" 1122 d_len = len(d) 1123 if d_len == 0: 1124 return False 1125 if d[0] & 0xf0 == 0x80: # object is a fixmap (up to 15 elements) 1126 offs = 1 1127 elif d[0] == 0xde: # object is a map16 (up to 2^16-1 elements) 1128 offs = 3 1129 else: 1130 # object is not a map (dict) 1131 # note: we must not have dicts with > 2^16-1 elements 1132 return False 1133 if d_len <= offs: 1134 return False 1135 # is the first dict key a bytestring? 1136 if d[offs] & 0xe0 == 0xa0: # key is a small bytestring (up to 31 chars) 1137 pass 1138 elif d[offs] in (0xd9, 0xda, 0xdb): # key is a str8, str16 or str32 1139 pass 1140 else: 1141 # key is not a bytestring 1142 return False 1143 # is the bytestring any of the expected key names? 1144 key_serialized = d[offs:] 1145 return any(key_serialized.startswith(pattern) for pattern in keys_serialized) 1146 1147 1148class RobustUnpacker: 1149 """A restartable/robust version of the streaming msgpack unpacker 1150 """ 1151 class UnpackerCrashed(Exception): 1152 """raise if unpacker crashed""" 1153 1154 def __init__(self, validator, item_keys): 1155 super().__init__() 1156 self.item_keys = [msgpack.packb(name.encode()) for name in item_keys] 1157 self.validator = validator 1158 self._buffered_data = [] 1159 self._resync = False 1160 self._unpacker = msgpack.Unpacker(object_hook=StableDict) 1161 1162 def resync(self): 1163 self._buffered_data = [] 1164 self._resync = True 1165 1166 def feed(self, data): 1167 if self._resync: 1168 self._buffered_data.append(data) 1169 else: 1170 self._unpacker.feed(data) 1171 1172 def __iter__(self): 1173 return self 1174 1175 def __next__(self): 1176 def unpack_next(): 1177 try: 1178 return next(self._unpacker) 1179 except (TypeError, ValueError) as err: 1180 # transform exceptions that might be raised when feeding 1181 # msgpack with invalid data to a more specific exception 1182 raise self.UnpackerCrashed(str(err)) 1183 1184 if self._resync: 1185 data = b''.join(self._buffered_data) 1186 while self._resync: 1187 if not data: 1188 raise StopIteration 1189 # Abort early if the data does not look like a serialized item dict 1190 if not valid_msgpacked_dict(data, self.item_keys): 1191 data = data[1:] 1192 continue 1193 self._unpacker = msgpack.Unpacker(object_hook=StableDict) 1194 self._unpacker.feed(data) 1195 try: 1196 item = unpack_next() 1197 except (self.UnpackerCrashed, StopIteration): 1198 # as long as we are resyncing, we also ignore StopIteration 1199 pass 1200 else: 1201 if self.validator(item): 1202 self._resync = False 1203 return item 1204 data = data[1:] 1205 else: 1206 return unpack_next() 1207 1208 1209class ArchiveChecker: 1210 1211 def __init__(self): 1212 self.error_found = False 1213 self.possibly_superseded = set() 1214 1215 def check(self, repository, repair=False, archive=None, first=0, last=0, sort_by='', glob=None, 1216 verify_data=False, save_space=False): 1217 """Perform a set of checks on 'repository' 1218 1219 :param repair: enable repair mode, write updated or corrected data into repository 1220 :param archive: only check this archive 1221 :param first/last/sort_by: only check this number of first/last archives ordered by sort_by 1222 :param glob: only check archives matching this glob 1223 :param verify_data: integrity verification of data referenced by archives 1224 :param save_space: Repository.commit(save_space) 1225 """ 1226 logger.info('Starting archive consistency check...') 1227 self.check_all = archive is None and not any((first, last, glob)) 1228 self.repair = repair 1229 self.repository = repository 1230 self.init_chunks() 1231 if not self.chunks: 1232 logger.error('Repository contains no apparent data at all, cannot continue check/repair.') 1233 return False 1234 self.key = self.identify_key(repository) 1235 if verify_data: 1236 self.verify_data() 1237 if Manifest.MANIFEST_ID not in self.chunks: 1238 logger.error("Repository manifest not found!") 1239 self.error_found = True 1240 self.manifest = self.rebuild_manifest() 1241 else: 1242 try: 1243 self.manifest, _ = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key) 1244 except IntegrityError as exc: 1245 logger.error('Repository manifest is corrupted: %s', exc) 1246 self.error_found = True 1247 del self.chunks[Manifest.MANIFEST_ID] 1248 self.manifest = self.rebuild_manifest() 1249 self.rebuild_refcounts(archive=archive, first=first, last=last, sort_by=sort_by, glob=glob) 1250 self.orphan_chunks_check() 1251 self.finish(save_space=save_space) 1252 if self.error_found: 1253 logger.error('Archive consistency check complete, problems found.') 1254 else: 1255 logger.info('Archive consistency check complete, no problems found.') 1256 return self.repair or not self.error_found 1257 1258 def init_chunks(self): 1259 """Fetch a list of all object keys from repository 1260 """ 1261 # Explicitly set the initial hash table capacity to avoid performance issues 1262 # due to hash table "resonance". 1263 # Since reconstruction of archive items can add some new chunks, add 10 % headroom 1264 capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR * 1.1) 1265 self.chunks = ChunkIndex(capacity) 1266 marker = None 1267 while True: 1268 result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker) 1269 if not result: 1270 break 1271 marker = result[-1] 1272 init_entry = ChunkIndexEntry(refcount=0, size=0, csize=0) 1273 for id_ in result: 1274 self.chunks[id_] = init_entry 1275 1276 def identify_key(self, repository): 1277 try: 1278 some_chunkid, _ = next(self.chunks.iteritems()) 1279 except StopIteration: 1280 # repo is completely empty, no chunks 1281 return None 1282 cdata = repository.get(some_chunkid) 1283 return key_factory(repository, cdata) 1284 1285 def verify_data(self): 1286 logger.info('Starting cryptographic data integrity verification...') 1287 chunks_count_index = len(self.chunks) 1288 chunks_count_segments = 0 1289 errors = 0 1290 defect_chunks = [] 1291 pi = ProgressIndicatorPercent(total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01, 1292 msgid='check.verify_data') 1293 marker = None 1294 while True: 1295 chunk_ids = self.repository.scan(limit=100, marker=marker) 1296 if not chunk_ids: 1297 break 1298 chunks_count_segments += len(chunk_ids) 1299 marker = chunk_ids[-1] 1300 chunk_data_iter = self.repository.get_many(chunk_ids) 1301 chunk_ids_revd = list(reversed(chunk_ids)) 1302 while chunk_ids_revd: 1303 pi.show() 1304 chunk_id = chunk_ids_revd.pop(-1) # better efficiency 1305 try: 1306 encrypted_data = next(chunk_data_iter) 1307 except (Repository.ObjectNotFound, IntegrityError) as err: 1308 self.error_found = True 1309 errors += 1 1310 logger.error('chunk %s: %s', bin_to_hex(chunk_id), err) 1311 if isinstance(err, IntegrityError): 1312 defect_chunks.append(chunk_id) 1313 # as the exception killed our generator, make a new one for remaining chunks: 1314 if chunk_ids_revd: 1315 chunk_ids = list(reversed(chunk_ids_revd)) 1316 chunk_data_iter = self.repository.get_many(chunk_ids) 1317 else: 1318 _chunk_id = None if chunk_id == Manifest.MANIFEST_ID else chunk_id 1319 try: 1320 self.key.decrypt(_chunk_id, encrypted_data) 1321 except IntegrityError as integrity_error: 1322 self.error_found = True 1323 errors += 1 1324 logger.error('chunk %s, integrity error: %s', bin_to_hex(chunk_id), integrity_error) 1325 defect_chunks.append(chunk_id) 1326 pi.finish() 1327 if chunks_count_index != chunks_count_segments: 1328 logger.error('Repo/Chunks index object count vs. segment files object count mismatch.') 1329 logger.error('Repo/Chunks index: %d objects != segment files: %d objects', 1330 chunks_count_index, chunks_count_segments) 1331 if defect_chunks: 1332 if self.repair: 1333 # if we kill the defect chunk here, subsequent actions within this "borg check" 1334 # run will find missing chunks and replace them with all-zero replacement 1335 # chunks and flag the files as "repaired". 1336 # if another backup is done later and the missing chunks get backupped again, 1337 # a "borg check" afterwards can heal all files where this chunk was missing. 1338 logger.warning('Found defect chunks. They will be deleted now, so affected files can ' 1339 'get repaired now and maybe healed later.') 1340 for defect_chunk in defect_chunks: 1341 # remote repo (ssh): retry might help for strange network / NIC / RAM errors 1342 # as the chunk will be retransmitted from remote server. 1343 # local repo (fs): as chunks.iteritems loop usually pumps a lot of data through, 1344 # a defect chunk is likely not in the fs cache any more and really gets re-read 1345 # from the underlying media. 1346 try: 1347 encrypted_data = self.repository.get(defect_chunk) 1348 _chunk_id = None if defect_chunk == Manifest.MANIFEST_ID else defect_chunk 1349 self.key.decrypt(_chunk_id, encrypted_data) 1350 except IntegrityError: 1351 # failed twice -> get rid of this chunk 1352 del self.chunks[defect_chunk] 1353 self.repository.delete(defect_chunk) 1354 logger.debug('chunk %s deleted.', bin_to_hex(defect_chunk)) 1355 else: 1356 logger.warning('chunk %s not deleted, did not consistently fail.', bin_to_hex(defect_chunk)) 1357 else: 1358 logger.warning('Found defect chunks. With --repair, they would get deleted, so affected ' 1359 'files could get repaired then and maybe healed later.') 1360 for defect_chunk in defect_chunks: 1361 logger.debug('chunk %s is defect.', bin_to_hex(defect_chunk)) 1362 log = logger.error if errors else logger.info 1363 log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.', 1364 chunks_count_segments, errors) 1365 1366 def rebuild_manifest(self): 1367 """Rebuild the manifest object if it is missing 1368 1369 Iterates through all objects in the repository looking for archive metadata blocks. 1370 """ 1371 required_archive_keys = frozenset(key.encode() for key in REQUIRED_ARCHIVE_KEYS) 1372 1373 def valid_archive(obj): 1374 if not isinstance(obj, dict): 1375 return False 1376 keys = set(obj) 1377 return required_archive_keys.issubset(keys) 1378 1379 logger.info('Rebuilding missing manifest, this might take some time...') 1380 # as we have lost the manifest, we do not know any more what valid item keys we had. 1381 # collecting any key we encounter in a damaged repo seems unwise, thus we just use 1382 # the hardcoded list from the source code. thus, it is not recommended to rebuild a 1383 # lost manifest on a older borg version than the most recent one that was ever used 1384 # within this repository (assuming that newer borg versions support more item keys). 1385 manifest = Manifest(self.key, self.repository) 1386 archive_keys_serialized = [msgpack.packb(name.encode()) for name in ARCHIVE_KEYS] 1387 pi = ProgressIndicatorPercent(total=len(self.chunks), msg="Rebuilding manifest %6.2f%%", step=0.01, 1388 msgid='check.rebuild_manifest') 1389 for chunk_id, _ in self.chunks.iteritems(): 1390 pi.show() 1391 cdata = self.repository.get(chunk_id) 1392 try: 1393 data = self.key.decrypt(chunk_id, cdata) 1394 except IntegrityError as exc: 1395 logger.error('Skipping corrupted chunk: %s', exc) 1396 self.error_found = True 1397 continue 1398 if not valid_msgpacked_dict(data, archive_keys_serialized): 1399 continue 1400 if b'cmdline' not in data or b'\xa7version\x01' not in data: 1401 continue 1402 try: 1403 archive = msgpack.unpackb(data) 1404 # Ignore exceptions that might be raised when feeding 1405 # msgpack with invalid data 1406 except (TypeError, ValueError, StopIteration): 1407 continue 1408 if valid_archive(archive): 1409 archive = ArchiveItem(internal_dict=archive) 1410 name = archive.name 1411 logger.info('Found archive %s', name) 1412 if name in manifest.archives: 1413 i = 1 1414 while True: 1415 new_name = '%s.%d' % (name, i) 1416 if new_name not in manifest.archives: 1417 break 1418 i += 1 1419 logger.warning('Duplicate archive name %s, storing as %s', name, new_name) 1420 name = new_name 1421 manifest.archives[name] = (chunk_id, archive.time) 1422 pi.finish() 1423 logger.info('Manifest rebuild complete.') 1424 return manifest 1425 1426 def rebuild_refcounts(self, archive=None, first=0, last=0, sort_by='', glob=None): 1427 """Rebuild object reference counts by walking the metadata 1428 1429 Missing and/or incorrect data is repaired when detected 1430 """ 1431 # Exclude the manifest from chunks (manifest entry might be already deleted from self.chunks) 1432 self.chunks.pop(Manifest.MANIFEST_ID, None) 1433 1434 def mark_as_possibly_superseded(id_): 1435 if self.chunks.get(id_, ChunkIndexEntry(0, 0, 0)).refcount == 0: 1436 self.possibly_superseded.add(id_) 1437 1438 def add_callback(chunk): 1439 id_ = self.key.id_hash(chunk) 1440 cdata = self.key.encrypt(chunk) 1441 add_reference(id_, len(chunk), len(cdata), cdata) 1442 return id_ 1443 1444 def add_reference(id_, size, csize, cdata=None): 1445 try: 1446 self.chunks.incref(id_) 1447 except KeyError: 1448 assert cdata is not None 1449 self.chunks[id_] = ChunkIndexEntry(refcount=1, size=size, csize=csize) 1450 if self.repair: 1451 self.repository.put(id_, cdata) 1452 1453 def verify_file_chunks(archive_name, item): 1454 """Verifies that all file chunks are present. 1455 1456 Missing file chunks will be replaced with new chunks of the same length containing all zeros. 1457 If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one. 1458 """ 1459 def replacement_chunk(size): 1460 data = bytes(size) 1461 chunk_id = self.key.id_hash(data) 1462 cdata = self.key.encrypt(data) 1463 csize = len(cdata) 1464 return chunk_id, size, csize, cdata 1465 1466 offset = 0 1467 chunk_list = [] 1468 chunks_replaced = False 1469 has_chunks_healthy = 'chunks_healthy' in item 1470 chunks_current = item.chunks 1471 chunks_healthy = item.chunks_healthy if has_chunks_healthy else chunks_current 1472 if has_chunks_healthy and len(chunks_current) != len(chunks_healthy): 1473 # should never happen, but there was issue #3218. 1474 logger.warning('{}: {}: Invalid chunks_healthy metadata removed!'.format(archive_name, item.path)) 1475 del item.chunks_healthy 1476 has_chunks_healthy = False 1477 chunks_healthy = chunks_current 1478 for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy): 1479 chunk_id, size, csize = chunk_healthy 1480 if chunk_id not in self.chunks: 1481 # a chunk of the healthy list is missing 1482 if chunk_current == chunk_healthy: 1483 logger.error('{}: {}: New missing file chunk detected (Byte {}-{}, Chunk {}). ' 1484 'Replacing with all-zero chunk.'.format( 1485 archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id))) 1486 self.error_found = chunks_replaced = True 1487 chunk_id, size, csize, cdata = replacement_chunk(size) 1488 add_reference(chunk_id, size, csize, cdata) 1489 else: 1490 logger.info('{}: {}: Previously missing file chunk is still missing (Byte {}-{}, Chunk {}). ' 1491 'It has an all-zero replacement chunk already.'.format( 1492 archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id))) 1493 chunk_id, size, csize = chunk_current 1494 if chunk_id in self.chunks: 1495 add_reference(chunk_id, size, csize) 1496 else: 1497 logger.warning('{}: {}: Missing all-zero replacement chunk detected (Byte {}-{}, Chunk {}). ' 1498 'Generating new replacement chunk.'.format( 1499 archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id))) 1500 self.error_found = chunks_replaced = True 1501 chunk_id, size, csize, cdata = replacement_chunk(size) 1502 add_reference(chunk_id, size, csize, cdata) 1503 else: 1504 if chunk_current == chunk_healthy: 1505 # normal case, all fine. 1506 add_reference(chunk_id, size, csize) 1507 else: 1508 logger.info('{}: {}: Healed previously missing file chunk! (Byte {}-{}, Chunk {}).'.format( 1509 archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id))) 1510 add_reference(chunk_id, size, csize) 1511 mark_as_possibly_superseded(chunk_current[0]) # maybe orphaned the all-zero replacement chunk 1512 chunk_list.append([chunk_id, size, csize]) # list-typed element as chunks_healthy is list-of-lists 1513 offset += size 1514 if chunks_replaced and not has_chunks_healthy: 1515 # if this is first repair, remember the correct chunk IDs, so we can maybe heal the file later 1516 item.chunks_healthy = item.chunks 1517 if has_chunks_healthy and chunk_list == chunks_healthy: 1518 logger.info('{}: {}: Completely healed previously damaged file!'.format(archive_name, item.path)) 1519 del item.chunks_healthy 1520 item.chunks = chunk_list 1521 if 'size' in item: 1522 item_size = item.size 1523 item_chunks_size = item.get_size(compressed=False, from_chunks=True) 1524 if item_size != item_chunks_size: 1525 # just warn, but keep the inconsistency, so that borg extract can warn about it. 1526 logger.warning('{}: {}: size inconsistency detected: size {}, chunks size {}'.format( 1527 archive_name, item.path, item_size, item_chunks_size)) 1528 1529 def robust_iterator(archive): 1530 """Iterates through all archive items 1531 1532 Missing item chunks will be skipped and the msgpack stream will be restarted 1533 """ 1534 item_keys = frozenset(key.encode() for key in self.manifest.item_keys) 1535 required_item_keys = frozenset(key.encode() for key in REQUIRED_ITEM_KEYS) 1536 unpacker = RobustUnpacker(lambda item: isinstance(item, StableDict) and b'path' in item, 1537 self.manifest.item_keys) 1538 _state = 0 1539 1540 def missing_chunk_detector(chunk_id): 1541 nonlocal _state 1542 if _state % 2 != int(chunk_id not in self.chunks): 1543 _state += 1 1544 return _state 1545 1546 def report(msg, chunk_id, chunk_no): 1547 cid = bin_to_hex(chunk_id) 1548 msg += ' [chunk: %06d_%s]' % (chunk_no, cid) # see "debug dump-archive-items" 1549 self.error_found = True 1550 logger.error(msg) 1551 1552 def list_keys_safe(keys): 1553 return ', '.join((k.decode(errors='replace') if isinstance(k, bytes) else str(k) for k in keys)) 1554 1555 def valid_item(obj): 1556 if not isinstance(obj, StableDict): 1557 return False, 'not a dictionary' 1558 # A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item. 1559 # We ignore it here, should it exist. See test_attic013_acl_bug for details. 1560 obj.pop(b'acl', None) 1561 keys = set(obj) 1562 if not required_item_keys.issubset(keys): 1563 return False, 'missing required keys: ' + list_keys_safe(required_item_keys - keys) 1564 if not keys.issubset(item_keys): 1565 return False, 'invalid keys: ' + list_keys_safe(keys - item_keys) 1566 return True, '' 1567 1568 i = 0 1569 for state, items in groupby(archive.items, missing_chunk_detector): 1570 items = list(items) 1571 if state % 2: 1572 for chunk_id in items: 1573 report('item metadata chunk missing', chunk_id, i) 1574 i += 1 1575 continue 1576 if state > 0: 1577 unpacker.resync() 1578 for chunk_id, cdata in zip(items, repository.get_many(items)): 1579 data = self.key.decrypt(chunk_id, cdata) 1580 unpacker.feed(data) 1581 try: 1582 for item in unpacker: 1583 valid, reason = valid_item(item) 1584 if valid: 1585 yield Item(internal_dict=item) 1586 else: 1587 report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason, chunk_id, i) 1588 except RobustUnpacker.UnpackerCrashed: 1589 report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i) 1590 unpacker.resync() 1591 except Exception: 1592 report('Exception while unpacking item metadata', chunk_id, i) 1593 raise 1594 i += 1 1595 1596 if archive is None: 1597 sort_by = sort_by.split(',') 1598 if any((first, last, glob)): 1599 archive_infos = self.manifest.archives.list(sort_by=sort_by, glob=glob, first=first, last=last) 1600 if glob and not archive_infos: 1601 logger.warning('--glob-archives %s does not match any archives', glob) 1602 if first and len(archive_infos) < first: 1603 logger.warning('--first %d archives: only found %d archives', first, len(archive_infos)) 1604 if last and len(archive_infos) < last: 1605 logger.warning('--last %d archives: only found %d archives', last, len(archive_infos)) 1606 else: 1607 archive_infos = self.manifest.archives.list(sort_by=sort_by) 1608 else: 1609 # we only want one specific archive 1610 try: 1611 archive_infos = [self.manifest.archives[archive]] 1612 except KeyError: 1613 logger.error("Archive '%s' not found.", archive) 1614 self.error_found = True 1615 return 1616 num_archives = len(archive_infos) 1617 1618 pi = ProgressIndicatorPercent(total=num_archives, msg='Checking archives %3.1f%%', step=0.1, 1619 msgid='check.rebuild_refcounts') 1620 with cache_if_remote(self.repository) as repository: 1621 for i, info in enumerate(archive_infos): 1622 pi.show(i) 1623 logger.info('Analyzing archive {} ({}/{})'.format(info.name, i + 1, num_archives)) 1624 archive_id = info.id 1625 if archive_id not in self.chunks: 1626 logger.error('Archive metadata block is missing!') 1627 self.error_found = True 1628 del self.manifest.archives[info.name] 1629 continue 1630 mark_as_possibly_superseded(archive_id) 1631 cdata = self.repository.get(archive_id) 1632 data = self.key.decrypt(archive_id, cdata) 1633 archive = ArchiveItem(internal_dict=msgpack.unpackb(data)) 1634 if archive.version != 1: 1635 raise Exception('Unknown archive metadata version') 1636 archive.cmdline = [safe_decode(arg) for arg in archive.cmdline] 1637 items_buffer = ChunkBuffer(self.key) 1638 items_buffer.write_chunk = add_callback 1639 for item in robust_iterator(archive): 1640 if 'chunks' in item: 1641 verify_file_chunks(info.name, item) 1642 items_buffer.add(item) 1643 items_buffer.flush(flush=True) 1644 for previous_item_id in archive.items: 1645 mark_as_possibly_superseded(previous_item_id) 1646 archive.items = items_buffer.chunks 1647 data = msgpack.packb(archive.as_dict(), unicode_errors='surrogateescape') 1648 new_archive_id = self.key.id_hash(data) 1649 cdata = self.key.encrypt(data) 1650 add_reference(new_archive_id, len(data), len(cdata), cdata) 1651 self.manifest.archives[info.name] = (new_archive_id, info.ts) 1652 pi.finish() 1653 1654 def orphan_chunks_check(self): 1655 if self.check_all: 1656 unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0} 1657 orphaned = unused - self.possibly_superseded 1658 if orphaned: 1659 logger.error('{} orphaned objects found!'.format(len(orphaned))) 1660 self.error_found = True 1661 if self.repair and unused: 1662 logger.info('Deleting %d orphaned and %d superseded objects...' % ( 1663 len(orphaned), len(self.possibly_superseded))) 1664 for id_ in unused: 1665 self.repository.delete(id_) 1666 logger.info('Finished deleting orphaned/superseded objects.') 1667 else: 1668 logger.info('Orphaned objects check skipped (needs all archives checked).') 1669 1670 def finish(self, save_space=False): 1671 if self.repair: 1672 logger.info('Writing Manifest.') 1673 self.manifest.write() 1674 logger.info('Committing repo (may take a while, due to compact_segments)...') 1675 self.repository.commit(save_space=save_space) 1676 logger.info('Finished committing repo.') 1677 1678 1679class ArchiveRecreater: 1680 class Interrupted(Exception): 1681 def __init__(self, metadata=None): 1682 self.metadata = metadata or {} 1683 1684 @staticmethod 1685 def is_temporary_archive(archive_name): 1686 return archive_name.endswith('.recreate') 1687 1688 def __init__(self, repository, manifest, key, cache, matcher, 1689 exclude_caches=False, exclude_if_present=None, keep_exclude_tags=False, 1690 chunker_params=None, compression=None, recompress=False, always_recompress=False, 1691 dry_run=False, stats=False, progress=False, file_status_printer=None, 1692 timestamp=None, checkpoint_interval=1800): 1693 self.repository = repository 1694 self.key = key 1695 self.manifest = manifest 1696 self.cache = cache 1697 1698 self.matcher = matcher 1699 self.exclude_caches = exclude_caches 1700 self.exclude_if_present = exclude_if_present or [] 1701 self.keep_exclude_tags = keep_exclude_tags 1702 1703 self.rechunkify = chunker_params is not None 1704 if self.rechunkify: 1705 logger.debug('Rechunking archives to %s', chunker_params) 1706 self.chunker_params = chunker_params or CHUNKER_PARAMS 1707 self.recompress = recompress 1708 self.always_recompress = always_recompress 1709 self.compression = compression or CompressionSpec('none') 1710 self.seen_chunks = set() 1711 1712 self.timestamp = timestamp 1713 self.dry_run = dry_run 1714 self.stats = stats 1715 self.progress = progress 1716 self.print_file_status = file_status_printer or (lambda *args: None) 1717 self.checkpoint_interval = None if dry_run else checkpoint_interval 1718 1719 def recreate(self, archive_name, comment=None, target_name=None): 1720 assert not self.is_temporary_archive(archive_name) 1721 archive = self.open_archive(archive_name) 1722 target = self.create_target(archive, target_name) 1723 if self.exclude_if_present or self.exclude_caches: 1724 self.matcher_add_tagged_dirs(archive) 1725 if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None: 1726 return False 1727 self.process_items(archive, target) 1728 replace_original = target_name is None 1729 self.save(archive, target, comment, replace_original=replace_original) 1730 return True 1731 1732 def process_items(self, archive, target): 1733 matcher = self.matcher 1734 target_is_subset = not matcher.empty() 1735 hardlink_masters = {} if target_is_subset else None 1736 1737 def item_is_hardlink_master(item): 1738 return (target_is_subset and 1739 hardlinkable(item.mode) and 1740 item.get('hardlink_master', True) and 1741 'source' not in item) 1742 1743 for item in archive.iter_items(): 1744 if not matcher.match(item.path): 1745 self.print_file_status('x', item.path) 1746 if item_is_hardlink_master(item): 1747 hardlink_masters[item.path] = (item.get('chunks'), item.get('chunks_healthy'), None) 1748 continue 1749 if target_is_subset and hardlinkable(item.mode) and item.get('source') in hardlink_masters: 1750 # master of this hard link is outside the target subset 1751 chunks, chunks_healthy, new_source = hardlink_masters[item.source] 1752 if new_source is None: 1753 # First item to use this master, move the chunks 1754 item.chunks = chunks 1755 if chunks_healthy is not None: 1756 item.chunks_healthy = chunks_healthy 1757 hardlink_masters[item.source] = (None, None, item.path) 1758 del item.source 1759 else: 1760 # Master was already moved, only update this item's source 1761 item.source = new_source 1762 if self.dry_run: 1763 self.print_file_status('-', item.path) 1764 else: 1765 self.process_item(archive, target, item) 1766 if self.progress: 1767 target.stats.show_progress(final=True) 1768 1769 def process_item(self, archive, target, item): 1770 if 'chunks' in item: 1771 self.process_chunks(archive, target, item) 1772 target.stats.nfiles += 1 1773 target.add_item(item) 1774 self.print_file_status(file_status(item.mode), item.path) 1775 1776 def process_chunks(self, archive, target, item): 1777 if not self.recompress and not target.recreate_rechunkify: 1778 for chunk_id, size, csize in item.chunks: 1779 self.cache.chunk_incref(chunk_id, target.stats) 1780 return item.chunks 1781 chunk_iterator = self.iter_chunks(archive, target, list(item.chunks)) 1782 chunk_processor = partial(self.chunk_processor, target) 1783 target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor) 1784 1785 def chunk_processor(self, target, data): 1786 chunk_id = self.key.id_hash(data) 1787 if chunk_id in self.seen_chunks: 1788 return self.cache.chunk_incref(chunk_id, target.stats) 1789 overwrite = self.recompress 1790 if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks: 1791 # Check if this chunk is already compressed the way we want it 1792 old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False) 1793 if Compressor.detect(old_chunk).name == self.key.compressor.decide(data).name: 1794 # Stored chunk has the same compression we wanted 1795 overwrite = False 1796 chunk_entry = self.cache.add_chunk(chunk_id, data, target.stats, overwrite=overwrite, wait=False) 1797 self.cache.repository.async_response(wait=False) 1798 self.seen_chunks.add(chunk_entry.id) 1799 return chunk_entry 1800 1801 def iter_chunks(self, archive, target, chunks): 1802 chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in chunks]) 1803 if target.recreate_rechunkify: 1804 # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk 1805 # (does not load the entire file into memory) 1806 file = ChunkIteratorFileWrapper(chunk_iterator) 1807 yield from target.chunker.chunkify(file) 1808 else: 1809 for chunk in chunk_iterator: 1810 yield chunk 1811 1812 def save(self, archive, target, comment=None, replace_original=True): 1813 if self.dry_run: 1814 return 1815 if comment is None: 1816 comment = archive.metadata.get('comment', '') 1817 1818 # Keep for the statistics if necessary 1819 if self.stats: 1820 _start = target.start 1821 1822 if self.timestamp is None: 1823 additional_metadata = { 1824 'time': archive.metadata.time, 1825 'time_end': archive.metadata.get('time_end') or archive.metadata.time, 1826 'cmdline': archive.metadata.cmdline, 1827 # but also remember recreate metadata: 1828 'recreate_cmdline': sys.argv, 1829 } 1830 else: 1831 additional_metadata = { 1832 'cmdline': archive.metadata.cmdline, 1833 # but also remember recreate metadata: 1834 'recreate_cmdline': sys.argv, 1835 } 1836 1837 target.save(comment=comment, timestamp=self.timestamp, 1838 additional_metadata=additional_metadata) 1839 if replace_original: 1840 archive.delete(Statistics(), progress=self.progress) 1841 target.rename(archive.name) 1842 if self.stats: 1843 target.start = _start 1844 target.end = datetime.utcnow() 1845 log_multi(DASHES, 1846 str(target), 1847 DASHES, 1848 str(target.stats), 1849 str(self.cache), 1850 DASHES) 1851 1852 def matcher_add_tagged_dirs(self, archive): 1853 """Add excludes to the matcher created by exclude_cache and exclude_if_present.""" 1854 def exclude(dir, tag_item): 1855 if self.keep_exclude_tags: 1856 tag_files.append(PathPrefixPattern(tag_item.path, recurse_dir=False)) 1857 tagged_dirs.append(FnmatchPattern(dir + '/', recurse_dir=False)) 1858 else: 1859 tagged_dirs.append(PathPrefixPattern(dir, recurse_dir=False)) 1860 1861 matcher = self.matcher 1862 tag_files = [] 1863 tagged_dirs = [] 1864 1865 # to support reading hard-linked CACHEDIR.TAGs (aka CACHE_TAG_NAME), similar to hardlink_masters: 1866 cachedir_masters = {} 1867 1868 if self.exclude_caches: 1869 # sadly, due to how CACHEDIR.TAG works (filename AND file [header] contents) and 1870 # how borg deals with hardlinks (slave hardlinks referring back to master hardlinks), 1871 # we need to pass over the archive collecting hardlink master paths. 1872 # as seen in issue #4911, the master paths can have an arbitrary filenames, 1873 # not just CACHEDIR.TAG. 1874 for item in archive.iter_items(filter=lambda item: os.path.basename(item.path) == CACHE_TAG_NAME): 1875 if stat.S_ISREG(item.mode) and 'chunks' not in item and 'source' in item: 1876 # this is a hardlink slave, referring back to its hardlink master (via item.source) 1877 cachedir_masters[item.source] = None # we know the key (path), but not the value (item) yet 1878 1879 for item in archive.iter_items( 1880 filter=lambda item: os.path.basename(item.path) == CACHE_TAG_NAME or matcher.match(item.path)): 1881 if self.exclude_caches and item.path in cachedir_masters: 1882 cachedir_masters[item.path] = item 1883 dir, tag_file = os.path.split(item.path) 1884 if tag_file in self.exclude_if_present: 1885 exclude(dir, item) 1886 elif self.exclude_caches and tag_file == CACHE_TAG_NAME and stat.S_ISREG(item.mode): 1887 content_item = item if 'chunks' in item else cachedir_masters[item.source] 1888 file = open_item(archive, content_item) 1889 if file.read(len(CACHE_TAG_CONTENTS)) == CACHE_TAG_CONTENTS: 1890 exclude(dir, item) 1891 matcher.add(tag_files, IECommand.Include) 1892 matcher.add(tagged_dirs, IECommand.ExcludeNoRecurse) 1893 1894 def create_target(self, archive, target_name=None): 1895 """Create target archive.""" 1896 target_name = target_name or archive.name + '.recreate' 1897 target = self.create_target_archive(target_name) 1898 # If the archives use the same chunker params, then don't rechunkify 1899 source_chunker_params = tuple(archive.metadata.get('chunker_params', [])) 1900 target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params 1901 if target.recreate_rechunkify: 1902 logger.debug('Rechunking archive from %s to %s', source_chunker_params or '(unknown)', target.chunker_params) 1903 return target 1904 1905 def create_target_archive(self, name): 1906 target = Archive(self.repository, self.key, self.manifest, name, create=True, 1907 progress=self.progress, chunker_params=self.chunker_params, cache=self.cache, 1908 checkpoint_interval=self.checkpoint_interval) 1909 return target 1910 1911 def open_archive(self, name, **kwargs): 1912 return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs) 1913