import errno
import json
import os
import socket
import stat
import sys
import time
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from functools import partial
from getpass import getuser
from io import BytesIO
from itertools import groupby
from shutil import get_terminal_size

from .logger import create_logger

logger = create_logger()

from . import xattr
from .chunker import Chunker
from .cache import ChunkListEntry
from .crypto.key import key_factory
from .compress import Compressor, CompressionSpec
from .constants import *  # NOQA
from .hashindex import ChunkIndex, ChunkIndexEntry, CacheSynchronizer
from .helpers import Manifest
from .helpers import hardlinkable
from .helpers import ChunkIteratorFileWrapper, open_item
from .helpers import Error, IntegrityError, set_ec
from .helpers import uid2user, user2uid, gid2group, group2gid
from .helpers import parse_timestamp, to_localtime
from .helpers import OutputTimestamp, format_timedelta, format_file_size, file_status, FileSize
from .helpers import safe_encode, safe_decode, make_path_safe, remove_surrogates
from .helpers import StableDict
from .helpers import bin_to_hex
from .helpers import safe_ns
from .helpers import ellipsis_truncate, ProgressIndicatorPercent, log_multi
from .helpers import msgpack
from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
from .item import Item, ArchiveItem
from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
from .remote import cache_if_remote
from .repository import Repository, LIST_SCAN_LIMIT

has_lchmod = hasattr(os, 'lchmod')
has_link = hasattr(os, 'link')

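# flags for opening input files read-only (and in binary mode, where that flag exists);
# O_NOATIME, where available, avoids updating the access time on read.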
flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)


class Statistics:
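    """Accumulate sizes and file counts for one archive operation.

    Tracks original size (osize), compressed size (csize), deduplicated size
    (usize) and the number of files (nfiles).

    Minimal usage sketch::

        stats = Statistics()
        stats.update(size=1024, csize=512, unique=True)
        print(stats)  # "This archive: ..." summary line
    """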

    def __init__(self, output_json=False):
        self.output_json = output_json
        self.osize = self.csize = self.usize = self.nfiles = 0
        self.last_progress = 0  # timestamp when last progress was shown

    def update(self, size, csize, unique):
        self.osize += size
        self.csize += csize
        if unique:
            self.usize += csize

    summary = "{label:15} {stats.osize_fmt:>20s} {stats.csize_fmt:>20s} {stats.usize_fmt:>20s}"

    def __str__(self):
        return self.summary.format(stats=self, label='This archive:')

    def __repr__(self):
        return "<{cls} object at {hash:#x} ({self.osize}, {self.csize}, {self.usize})>".format(
            cls=type(self).__name__, hash=id(self), self=self)

    def as_dict(self):
        return {
            'original_size': FileSize(self.osize),
            'compressed_size': FileSize(self.csize),
            'deduplicated_size': FileSize(self.usize),
            'nfiles': self.nfiles,
        }

    @property
    def osize_fmt(self):
        return format_file_size(self.osize)

    @property
    def usize_fmt(self):
        return format_file_size(self.usize)

    @property
    def csize_fmt(self):
        return format_file_size(self.csize)

    def show_progress(self, item=None, final=False, stream=None, dt=None):
        now = time.monotonic()
        if dt is None or now - self.last_progress > dt:
            self.last_progress = now
            if self.output_json:
                data = self.as_dict()
                data.update({
                    'time': time.time(),
                    'type': 'archive_progress',
                    'path': remove_surrogates(item.path if item else ''),
                })
                msg = json.dumps(data)
                end = '\n'
            else:
                columns, lines = get_terminal_size()
                if not final:
                    msg = '{0.osize_fmt} O {0.csize_fmt} C {0.usize_fmt} D {0.nfiles} N '.format(self)
                    path = remove_surrogates(item.path) if item else ''
                    space = columns - swidth(msg)
                    if space < 12:
                        msg = ''
                        space = columns - swidth(msg)
                    if space >= 8:
                        msg += ellipsis_truncate(path, space)
                else:
                    msg = ' ' * columns
                end = '\r'
            print(msg, end=end, file=stream or sys.stderr, flush=True)


def is_special(mode):
    # file types that get special treatment in --read-special mode
    return stat.S_ISBLK(mode) or stat.S_ISCHR(mode) or stat.S_ISFIFO(mode)


class BackupError(Exception):
    """
    Exception raised for non-OSError-based exceptions while accessing backup files.
    """


class BackupOSError(Exception):
    """
    Wrapper for OSError raised while accessing backup files.

    Borg does different kinds of IO, and IO failures have different consequences.
    This wrapper represents failures of input file or extraction IO.
    These are non-critical and are only reported (exit code = 1, warning).

    Any unwrapped IO error is critical and aborts execution (for example repository IO failure).
    """
    def __init__(self, op, os_error):
        self.op = op
        self.os_error = os_error
        self.errno = os_error.errno
        self.strerror = os_error.strerror
        self.filename = os_error.filename

    def __str__(self):
        if self.op:
            return '%s: %s' % (self.op, self.os_error)
        else:
            return str(self.os_error)


class BackupIO:
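    """Context manager that re-raises OSError from filesystem I/O as BackupOSError.

    The operation name set via ``backup_io(op)`` (or by assigning ``backup_io.op``)
    is attached to the raised BackupOSError for better error messages.
    """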
    op = ''

    def __call__(self, op=''):
        self.op = op
        return self

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type and issubclass(exc_type, OSError):
            raise BackupOSError(self.op, exc_val) from exc_val


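# module-level helper instance, used as ``with backup_io('<op>'):`` around filesystem I/O calls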
backup_io = BackupIO()


def backup_io_iter(iterator):
    backup_io.op = 'read'
    while True:
        with backup_io:
            try:
                item = next(iterator)
            except StopIteration:
                return
        yield item


class DownloadPipeline:
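    """Fetch and decrypt item metadata chunks from the repository and yield Item objects."""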

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, partial_extract=False, preload=False, hardlink_masters=None):
        """
        Return iterator of items.

        *ids* is a chunk ID list of an item stream. *filter* is a callable
        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.

        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
        """
        def _preload(chunks):
            self.repository.preload([c.id for c in chunks])

        masters_preloaded = set()
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            items = [Item(internal_dict=item) for item in unpacker]
            for item in items:
                if 'chunks' in item:
                    item.chunks = [ChunkListEntry(*e) for e in item.chunks]

            if filter:
                items = [item for item in items if filter(item)]

            if preload:
                if filter and partial_extract:
                    # if we do only a partial extraction, it gets a bit
                    # complicated with computing the preload items: if a hardlink master item is not
                    # selected (== not extracted), we will still need to preload its chunks if a
                    # corresponding hardlink slave is selected (== is extracted).
                    # due to a side effect of the filter() call, we now have hardlink_masters dict populated.
                    for item in items:
                        if 'chunks' in item:  # regular file, maybe a hardlink master
                            _preload(item.chunks)
                            # if this is a hardlink master, remember that we already preloaded it:
                            if 'source' not in item and hardlinkable(item.mode) and item.get('hardlink_master', True):
                                masters_preloaded.add(item.path)
                        elif 'source' in item and hardlinkable(item.mode):  # hardlink slave
                            source = item.source
                            if source not in masters_preloaded:
                                # we only need to preload *once* (for the 1st selected slave)
                                chunks, _ = hardlink_masters[source]
                                if chunks is not None:
                                    _preload(chunks)
                                masters_preloaded.add(source)
                else:
                    # easy: we have no filter, so all items are selected and we need to preload all chunks.
                    for item in items:
                        if 'chunks' in item:
                            _preload(item.chunks)

            for item in items:
                yield item

    def fetch_many(self, ids, is_preloaded=False):
        for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
            yield self.key.decrypt(id_, data)


class ChunkBuffer:
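    """Accumulate msgpacked items in a buffer and cut it into chunks for the item metadata stream.

    Subclasses implement write_chunk() to store the produced chunks; flush(flush=True)
    also writes out the last, partial chunk.
    """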
    BUFFER_SIZE = 8 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        self.buffer.write(self.packer.pack(item.as_dict()))
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        raise NotImplementedError

    def flush(self, flush=False):
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        # The chunker returns a memoryview to its internal buffer,
        # thus a copy is needed before resuming the chunker iterator.
        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(chunks) == 1 else -1
        for chunk in chunks[:end]:
            self.chunks.append(self.write_chunk(chunk))
        if end == -1:
            self.buffer.write(chunks[-1])

    def is_full(self):
        return self.buffer.tell() > self.BUFFER_SIZE


class CacheChunkBuffer(ChunkBuffer):
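    """ChunkBuffer that stores its chunks via the cache/repository and records per-chunk statistics."""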

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats, wait=False)
        self.cache.repository.async_response(wait=False)
        return id_


class Archive:
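    """A single archive in the repository.

    Instantiated either for writing a new archive (create=True) or for reading
    an existing one (loading its metadata via the manifest entry).
    """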

    class DoesNotExist(Error):
        """Archive {} does not exist"""

    class AlreadyExists(Error):
        """Archive {} already exists"""

    class IncompatibleFilesystemEncodingError(Error):
        """Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""

    def __init__(self, repository, key, manifest, name, cache=None, create=False,
                 checkpoint_interval=1800, numeric_owner=False, noatime=False, noctime=False, nobirthtime=False,
                 nobsdflags=False, noacls=False, noxattrs=False,
                 progress=False, chunker_params=CHUNKER_PARAMS, start=None, start_monotonic=None, end=None,
                 consider_part_files=False, log_json=False):
        self.cwd = os.getcwd()
        self.key = key
        self.repository = repository
        self.cache = cache
        self.manifest = manifest
        self.hard_links = {}
        self.stats = Statistics(output_json=log_json)
        self.show_progress = progress
        self.name = name  # overwritten later with name from archive metadata
        self.name_in_manifest = name  # can differ from .name later (if borg check fixed duplicate archive names)
        self.comment = None
        self.checkpoint_interval = checkpoint_interval
        self.numeric_owner = numeric_owner
        self.noatime = noatime
        self.noctime = noctime
        self.nobirthtime = nobirthtime
        self.nobsdflags = nobsdflags
        self.noacls = noacls
        self.noxattrs = noxattrs
        assert (start is None) == (start_monotonic is None), 'Logic error: if start is given, start_monotonic must be given as well and vice versa.'
        if start is None:
            start = datetime.utcnow()
            start_monotonic = time.monotonic()
        self.chunker_params = chunker_params
        self.start = start
        self.start_monotonic = start_monotonic
        if end is None:
            end = datetime.utcnow()
        self.end = end
        self.consider_part_files = consider_part_files
        self.pipeline = DownloadPipeline(self.repository, self.key)
        self.create = create
        if self.create:
            self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
            self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
            if name in manifest.archives:
                raise self.AlreadyExists(name)
            self.last_checkpoint = time.monotonic()
            i = 0
            while True:
                self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
                if self.checkpoint_name not in manifest.archives:
                    break
                i += 1
        else:
            info = self.manifest.archives.get(name)
            if info is None:
                raise self.DoesNotExist(name)
            self.load(info.id)
            self.zeros = None

    def _load_meta(self, id):
        data = self.key.decrypt(id, self.repository.get(id))
        metadata = ArchiveItem(internal_dict=msgpack.unpackb(data, unicode_errors='surrogateescape'))
        if metadata.version != 1:
            raise Exception('Unknown archive metadata version')
        return metadata

    def load(self, id):
        self.id = id
        self.metadata = self._load_meta(self.id)
        self.metadata.cmdline = [safe_decode(arg) for arg in self.metadata.cmdline]
        self.name = self.metadata.name
        self.comment = self.metadata.get('comment', '')

    @property
    def ts(self):
        """Timestamp of archive creation (start) in UTC"""
        ts = self.metadata.time
        return parse_timestamp(ts)

    @property
    def ts_end(self):
        """Timestamp of archive creation (end) in UTC"""
        # fall back to time if there is no time_end present in metadata
        ts = self.metadata.get('time_end') or self.metadata.time
        return parse_timestamp(ts)

    @property
    def fpr(self):
        return bin_to_hex(self.id)

    @property
    def duration(self):
        return format_timedelta(self.end - self.start)

    @property
    def duration_from_meta(self):
        return format_timedelta(self.ts_end - self.ts)

    def info(self):
        if self.create:
            stats = self.stats
            start = self.start.replace(tzinfo=timezone.utc)
            end = self.end.replace(tzinfo=timezone.utc)
        else:
            stats = self.calc_stats(self.cache)
            start = self.ts
            end = self.ts_end
        info = {
            'name': self.name,
            'id': self.fpr,
            'start': OutputTimestamp(start),
            'end': OutputTimestamp(end),
            'duration': (end - start).total_seconds(),
            'stats': stats.as_dict(),
            'limits': {
                'max_archive_size': self.cache.chunks[self.id].csize / MAX_DATA_SIZE,
            },
        }
        if self.create:
            info['command_line'] = sys.argv
        else:
            info.update({
                'command_line': self.metadata.cmdline,
                'hostname': self.metadata.hostname,
                'username': self.metadata.username,
                'comment': self.metadata.get('comment', ''),
                'chunker_params': self.metadata.get('chunker_params', ''),
            })
        return info

    def __str__(self):
        return '''\
Archive name: {0.name}
Archive fingerprint: {0.fpr}
Time (start): {start}
Time (end):   {end}
Duration: {0.duration}
Number of files: {0.stats.nfiles}
Utilization of max. archive size: {csize_max:.0%}
'''.format(
            self,
            start=OutputTimestamp(self.start.replace(tzinfo=timezone.utc)),
            end=OutputTimestamp(self.end.replace(tzinfo=timezone.utc)),
            csize_max=self.cache.chunks[self.id].csize / MAX_DATA_SIZE)

    def __repr__(self):
        return 'Archive(%r)' % self.name

    def item_filter(self, item, filter=None):
        if not self.consider_part_files and 'part' in item:
            # this is a part(ial) file, we usually don't want to consider it.
            return False
        return filter(item) if filter else True

    def iter_items(self, filter=None, partial_extract=False, preload=False, hardlink_masters=None):
        # note: when calling this with preload=True, later fetch_many() must be called with
        # is_preloaded=True or the RemoteRepository code will leak memory!
        assert not (filter and partial_extract and preload) or hardlink_masters is not None
        for item in self.pipeline.unpack_many(self.metadata.items, partial_extract=partial_extract,
                                              preload=preload, hardlink_masters=hardlink_masters,
                                              filter=lambda item: self.item_filter(item, filter)):
            yield item

    def add_item(self, item, show_progress=True):
        if show_progress and self.show_progress:
            self.stats.show_progress(item=item, dt=0.2)
        self.items_buffer.add(item)

    def write_checkpoint(self):
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)

    def save(self, name=None, comment=None, timestamp=None, additional_metadata=None):
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        self.items_buffer.flush(flush=True)
        duration = timedelta(seconds=time.monotonic() - self.start_monotonic)
        if timestamp is None:
            end = datetime.utcnow()
            start = end - duration
        else:
            end = timestamp + duration
            start = timestamp
        self.start = start
        self.end = end
        metadata = {
            'version': 1,
            'name': name,
            'comment': comment or '',
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': hostname,
            'username': getuser(),
            'time': start.strftime(ISO_FORMAT),
            'time_end': end.strftime(ISO_FORMAT),
            'chunker_params': self.chunker_params,
        }
        metadata.update(additional_metadata or {})
        metadata = ArchiveItem(metadata)
        data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b'archive')
        self.id = self.key.id_hash(data)
        try:
            self.cache.add_chunk(self.id, data, self.stats)
        except IntegrityError as err:
            err_msg = str(err)
            # hack to avoid changing the RPC protocol by introducing new (more specific) exception class
            if 'More than allowed put data' in err_msg:
                raise Error('%s - archive too big (issue #1473)!' % err_msg)
            else:
                raise
        while self.repository.async_response(wait=True) is not None:
            pass
        self.manifest.archives[name] = (self.id, metadata.time)
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()

    def calc_stats(self, cache):
        def add(id):
            entry = cache.chunks[id]
            archive_index.add(id, 1, entry.size, entry.csize)

        archive_index = ChunkIndex()
        sync = CacheSynchronizer(archive_index)
        add(self.id)
        pi = ProgressIndicatorPercent(total=len(self.metadata.items), msg='Calculating statistics... %3d%%', msgid='archive.calc_stats')
        for id, chunk in zip(self.metadata.items, self.repository.get_many(self.metadata.items)):
            pi.show(increase=1)
            add(id)
            data = self.key.decrypt(id, chunk)
            sync.feed(data)
        unique_csize = archive_index.stats_against(cache.chunks)[3]
        pi.finish()
        stats = Statistics()
        stats.nfiles = sync.num_files_totals if self.consider_part_files \
                       else sync.num_files_totals - sync.num_files_parts
        stats.osize = sync.size_totals if self.consider_part_files \
                      else sync.size_totals - sync.size_parts
        stats.csize = sync.csize_totals if self.consider_part_files \
                      else sync.csize_totals - sync.csize_parts
        stats.usize = unique_csize  # the part files use same chunks as the full file
        return stats

    @contextmanager
    def extract_helper(self, dest, item, path, stripped_components, original_path, hardlink_masters):
        hardlink_set = False
        # Hard link?
        if 'source' in item:
            source = os.path.join(dest, *item.source.split(os.sep)[stripped_components:])
            chunks, link_target = hardlink_masters.get(item.source, (None, source))
            if link_target and has_link:
                # Hard link was extracted previously, just link
                with backup_io('link'):
                    os.link(link_target, path)
                    hardlink_set = True
            elif chunks is not None:
                # assign chunks to this item, since the item which had the chunks was not extracted
                item.chunks = chunks
        yield hardlink_set
        if not hardlink_set and hardlink_masters:
            if has_link:
                # Update the master entry with the extracted item's path, so that subsequent hardlinks don't extract it twice.
                # We have hardlinking support, so we will hardlink, not extract.
                hardlink_masters[item.get('source') or original_path] = (None, path)
            else:
                # Broken platform with no hardlinking support.
                # In this case, we *want* to extract twice, because there is no other way.
                pass

    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False,
                     hardlink_masters=None, stripped_components=0, original_path=None, pi=None):
        """
        Extract archive item.

        :param item: the item to extract
        :param restore_attrs: restore file attributes
        :param dry_run: do not write any data
        :param stdout: write extracted data to stdout
        :param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
        :param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly
        :param stripped_components: stripped leading path components to correct hard link extraction
        :param original_path: 'path' key as stored in archive
        :param pi: ProgressIndicatorPercent (or similar) for file extraction progress (in bytes)
        """
        hardlink_masters = hardlink_masters or {}
        has_damaged_chunks = 'chunks_healthy' in item
        if dry_run or stdout:
            if 'chunks' in item:
                item_chunks_size = 0
                for data in self.pipeline.fetch_many([c.id for c in item.chunks], is_preloaded=True):
                    if pi:
                        pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                    if stdout:
                        sys.stdout.buffer.write(data)
                    item_chunks_size += len(data)
                if stdout:
                    sys.stdout.buffer.flush()
                if 'size' in item:
                    item_size = item.size
                    if item_size != item_chunks_size:
                        raise BackupError('Size inconsistency detected: size {}, chunks size {}'.format(
                                          item_size, item_chunks_size))
            if has_damaged_chunks:
                raise BackupError('File has damaged (all-zero) chunks. Try running borg check --repair.')
            return

        original_path = original_path or item.path
        dest = self.cwd
        if item.path.startswith(('/', '../')):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item.path)
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.stat(path, follow_symlinks=False)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass

        def make_parent(path):
            parent_dir = os.path.dirname(path)
            if not os.path.exists(parent_dir):
                os.makedirs(parent_dir)

        mode = item.mode
        if stat.S_ISREG(mode):
            with backup_io('makedirs'):
                make_parent(path)
            with self.extract_helper(dest, item, path, stripped_components, original_path,
                                     hardlink_masters) as hardlink_set:
                if hardlink_set:
                    return
                if sparse and self.zeros is None:
                    self.zeros = b'\0' * (1 << self.chunker_params[1])
                with backup_io('open'):
                    fd = open(path, 'wb')
                with fd:
                    ids = [c.id for c in item.chunks]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if pi:
                            pi.show(increase=len(data), info=[remove_surrogates(item.path)])
                        with backup_io('write'):
                            if sparse and self.zeros.startswith(data):
                                # all-zero chunk: create a hole in a sparse file
                                fd.seek(len(data), 1)
                            else:
                                fd.write(data)
                    with backup_io('truncate_and_attrs'):
                        pos = item_chunks_size = fd.tell()
                        fd.truncate(pos)
                        fd.flush()
                        self.restore_attrs(path, item, fd=fd.fileno())
                if 'size' in item:
                    item_size = item.size
                    if item_size != item_chunks_size:
                        raise BackupError('Size inconsistency detected: size {}, chunks size {}'.format(
                                          item_size, item_chunks_size))
                if has_damaged_chunks:
                    raise BackupError('File has damaged (all-zero) chunks. Try running borg check --repair.')
            return
        with backup_io:
            # No repository access beyond this point.
            if stat.S_ISDIR(mode):
                make_parent(path)
                if not os.path.exists(path):
                    os.mkdir(path)
                if restore_attrs:
                    self.restore_attrs(path, item)
            elif stat.S_ISLNK(mode):
                make_parent(path)
                source = item.source
                try:
                    os.symlink(source, path)
                except UnicodeEncodeError:
                    raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
                self.restore_attrs(path, item, symlink=True)
            elif stat.S_ISFIFO(mode):
                make_parent(path)
                with self.extract_helper(dest, item, path, stripped_components, original_path,
                                         hardlink_masters) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mkfifo(path)
                    self.restore_attrs(path, item)
            elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
                make_parent(path)
                with self.extract_helper(dest, item, path, stripped_components, original_path,
                                         hardlink_masters) as hardlink_set:
                    if hardlink_set:
                        return
                    os.mknod(path, item.mode, item.rdev)
                    self.restore_attrs(path, item)
            else:
                raise Exception('Unknown archive item type %r' % item.mode)

    def restore_attrs(self, path, item, symlink=False, fd=None):
        """
        Restore filesystem attributes on *path* (*fd*) from *item*.

        Does not access the repository.
        """
        backup_io.op = 'attrs'
        uid = gid = None
        if not self.numeric_owner:
            uid = user2uid(item.user)
            gid = group2gid(item.group)
        uid = item.uid if uid is None else uid
        gid = item.gid if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.chown(path, uid, gid, follow_symlinks=False)
        except OSError:
            pass
        if fd:
            os.fchmod(fd, item.mode)
        elif not symlink:
            os.chmod(path, item.mode)
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item.mode)
        mtime = item.mtime
        if 'atime' in item:
            atime = item.atime
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if 'birthtime' in item:
            birthtime = item.birthtime
            try:
                # This should work on FreeBSD, NetBSD, and Darwin and be harmless on other platforms.
                # See utimes(2) on either of the BSDs for details.
                if fd:
                    os.utime(fd, None, ns=(atime, birthtime))
                else:
                    os.utime(path, None, ns=(atime, birthtime), follow_symlinks=False)
            except OSError:
                # some systems don't support calling utime on a symlink
                pass
        try:
            if fd:
                os.utime(fd, None, ns=(atime, mtime))
            else:
                os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        except OSError:
            # some systems don't support calling utime on a symlink
            pass
        if not self.noacls:
            acl_set(path, item, self.numeric_owner)
        if not self.noxattrs:
            # chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
            # the Linux capabilities in the "security.capability" attribute.
            xattrs = item.get('xattrs', {})
            for k, v in xattrs.items():
                try:
                    xattr.setxattr(fd or path, k, v, follow_symlinks=False)
                except OSError as e:
                    msg_format = '%s: when setting extended attribute %s: %%s' % (path, k.decode())
                    if e.errno == errno.E2BIG:
                        err_str = 'too big for this filesystem'
                    elif e.errno == errno.ENOTSUP:
                        err_str = 'xattrs not supported on this filesystem'
                    elif e.errno == errno.ENOSPC:
                        # no space left on device while setting this specific xattr
                        # ext4 reports ENOSPC when trying to set an xattr larger than 4 kiB, which is all ext4 supports
                        # (in this case, this is NOT a "disk full" error, just an ext4 limitation).
                        err_str = 'no space left on device [xattr len = %d]' % (len(v), )
                    else:
                        # generic handler
                        # EACCES: permission denied to set this specific xattr (this may happen related to security.* keys)
                        # EPERM: operation not permitted
                        err_str = os.strerror(e.errno)
                    logger.warning(msg_format % err_str)
                    set_ec(EXIT_WARNING)
        # bsdflags include the immutable flag and need to be set last:
        if not self.nobsdflags and 'bsdflags' in item:
            try:
                set_flags(path, item.bsdflags, fd=fd)
            except OSError:
                pass

    def set_meta(self, key, value):
        metadata = self._load_meta(self.id)
        setattr(metadata, key, value)
        data = msgpack.packb(metadata.as_dict(), unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[self.name] = (new_id, metadata.time)
        self.cache.chunk_decref(self.id, self.stats)
        self.id = new_id

    def rename(self, name):
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        oldname = self.name
        self.name = name
        self.set_meta('name', name)
        del self.manifest.archives[oldname]

    def delete(self, stats, progress=False, forced=False):
        class ChunksIndexError(Error):
            """Chunk ID {} missing from chunks index, corrupted chunks index - aborting transaction."""

        exception_ignored = object()

        def fetch_async_response(wait=True):
            try:
                return self.repository.async_response(wait=wait)
            except Repository.ObjectNotFound:
                nonlocal error
                # object not in repo - strange, but we wanted to delete it anyway.
                if forced == 0:
                    raise
                error = True
                return exception_ignored  # must not return None here

        def chunk_decref(id, stats):
            try:
                self.cache.chunk_decref(id, stats, wait=False)
            except KeyError:
                cid = bin_to_hex(id)
                raise ChunksIndexError(cid)
            else:
                fetch_async_response(wait=False)

        error = False
        try:
            unpacker = msgpack.Unpacker(use_list=False)
            items_ids = self.metadata.items
            pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", msgid='archive.delete')
            for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
                if progress:
                    pi.show(i)
                data = self.key.decrypt(items_id, data)
                unpacker.feed(data)
                chunk_decref(items_id, stats)
                try:
                    for item in unpacker:
                        item = Item(internal_dict=item)
                        if 'chunks' in item:
                            for chunk_id, size, csize in item.chunks:
                                chunk_decref(chunk_id, stats)
                except (TypeError, ValueError):
                    # if items metadata spans multiple chunks and one chunk got dropped somehow,
                    # it could be that unpacker yields bad types
                    if forced == 0:
                        raise
                    error = True
            if progress:
                pi.finish()
        except (msgpack.UnpackException, Repository.ObjectNotFound):
            # items metadata corrupted
            if forced == 0:
                raise
            error = True
        # in forced delete mode, we try hard to delete at least the manifest entry,
        # if possible also the archive superblock, even if processing the items raises
        # some harmless exception.
        chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
        while fetch_async_response(wait=True) is not None:
            # we did async deletes, process outstanding results (== exceptions),
            # so there is nothing pending when we return and our caller wants to commit.
            pass
        if error:
            logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
            logger.warning('borg check --repair is required to free all space.')

    def stat_simple_attrs(self, st):
        attrs = dict(
            mode=st.st_mode,
            uid=st.st_uid,
            gid=st.st_gid,
            mtime=safe_ns(st.st_mtime_ns),
        )
        # borg can work with archives that only have mtime (older attic archives do not have
        # atime/ctime). It can be useful to omit atime/ctime if they change without the
        # file content changing, e.g. to get better metadata deduplication.
        if not self.noatime:
            attrs['atime'] = safe_ns(st.st_atime_ns)
        if not self.noctime:
            attrs['ctime'] = safe_ns(st.st_ctime_ns)
        if not self.nobirthtime and hasattr(st, 'st_birthtime'):
            # sadly, there's no stat_result.st_birthtime_ns
            attrs['birthtime'] = safe_ns(int(st.st_birthtime * 10**9))
        if self.numeric_owner:
            attrs['user'] = attrs['group'] = None
        else:
            attrs['user'] = uid2user(st.st_uid)
            attrs['group'] = gid2group(st.st_gid)
        return attrs

    def stat_ext_attrs(self, st, path):
        attrs = {}
        with backup_io('extended stat'):
            xattrs = {} if self.noxattrs else xattr.get_all(path, follow_symlinks=False)
            bsdflags = 0 if self.nobsdflags else get_flags(path, st)
            if not self.noacls:
                acl_get(path, attrs, st, self.numeric_owner)
        if xattrs:
            attrs['xattrs'] = StableDict(xattrs)
        if bsdflags:
            attrs['bsdflags'] = bsdflags
        return attrs

    def stat_attrs(self, st, path):
        attrs = self.stat_simple_attrs(st)
        attrs.update(self.stat_ext_attrs(st, path))
        return attrs

    @contextmanager
    def create_helper(self, path, st, status=None, hardlinkable=True):
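        """Yield (item, status, hardlinked, hardlink_master) for *path*.

        On clean exit of the with-block, the item is added to the archive and,
        if it is a hardlink master, remembered in self.hard_links so that later
        hardlinks to the same inode can refer to it via item.source.
        """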
        safe_path = make_path_safe(path)
        item = Item(path=safe_path)
        hardlink_master = False
        hardlinked = hardlinkable and st.st_nlink > 1
        if hardlinked:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if source is not None:
                item.source = source
                status = 'h'  # hardlink (to already seen inodes)
            else:
                hardlink_master = True
        yield item, status, hardlinked, hardlink_master
        # if we get here, "with"-block worked ok without error/exception, the item was processed ok...
        self.add_item(item)
        # ... and added to the archive, so we can remember it to refer to it later in the archive:
        if hardlink_master:
            self.hard_links[(st.st_ino, st.st_dev)] = safe_path

    def process_dir(self, path, st):
        with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
            item.update(self.stat_attrs(st, path))
            return status

    def process_fifo(self, path, st):
        with self.create_helper(path, st, 'f') as (item, status, hardlinked, hardlink_master):  # fifo
            item.update(self.stat_attrs(st, path))
            return status

    def process_dev(self, path, st, dev_type):
        with self.create_helper(path, st, dev_type) as (item, status, hardlinked, hardlink_master):  # char/block device
            item.rdev = st.st_rdev
            item.update(self.stat_attrs(st, path))
            return status

    def process_symlink(self, path, st):
        # note: using hardlinkable=False because we can not support hardlinked symlinks,
        #       due to the dual-use of item.source, see issue #2343:
        # hardlinked symlinks will be archived [and extracted] as non-hardlinked symlinks.
        with self.create_helper(path, st, 's', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
            with backup_io('readlink'):
                source = os.readlink(path)
            item.source = source
            item.update(self.stat_attrs(st, path))
            return status

    def write_part_file(self, item, from_chunk, number):
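        """Write the chunks processed since *from_chunk* as a separate '.borg_part_<n>' item and checkpoint.

        Returns (new_from_chunk, next_part_number).
        """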
        item = Item(internal_dict=item.as_dict())
        length = len(item.chunks)
        # the item should only have the *additional* chunks we processed after the last partial item:
        item.chunks = item.chunks[from_chunk:]
        # for borg recreate, we already have a size member in the source item (giving the total file size),
        # but we consider only a part of the file here, thus we must recompute the size from the chunks:
        item.get_size(memorize=True, from_chunks=True)
        item.path += '.borg_part_%d' % number
        item.part = number
        number += 1
        self.add_item(item, show_progress=False)
        self.write_checkpoint()
        return length, number

    def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None):
        if not chunk_processor:
            def chunk_processor(data):
                chunk_entry = cache.add_chunk(self.key.id_hash(data), data, stats, wait=False)
                self.cache.repository.async_response(wait=False)
                return chunk_entry

        item.chunks = []
        # if we rechunkify, we'll get a fundamentally different chunks list, thus we need
        # to get rid of .chunks_healthy, as it might not correspond to .chunks any more.
        if getattr(self, 'recreate_rechunkify', False) and 'chunks_healthy' in item:
            del item.chunks_healthy
        from_chunk = 0
        part_number = 1
        for data in chunk_iter:
            item.chunks.append(chunk_processor(data))
            if self.show_progress:
                self.stats.show_progress(item=item, dt=0.2)
            if self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
                from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
                self.last_checkpoint = time.monotonic()
        else:
            if part_number > 1:
                if item.chunks[from_chunk:]:
                    # if we already have created a part item inside this file, we want to put the final
                    # chunks (if any) into a part item also (so all parts can be concatenated to get
                    # the complete file):
                    from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
                    self.last_checkpoint = time.monotonic()

                # if we created part files, we have referenced all chunks from the part files,
                # but we will also reference the same chunks from the final, complete file:
                dummy_stats = Statistics()  # do not count this data volume twice
                for chunk in item.chunks:
                    cache.chunk_incref(chunk.id, dummy_stats, size=chunk.size)

    def process_stdin(self, path, cache, mode, user, group):
        uid = user2uid(user)
        if uid is None:
            raise Error("no such user: %s" % user)
        gid = group2gid(group)
        if gid is None:
            raise Error("no such group: %s" % group)
        t = int(time.time()) * 1000000000
        item = Item(
            path=path,
            mode=mode & 0o107777 | 0o100000,  # forcing regular file mode
            uid=uid, user=user,
            gid=gid, group=group,
            mtime=t, atime=t, ctime=t,
        )
        fd = sys.stdin.buffer  # binary
        self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd)))
        item.get_size(memorize=True)
        self.stats.nfiles += 1
        self.add_item(item)
        return 'i'  # stdin

    def process_file(self, path, st, cache):
        with self.create_helper(path, st, None) as (item, status, hardlinked, hardlink_master):  # no status yet
            item.update(self.stat_simple_attrs(st))
            is_special_file = is_special(st.st_mode)
            if is_special_file:
                # we process a special file like a regular file. reflect that in mode,
                # so it can be extracted / accessed in FUSE mount like a regular file.
                # this needs to be done early, so that part files also get the patched mode.
                item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
            if not hardlinked or hardlink_master:
                if not is_special_file:
                    hashed_path = safe_encode(os.path.join(self.cwd, path))
                    path_hash = self.key.id_hash(hashed_path)
                    known, ids = cache.file_known_and_unchanged(hashed_path, path_hash, st)
                else:
                    # in --read-special mode, we may be called for special files.
                    # there should be no information in the cache about special files processed in
                    # read-special mode, but we better play safe as this was wrong in the past:
                    hashed_path = path_hash = None
                    known, ids = False, None
                chunks = None
                if ids is not None:
                    # Make sure all ids are available
                    for id_ in ids:
                        if not cache.seen_chunk(id_):
                            status = 'M'  # cache said it is unmodified, but we lost a chunk: process file like modified
                            break
                    else:
                        chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                        status = 'U'  # regular file, unchanged
                else:
                    status = 'M' if known else 'A'  # regular file, modified or added
                item.hardlink_master = hardlinked
                # Only chunkify the file if needed
                if chunks is not None:
                    item.chunks = chunks
                else:
                    with backup_io('open'):
                        fh = Archive._open_rb(path)
                    with os.fdopen(fh, 'rb') as fd:
                        self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)))
                    if not is_special_file:
                        # we must not memorize special files, because the contents of e.g. a
                        # block or char device will change without its mtime/size/inode changing.
                        cache.memorize_file(hashed_path, path_hash, st, [c.id for c in item.chunks])
                self.stats.nfiles += 1
            item.update(self.stat_ext_attrs(st, path))
            item.get_size(memorize=True)
            return status

    @staticmethod
    def list_archives(repository, key, manifest, cache=None):
        # expensive! see also Manifest.list_archive_infos.
        for name in manifest.archives:
            yield Archive(repository, key, manifest, name, cache=cache)

    @staticmethod
    def _open_rb(path):
        try:
            # if we have O_NOATIME, this likely will succeed if we are root or owner of file:
            return os.open(path, flags_noatime)
        except PermissionError:
            if flags_noatime == flags_normal:
                # we do not have O_NOATIME, no need to try again:
                raise
            # Was this EPERM due to the O_NOATIME flag? Try again without it:
            return os.open(path, flags_normal)


def valid_msgpacked_dict(d, keys_serialized):
    """check if the data <d> looks like a msgpacked dict"""
    d_len = len(d)
    if d_len == 0:
        return False
    if d[0] & 0xf0 == 0x80:  # object is a fixmap (up to 15 elements)
        offs = 1
    elif d[0] == 0xde:  # object is a map16 (up to 2^16-1 elements)
        offs = 3
    else:
        # object is not a map (dict)
        # note: we must not have dicts with > 2^16-1 elements
        return False
    if d_len <= offs:
        return False
    # is the first dict key a bytestring?
    if d[offs] & 0xe0 == 0xa0:  # key is a small bytestring (up to 31 chars)
        pass
    elif d[offs] in (0xd9, 0xda, 0xdb):  # key is a str8, str16 or str32
        pass
    else:
        # key is not a bytestring
        return False
    # is the bytestring any of the expected key names?
    key_serialized = d[offs:]
    return any(key_serialized.startswith(pattern) for pattern in keys_serialized)


class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker
    """
    class UnpackerCrashed(Exception):
        """raise if unpacker crashed"""

    def __init__(self, validator, item_keys):
        super().__init__()
        self.item_keys = [msgpack.packb(name.encode()) for name in item_keys]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        def unpack_next():
            try:
                return next(self._unpacker)
            except (TypeError, ValueError) as err:
                # transform exceptions that might be raised when feeding
                # msgpack with invalid data to a more specific exception
                raise self.UnpackerCrashed(str(err))

        if self._resync:
            data = b''.join(self._buffered_data)
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized item dict
                if not valid_msgpacked_dict(data, self.item_keys):
                    data = data[1:]
                    continue
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = unpack_next()
                except (self.UnpackerCrashed, StopIteration):
                    # as long as we are resyncing, we also ignore StopIteration
                    pass
                else:
                    if self.validator(item):
                        self._resync = False
                        return item
                data = data[1:]
        else:
            return unpack_next()


class ArchiveChecker:
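    """Perform consistency checks on a repository and its archives, optionally repairing problems."""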
1210
1211    def __init__(self):
1212        self.error_found = False
1213        self.possibly_superseded = set()
1214
1215    def check(self, repository, repair=False, archive=None, first=0, last=0, sort_by='', glob=None,
1216              verify_data=False, save_space=False):
1217        """Perform a set of checks on 'repository'
1218
1219        :param repair: enable repair mode, write updated or corrected data into repository
1220        :param archive: only check this archive
1221        :param first/last/sort_by: only check this number of first/last archives ordered by sort_by
1222        :param glob: only check archives matching this glob
1223        :param verify_data: integrity verification of data referenced by archives
1224        :param save_space: Repository.commit(save_space)
1225        """
1226        logger.info('Starting archive consistency check...')
1227        self.check_all = archive is None and not any((first, last, glob))
1228        self.repair = repair
1229        self.repository = repository
1230        self.init_chunks()
1231        if not self.chunks:
1232            logger.error('Repository contains no apparent data at all, cannot continue check/repair.')
1233            return False
1234        self.key = self.identify_key(repository)
1235        if verify_data:
1236            self.verify_data()
1237        if Manifest.MANIFEST_ID not in self.chunks:
1238            logger.error("Repository manifest not found!")
1239            self.error_found = True
1240            self.manifest = self.rebuild_manifest()
1241        else:
1242            try:
1243                self.manifest, _ = Manifest.load(repository, (Manifest.Operation.CHECK,), key=self.key)
1244            except IntegrityError as exc:
1245                logger.error('Repository manifest is corrupted: %s', exc)
1246                self.error_found = True
1247                del self.chunks[Manifest.MANIFEST_ID]
1248                self.manifest = self.rebuild_manifest()
1249        self.rebuild_refcounts(archive=archive, first=first, last=last, sort_by=sort_by, glob=glob)
1250        self.orphan_chunks_check()
1251        self.finish(save_space=save_space)
1252        if self.error_found:
1253            logger.error('Archive consistency check complete, problems found.')
1254        else:
1255            logger.info('Archive consistency check complete, no problems found.')
1256        return self.repair or not self.error_found
1257
1258    def init_chunks(self):
1259        """Fetch a list of all object keys from repository
1260        """
1261        # Explicitly set the initial hash table capacity to avoid performance issues
1262        # due to hash table "resonance".
1263        # Since reconstruction of archive items can add some new chunks, add 10 % headroom
1264        capacity = int(len(self.repository) / ChunkIndex.MAX_LOAD_FACTOR * 1.1)
1265        self.chunks = ChunkIndex(capacity)
1266        marker = None
1267        while True:
1268            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
1269            if not result:
1270                break
1271            marker = result[-1]
1272            init_entry = ChunkIndexEntry(refcount=0, size=0, csize=0)
1273            for id_ in result:
1274                self.chunks[id_] = init_entry
1275
1276    def identify_key(self, repository):
1277        try:
1278            some_chunkid, _ = next(self.chunks.iteritems())
1279        except StopIteration:
1280            # repo is completely empty, no chunks
1281            return None
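        # fetch one arbitrary chunk and let key_factory detect the key/encryption type from its data header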
1282        cdata = repository.get(some_chunkid)
1283        return key_factory(repository, cdata)
1284
1285    def verify_data(self):
1286        logger.info('Starting cryptographic data integrity verification...')
1287        chunks_count_index = len(self.chunks)
1288        chunks_count_segments = 0
1289        errors = 0
1290        defect_chunks = []
1291        pi = ProgressIndicatorPercent(total=chunks_count_index, msg="Verifying data %6.2f%%", step=0.01,
1292                                      msgid='check.verify_data')
1293        marker = None
1294        while True:
1295            chunk_ids = self.repository.scan(limit=100, marker=marker)
1296            if not chunk_ids:
1297                break
1298            chunks_count_segments += len(chunk_ids)
1299            marker = chunk_ids[-1]
1300            chunk_data_iter = self.repository.get_many(chunk_ids)
1301            chunk_ids_revd = list(reversed(chunk_ids))
1302            while chunk_ids_revd:
1303                pi.show()
                chunk_id = chunk_ids_revd.pop(-1)  # pop from the end is O(1); the reversed list yields original order
1305                try:
1306                    encrypted_data = next(chunk_data_iter)
1307                except (Repository.ObjectNotFound, IntegrityError) as err:
1308                    self.error_found = True
1309                    errors += 1
1310                    logger.error('chunk %s: %s', bin_to_hex(chunk_id), err)
1311                    if isinstance(err, IntegrityError):
1312                        defect_chunks.append(chunk_id)
1313                    # as the exception killed our generator, make a new one for remaining chunks:
1314                    if chunk_ids_revd:
1315                        chunk_ids = list(reversed(chunk_ids_revd))
1316                        chunk_data_iter = self.repository.get_many(chunk_ids)
1317                else:
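                    # the manifest is stored under a fixed id, so skip the id == hash(plaintext) check for it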
1318                    _chunk_id = None if chunk_id == Manifest.MANIFEST_ID else chunk_id
1319                    try:
1320                        self.key.decrypt(_chunk_id, encrypted_data)
1321                    except IntegrityError as integrity_error:
1322                        self.error_found = True
1323                        errors += 1
1324                        logger.error('chunk %s, integrity error: %s', bin_to_hex(chunk_id), integrity_error)
1325                        defect_chunks.append(chunk_id)
1326        pi.finish()
1327        if chunks_count_index != chunks_count_segments:
1328            logger.error('Repo/Chunks index object count vs. segment files object count mismatch.')
1329            logger.error('Repo/Chunks index: %d objects != segment files: %d objects',
1330                         chunks_count_index, chunks_count_segments)
1331        if defect_chunks:
1332            if self.repair:
1333                # if we kill the defect chunk here, subsequent actions within this "borg check"
1334                # run will find missing chunks and replace them with all-zero replacement
1335                # chunks and flag the files as "repaired".
                # if another backup is done later and the missing chunks get backed up again,
1337                # a "borg check" afterwards can heal all files where this chunk was missing.
1338                logger.warning('Found defect chunks. They will be deleted now, so affected files can '
1339                               'get repaired now and maybe healed later.')
1340                for defect_chunk in defect_chunks:
1341                    # remote repo (ssh): retry might help for strange network / NIC / RAM errors
1342                    # as the chunk will be retransmitted from remote server.
1343                    # local repo (fs): as chunks.iteritems loop usually pumps a lot of data through,
1344                    # a defect chunk is likely not in the fs cache any more and really gets re-read
1345                    # from the underlying media.
1346                    try:
1347                        encrypted_data = self.repository.get(defect_chunk)
1348                        _chunk_id = None if defect_chunk == Manifest.MANIFEST_ID else defect_chunk
1349                        self.key.decrypt(_chunk_id, encrypted_data)
1350                    except IntegrityError:
1351                        # failed twice -> get rid of this chunk
1352                        del self.chunks[defect_chunk]
1353                        self.repository.delete(defect_chunk)
1354                        logger.debug('chunk %s deleted.', bin_to_hex(defect_chunk))
1355                    else:
1356                        logger.warning('chunk %s not deleted, did not consistently fail.', bin_to_hex(defect_chunk))
1357            else:
1358                logger.warning('Found defect chunks. With --repair, they would get deleted, so affected '
1359                               'files could get repaired then and maybe healed later.')
1360                for defect_chunk in defect_chunks:
1361                    logger.debug('chunk %s is defect.', bin_to_hex(defect_chunk))
1362        log = logger.error if errors else logger.info
1363        log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.',
1364            chunks_count_segments, errors)
1365
1366    def rebuild_manifest(self):
1367        """Rebuild the manifest object if it is missing
1368
1369        Iterates through all objects in the repository looking for archive metadata blocks.
1370        """
1371        required_archive_keys = frozenset(key.encode() for key in REQUIRED_ARCHIVE_KEYS)
1372
1373        def valid_archive(obj):
1374            if not isinstance(obj, dict):
1375                return False
1376            keys = set(obj)
1377            return required_archive_keys.issubset(keys)
1378
1379        logger.info('Rebuilding missing manifest, this might take some time...')
        # as we have lost the manifest, we do not know any more what valid item keys we had.
        # collecting any key we encounter in a damaged repo seems unwise, thus we just use
        # the hardcoded list from the source code. therefore, it is not recommended to rebuild
        # a lost manifest on an older borg version than the most recent one that was ever used
        # within this repository (assuming that newer borg versions support more item keys).
1385        manifest = Manifest(self.key, self.repository)
1386        archive_keys_serialized = [msgpack.packb(name.encode()) for name in ARCHIVE_KEYS]
1387        pi = ProgressIndicatorPercent(total=len(self.chunks), msg="Rebuilding manifest %6.2f%%", step=0.01,
1388                                      msgid='check.rebuild_manifest')
1389        for chunk_id, _ in self.chunks.iteritems():
1390            pi.show()
1391            cdata = self.repository.get(chunk_id)
1392            try:
1393                data = self.key.decrypt(chunk_id, cdata)
1394            except IntegrityError as exc:
1395                logger.error('Skipping corrupted chunk: %s', exc)
1396                self.error_found = True
1397                continue
1398            if not valid_msgpacked_dict(data, archive_keys_serialized):
1399                continue
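            # cheap pre-filter on the raw msgpack bytes: archive metadata always contains a
            # 'cmdline' key and a version field packed as b'\xa7version\x01'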
1400            if b'cmdline' not in data or b'\xa7version\x01' not in data:
1401                continue
1402            try:
1403                archive = msgpack.unpackb(data)
            # ignore exceptions that might be raised when unpacking invalid msgpack data
1406            except (TypeError, ValueError, StopIteration):
1407                continue
1408            if valid_archive(archive):
1409                archive = ArchiveItem(internal_dict=archive)
1410                name = archive.name
1411                logger.info('Found archive %s', name)
1412                if name in manifest.archives:
1413                    i = 1
1414                    while True:
1415                        new_name = '%s.%d' % (name, i)
1416                        if new_name not in manifest.archives:
1417                            break
1418                        i += 1
1419                    logger.warning('Duplicate archive name %s, storing as %s', name, new_name)
1420                    name = new_name
1421                manifest.archives[name] = (chunk_id, archive.time)
1422        pi.finish()
1423        logger.info('Manifest rebuild complete.')
1424        return manifest
1425
1426    def rebuild_refcounts(self, archive=None, first=0, last=0, sort_by='', glob=None):
1427        """Rebuild object reference counts by walking the metadata
1428
1429        Missing and/or incorrect data is repaired when detected
1430        """
        # Exclude the manifest from chunks (the manifest entry might already have been deleted from self.chunks)
1432        self.chunks.pop(Manifest.MANIFEST_ID, None)
1433
1434        def mark_as_possibly_superseded(id_):
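            # chunks only referenced by superseded (old) metadata end up with refcount 0;
            # remember them so they are not reported as orphans later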
1435            if self.chunks.get(id_, ChunkIndexEntry(0, 0, 0)).refcount == 0:
1436                self.possibly_superseded.add(id_)
1437
1438        def add_callback(chunk):
1439            id_ = self.key.id_hash(chunk)
1440            cdata = self.key.encrypt(chunk)
1441            add_reference(id_, len(chunk), len(cdata), cdata)
1442            return id_
1443
1444        def add_reference(id_, size, csize, cdata=None):
1445            try:
1446                self.chunks.incref(id_)
1447            except KeyError:
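                # unknown chunk: register it in the index and, in repair mode, write it back to the repository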
1448                assert cdata is not None
1449                self.chunks[id_] = ChunkIndexEntry(refcount=1, size=size, csize=csize)
1450                if self.repair:
1451                    self.repository.put(id_, cdata)
1452
1453        def verify_file_chunks(archive_name, item):
1454            """Verifies that all file chunks are present.
1455
1456            Missing file chunks will be replaced with new chunks of the same length containing all zeros.
1457            If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one.
1458            """
1459            def replacement_chunk(size):
1460                data = bytes(size)
1461                chunk_id = self.key.id_hash(data)
1462                cdata = self.key.encrypt(data)
1463                csize = len(cdata)
1464                return chunk_id, size, csize, cdata
1465
1466            offset = 0
1467            chunk_list = []
1468            chunks_replaced = False
1469            has_chunks_healthy = 'chunks_healthy' in item
1470            chunks_current = item.chunks
1471            chunks_healthy = item.chunks_healthy if has_chunks_healthy else chunks_current
1472            if has_chunks_healthy and len(chunks_current) != len(chunks_healthy):
1473                # should never happen, but there was issue #3218.
1474                logger.warning('{}: {}: Invalid chunks_healthy metadata removed!'.format(archive_name, item.path))
1475                del item.chunks_healthy
1476                has_chunks_healthy = False
1477                chunks_healthy = chunks_current
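            # walk the current and healthy chunk lists in parallel; chunks_healthy holds the
            # original (correct) chunk ids if the file was repaired before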
1478            for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy):
1479                chunk_id, size, csize = chunk_healthy
1480                if chunk_id not in self.chunks:
1481                    # a chunk of the healthy list is missing
1482                    if chunk_current == chunk_healthy:
1483                        logger.error('{}: {}: New missing file chunk detected (Byte {}-{}, Chunk {}). '
1484                                     'Replacing with all-zero chunk.'.format(
1485                                     archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)))
1486                        self.error_found = chunks_replaced = True
1487                        chunk_id, size, csize, cdata = replacement_chunk(size)
1488                        add_reference(chunk_id, size, csize, cdata)
1489                    else:
1490                        logger.info('{}: {}: Previously missing file chunk is still missing (Byte {}-{}, Chunk {}). '
1491                                    'It has an all-zero replacement chunk already.'.format(
1492                                    archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)))
1493                        chunk_id, size, csize = chunk_current
1494                        if chunk_id in self.chunks:
1495                            add_reference(chunk_id, size, csize)
1496                        else:
1497                            logger.warning('{}: {}: Missing all-zero replacement chunk detected (Byte {}-{}, Chunk {}). '
1498                                           'Generating new replacement chunk.'.format(
1499                                           archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)))
1500                            self.error_found = chunks_replaced = True
1501                            chunk_id, size, csize, cdata = replacement_chunk(size)
1502                            add_reference(chunk_id, size, csize, cdata)
1503                else:
1504                    if chunk_current == chunk_healthy:
1505                        # normal case, all fine.
1506                        add_reference(chunk_id, size, csize)
1507                    else:
1508                        logger.info('{}: {}: Healed previously missing file chunk! (Byte {}-{}, Chunk {}).'.format(
1509                            archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id)))
1510                        add_reference(chunk_id, size, csize)
1511                        mark_as_possibly_superseded(chunk_current[0])  # maybe orphaned the all-zero replacement chunk
1512                chunk_list.append([chunk_id, size, csize])  # list-typed element as chunks_healthy is list-of-lists
1513                offset += size
1514            if chunks_replaced and not has_chunks_healthy:
                # if this is the first repair, remember the correct chunk IDs, so we can maybe heal the file later
1516                item.chunks_healthy = item.chunks
1517            if has_chunks_healthy and chunk_list == chunks_healthy:
1518                logger.info('{}: {}: Completely healed previously damaged file!'.format(archive_name, item.path))
1519                del item.chunks_healthy
1520            item.chunks = chunk_list
1521            if 'size' in item:
1522                item_size = item.size
1523                item_chunks_size = item.get_size(compressed=False, from_chunks=True)
1524                if item_size != item_chunks_size:
1525                    # just warn, but keep the inconsistency, so that borg extract can warn about it.
1526                    logger.warning('{}: {}: size inconsistency detected: size {}, chunks size {}'.format(
1527                                   archive_name, item.path, item_size, item_chunks_size))
1528
1529        def robust_iterator(archive):
1530            """Iterates through all archive items
1531
1532            Missing item chunks will be skipped and the msgpack stream will be restarted
1533            """
1534            item_keys = frozenset(key.encode() for key in self.manifest.item_keys)
1535            required_item_keys = frozenset(key.encode() for key in REQUIRED_ITEM_KEYS)
1536            unpacker = RobustUnpacker(lambda item: isinstance(item, StableDict) and b'path' in item,
1537                                      self.manifest.item_keys)
1538            _state = 0
1539
1540            def missing_chunk_detector(chunk_id):
1541                nonlocal _state
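                # _state is even while consecutive chunk ids are present and odd while they are
                # missing; it increases on each transition, so groupby() yields alternating runs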
1542                if _state % 2 != int(chunk_id not in self.chunks):
1543                    _state += 1
1544                return _state
1545
1546            def report(msg, chunk_id, chunk_no):
1547                cid = bin_to_hex(chunk_id)
1548                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see "debug dump-archive-items"
1549                self.error_found = True
1550                logger.error(msg)
1551
1552            def list_keys_safe(keys):
1553                return ', '.join((k.decode(errors='replace') if isinstance(k, bytes) else str(k) for k in keys))
1554
1555            def valid_item(obj):
1556                if not isinstance(obj, StableDict):
1557                    return False, 'not a dictionary'
1558                # A bug in Attic up to and including release 0.13 added a (meaningless) b'acl' key to every item.
1559                # We ignore it here, should it exist. See test_attic013_acl_bug for details.
1560                obj.pop(b'acl', None)
1561                keys = set(obj)
1562                if not required_item_keys.issubset(keys):
1563                    return False, 'missing required keys: ' + list_keys_safe(required_item_keys - keys)
1564                if not keys.issubset(item_keys):
1565                    return False, 'invalid keys: ' + list_keys_safe(keys - item_keys)
1566                return True, ''
1567
1568            i = 0
1569            for state, items in groupby(archive.items, missing_chunk_detector):
1570                items = list(items)
1571                if state % 2:
1572                    for chunk_id in items:
1573                        report('item metadata chunk missing', chunk_id, i)
1574                        i += 1
1575                    continue
1576                if state > 0:
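                    # the msgpack stream was interrupted by missing chunks, resynchronize it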
1577                    unpacker.resync()
1578                for chunk_id, cdata in zip(items, repository.get_many(items)):
1579                    data = self.key.decrypt(chunk_id, cdata)
1580                    unpacker.feed(data)
1581                    try:
1582                        for item in unpacker:
1583                            valid, reason = valid_item(item)
1584                            if valid:
1585                                yield Item(internal_dict=item)
1586                            else:
1587                                report('Did not get expected metadata dict when unpacking item metadata (%s)' % reason, chunk_id, i)
1588                    except RobustUnpacker.UnpackerCrashed:
1589                        report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
1590                        unpacker.resync()
1591                    except Exception:
1592                        report('Exception while unpacking item metadata', chunk_id, i)
1593                        raise
1594                    i += 1
1595
1596        if archive is None:
1597            sort_by = sort_by.split(',')
1598            if any((first, last, glob)):
1599                archive_infos = self.manifest.archives.list(sort_by=sort_by, glob=glob, first=first, last=last)
1600                if glob and not archive_infos:
1601                    logger.warning('--glob-archives %s does not match any archives', glob)
1602                if first and len(archive_infos) < first:
1603                    logger.warning('--first %d archives: only found %d archives', first, len(archive_infos))
1604                if last and len(archive_infos) < last:
1605                    logger.warning('--last %d archives: only found %d archives', last, len(archive_infos))
1606            else:
1607                archive_infos = self.manifest.archives.list(sort_by=sort_by)
1608        else:
1609            # we only want one specific archive
1610            try:
1611                archive_infos = [self.manifest.archives[archive]]
1612            except KeyError:
1613                logger.error("Archive '%s' not found.", archive)
1614                self.error_found = True
1615                return
1616        num_archives = len(archive_infos)
1617
1618        pi = ProgressIndicatorPercent(total=num_archives, msg='Checking archives %3.1f%%', step=0.1,
1619                                      msgid='check.rebuild_refcounts')
1620        with cache_if_remote(self.repository) as repository:
1621            for i, info in enumerate(archive_infos):
1622                pi.show(i)
1623                logger.info('Analyzing archive {} ({}/{})'.format(info.name, i + 1, num_archives))
1624                archive_id = info.id
1625                if archive_id not in self.chunks:
1626                    logger.error('Archive metadata block is missing!')
1627                    self.error_found = True
1628                    del self.manifest.archives[info.name]
1629                    continue
1630                mark_as_possibly_superseded(archive_id)
1631                cdata = self.repository.get(archive_id)
1632                data = self.key.decrypt(archive_id, cdata)
1633                archive = ArchiveItem(internal_dict=msgpack.unpackb(data))
1634                if archive.version != 1:
1635                    raise Exception('Unknown archive metadata version')
1636                archive.cmdline = [safe_decode(arg) for arg in archive.cmdline]
1637                items_buffer = ChunkBuffer(self.key)
1638                items_buffer.write_chunk = add_callback
1639                for item in robust_iterator(archive):
1640                    if 'chunks' in item:
1641                        verify_file_chunks(info.name, item)
1642                    items_buffer.add(item)
1643                items_buffer.flush(flush=True)
1644                for previous_item_id in archive.items:
1645                    mark_as_possibly_superseded(previous_item_id)
1646                archive.items = items_buffer.chunks
1647                data = msgpack.packb(archive.as_dict(), unicode_errors='surrogateescape')
1648                new_archive_id = self.key.id_hash(data)
1649                cdata = self.key.encrypt(data)
1650                add_reference(new_archive_id, len(data), len(cdata), cdata)
1651                self.manifest.archives[info.name] = (new_archive_id, info.ts)
1652            pi.finish()
1653
1654    def orphan_chunks_check(self):
1655        if self.check_all:
1656            unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
1657            orphaned = unused - self.possibly_superseded
1658            if orphaned:
1659                logger.error('{} orphaned objects found!'.format(len(orphaned)))
1660                self.error_found = True
1661            if self.repair and unused:
1662                logger.info('Deleting %d orphaned and %d superseded objects...' % (
1663                    len(orphaned), len(self.possibly_superseded)))
1664                for id_ in unused:
1665                    self.repository.delete(id_)
1666                logger.info('Finished deleting orphaned/superseded objects.')
1667        else:
1668            logger.info('Orphaned objects check skipped (needs all archives checked).')
1669
1670    def finish(self, save_space=False):
1671        if self.repair:
1672            logger.info('Writing Manifest.')
1673            self.manifest.write()
1674            logger.info('Committing repo (may take a while, due to compact_segments)...')
1675            self.repository.commit(save_space=save_space)
1676            logger.info('Finished committing repo.')
1677
1678
1679class ArchiveRecreater:
1680    class Interrupted(Exception):
1681        def __init__(self, metadata=None):
1682            self.metadata = metadata or {}
1683
1684    @staticmethod
1685    def is_temporary_archive(archive_name):
1686        return archive_name.endswith('.recreate')
1687
1688    def __init__(self, repository, manifest, key, cache, matcher,
1689                 exclude_caches=False, exclude_if_present=None, keep_exclude_tags=False,
1690                 chunker_params=None, compression=None, recompress=False, always_recompress=False,
1691                 dry_run=False, stats=False, progress=False, file_status_printer=None,
1692                 timestamp=None, checkpoint_interval=1800):
1693        self.repository = repository
1694        self.key = key
1695        self.manifest = manifest
1696        self.cache = cache
1697
1698        self.matcher = matcher
1699        self.exclude_caches = exclude_caches
1700        self.exclude_if_present = exclude_if_present or []
1701        self.keep_exclude_tags = keep_exclude_tags
1702
1703        self.rechunkify = chunker_params is not None
1704        if self.rechunkify:
1705            logger.debug('Rechunking archives to %s', chunker_params)
1706        self.chunker_params = chunker_params or CHUNKER_PARAMS
1707        self.recompress = recompress
1708        self.always_recompress = always_recompress
1709        self.compression = compression or CompressionSpec('none')
1710        self.seen_chunks = set()
1711
1712        self.timestamp = timestamp
1713        self.dry_run = dry_run
1714        self.stats = stats
1715        self.progress = progress
1716        self.print_file_status = file_status_printer or (lambda *args: None)
1717        self.checkpoint_interval = None if dry_run else checkpoint_interval
1718
1719    def recreate(self, archive_name, comment=None, target_name=None):
1720        assert not self.is_temporary_archive(archive_name)
1721        archive = self.open_archive(archive_name)
1722        target = self.create_target(archive, target_name)
1723        if self.exclude_if_present or self.exclude_caches:
1724            self.matcher_add_tagged_dirs(archive)
1725        if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
1726            return False
1727        self.process_items(archive, target)
1728        replace_original = target_name is None
1729        self.save(archive, target, comment, replace_original=replace_original)
1730        return True
1731
1732    def process_items(self, archive, target):
1733        matcher = self.matcher
1734        target_is_subset = not matcher.empty()
1735        hardlink_masters = {} if target_is_subset else None
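        # maps master item path -> (chunks, chunks_healthy, new master path once one was assigned)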
1736
1737        def item_is_hardlink_master(item):
1738            return (target_is_subset and
1739                    hardlinkable(item.mode) and
1740                    item.get('hardlink_master', True) and
1741                    'source' not in item)
1742
1743        for item in archive.iter_items():
1744            if not matcher.match(item.path):
1745                self.print_file_status('x', item.path)
1746                if item_is_hardlink_master(item):
1747                    hardlink_masters[item.path] = (item.get('chunks'), item.get('chunks_healthy'), None)
1748                continue
1749            if target_is_subset and hardlinkable(item.mode) and item.get('source') in hardlink_masters:
1750                # master of this hard link is outside the target subset
1751                chunks, chunks_healthy, new_source = hardlink_masters[item.source]
1752                if new_source is None:
1753                    # First item to use this master, move the chunks
1754                    item.chunks = chunks
1755                    if chunks_healthy is not None:
1756                        item.chunks_healthy = chunks_healthy
1757                    hardlink_masters[item.source] = (None, None, item.path)
1758                    del item.source
1759                else:
1760                    # Master was already moved, only update this item's source
1761                    item.source = new_source
1762            if self.dry_run:
1763                self.print_file_status('-', item.path)
1764            else:
1765                self.process_item(archive, target, item)
1766        if self.progress:
1767            target.stats.show_progress(final=True)
1768
1769    def process_item(self, archive, target, item):
1770        if 'chunks' in item:
1771            self.process_chunks(archive, target, item)
1772            target.stats.nfiles += 1
1773        target.add_item(item)
1774        self.print_file_status(file_status(item.mode), item.path)
1775
1776    def process_chunks(self, archive, target, item):
1777        if not self.recompress and not target.recreate_rechunkify:
1778            for chunk_id, size, csize in item.chunks:
1779                self.cache.chunk_incref(chunk_id, target.stats)
1780            return item.chunks
1781        chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
1782        chunk_processor = partial(self.chunk_processor, target)
1783        target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor)
1784
1785    def chunk_processor(self, target, data):
1786        chunk_id = self.key.id_hash(data)
1787        if chunk_id in self.seen_chunks:
1788            return self.cache.chunk_incref(chunk_id, target.stats)
1789        overwrite = self.recompress
1790        if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
1791            # Check if this chunk is already compressed the way we want it
1792            old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False)
1793            if Compressor.detect(old_chunk).name == self.key.compressor.decide(data).name:
1794                # Stored chunk has the same compression we wanted
1795                overwrite = False
1796        chunk_entry = self.cache.add_chunk(chunk_id, data, target.stats, overwrite=overwrite, wait=False)
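        # pick up responses of already finished async repository operations without blocking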
1797        self.cache.repository.async_response(wait=False)
1798        self.seen_chunks.add(chunk_entry.id)
1799        return chunk_entry
1800
1801    def iter_chunks(self, archive, target, chunks):
1802        chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in chunks])
1803        if target.recreate_rechunkify:
1804            # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
1805            # (does not load the entire file into memory)
1806            file = ChunkIteratorFileWrapper(chunk_iterator)
1807            yield from target.chunker.chunkify(file)
1808        else:
1809            for chunk in chunk_iterator:
1810                yield chunk
1811
1812    def save(self, archive, target, comment=None, replace_original=True):
1813        if self.dry_run:
1814            return
1815        if comment is None:
1816            comment = archive.metadata.get('comment', '')
1817
        # remember the start time so it can be restored for the statistics output
1819        if self.stats:
1820            _start = target.start
1821
1822        if self.timestamp is None:
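            # no explicit timestamp given: carry over the original archive timestamps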
1823            additional_metadata = {
1824                'time': archive.metadata.time,
1825                'time_end': archive.metadata.get('time_end') or archive.metadata.time,
1826                'cmdline': archive.metadata.cmdline,
1827                # but also remember recreate metadata:
1828                'recreate_cmdline': sys.argv,
1829            }
1830        else:
1831            additional_metadata = {
1832                'cmdline': archive.metadata.cmdline,
1833                # but also remember recreate metadata:
1834                'recreate_cmdline': sys.argv,
1835            }
1836
1837        target.save(comment=comment, timestamp=self.timestamp,
1838                    additional_metadata=additional_metadata)
1839        if replace_original:
1840            archive.delete(Statistics(), progress=self.progress)
1841            target.rename(archive.name)
1842        if self.stats:
1843            target.start = _start
1844            target.end = datetime.utcnow()
1845            log_multi(DASHES,
1846                      str(target),
1847                      DASHES,
1848                      str(target.stats),
1849                      str(self.cache),
1850                      DASHES)
1851
1852    def matcher_add_tagged_dirs(self, archive):
1853        """Add excludes to the matcher created by exclude_cache and exclude_if_present."""
1854        def exclude(dir, tag_item):
1855            if self.keep_exclude_tags:
1856                tag_files.append(PathPrefixPattern(tag_item.path, recurse_dir=False))
1857                tagged_dirs.append(FnmatchPattern(dir + '/', recurse_dir=False))
1858            else:
1859                tagged_dirs.append(PathPrefixPattern(dir, recurse_dir=False))
1860
1861        matcher = self.matcher
1862        tag_files = []
1863        tagged_dirs = []
1864
1865        # to support reading hard-linked CACHEDIR.TAGs (aka CACHE_TAG_NAME), similar to hardlink_masters:
1866        cachedir_masters = {}
1867
1868        if self.exclude_caches:
1869            # sadly, due to how CACHEDIR.TAG works (filename AND file [header] contents) and
1870            # how borg deals with hardlinks (slave hardlinks referring back to master hardlinks),
            # we need a first pass over the archive to collect hardlink master paths.
            # as seen in issue #4911, the master paths can have arbitrary filenames,
            # not just CACHEDIR.TAG.
1874            for item in archive.iter_items(filter=lambda item: os.path.basename(item.path) == CACHE_TAG_NAME):
1875                if stat.S_ISREG(item.mode) and 'chunks' not in item and 'source' in item:
1876                    # this is a hardlink slave, referring back to its hardlink master (via item.source)
1877                    cachedir_masters[item.source] = None  # we know the key (path), but not the value (item) yet
1878
1879        for item in archive.iter_items(
1880                filter=lambda item: os.path.basename(item.path) == CACHE_TAG_NAME or matcher.match(item.path)):
1881            if self.exclude_caches and item.path in cachedir_masters:
1882                cachedir_masters[item.path] = item
1883            dir, tag_file = os.path.split(item.path)
1884            if tag_file in self.exclude_if_present:
1885                exclude(dir, item)
1886            elif self.exclude_caches and tag_file == CACHE_TAG_NAME and stat.S_ISREG(item.mode):
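                # for a hardlink slave, read the tag file contents via its master item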
1887                content_item = item if 'chunks' in item else cachedir_masters[item.source]
1888                file = open_item(archive, content_item)
1889                if file.read(len(CACHE_TAG_CONTENTS)) == CACHE_TAG_CONTENTS:
1890                    exclude(dir, item)
1891        matcher.add(tag_files, IECommand.Include)
1892        matcher.add(tagged_dirs, IECommand.ExcludeNoRecurse)
1893
1894    def create_target(self, archive, target_name=None):
1895        """Create target archive."""
1896        target_name = target_name or archive.name + '.recreate'
1897        target = self.create_target_archive(target_name)
1898        # If the archives use the same chunker params, then don't rechunkify
1899        source_chunker_params = tuple(archive.metadata.get('chunker_params', []))
1900        target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params
1901        if target.recreate_rechunkify:
1902            logger.debug('Rechunking archive from %s to %s', source_chunker_params or '(unknown)', target.chunker_params)
1903        return target
1904
1905    def create_target_archive(self, name):
1906        target = Archive(self.repository, self.key, self.manifest, name, create=True,
1907                          progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
1908                          checkpoint_interval=self.checkpoint_interval)
1909        return target
1910
1911    def open_archive(self, name, **kwargs):
1912        return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)
1913