1import errno
2import mmap
3import os
4import shutil
5import stat
6import struct
7import time
8from binascii import hexlify, unhexlify
9from collections import defaultdict
10from configparser import ConfigParser
11from datetime import datetime
12from functools import partial
13from itertools import islice
14
15from .constants import *  # NOQA
16from .hashindex import NSIndex
17from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
18from .helpers import Location
19from .helpers import ProgressIndicatorPercent
20from .helpers import bin_to_hex
21from .helpers import hostname_is_unique
22from .helpers import secure_erase, truncate_and_unlink
23from .helpers import msgpack
24from .locking import Lock, LockError, LockErrorT
25from .logger import create_logger
26from .lrucache import LRUCache
27from .platform import SaveFile, SyncFile, sync_dir, safe_fadvise
28from .algorithms.checksums import crc32
29from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
30
31logger = create_logger(__name__)
32
33MAGIC = b'BORG_SEG'
34MAGIC_LEN = len(MAGIC)
35ATTIC_MAGIC = b'ATTICSEG'
36assert len(ATTIC_MAGIC) == MAGIC_LEN
37TAG_PUT = 0
38TAG_DELETE = 1
39TAG_COMMIT = 2
40
41FreeSpace = partial(defaultdict, int)
42
43
44class Repository:
45    """
46    Filesystem based transactional key value store
47
48    Transactionality is achieved by using a log (aka journal) to record changes. The log is a series of numbered files
49    called segments. Each segment is a series of log entries. The segment number together with the offset of each
50    entry relative to its segment start establishes an ordering of the log entries. This is the "definition" of
51    time for the purposes of the log.
52
53    Log entries are either PUT, DELETE or COMMIT.
54
55    A COMMIT is always the final log entry in a segment and marks all data from the beginning of the log until the
56    segment ending with the COMMIT as committed and consistent. The segment number of a segment ending with a COMMIT
57    is called the transaction ID of that commit, and a segment ending with a COMMIT is called committed.
58
59    When reading from a repository it is first checked whether the last segment is committed. If it is not, then
60    all segments after the last committed segment are deleted; they contain log entries whose consistency is not
61    established by a COMMIT.
62
63    Note that the COMMIT can't establish consistency by itself, but only manages to do so with proper support from
64    the platform (including the hardware). See platform.base.SyncFile for details.
65
66    A PUT inserts a key-value pair. The value is stored in the log entry, hence the repository implements
67    full data logging, meaning that all data is consistent, not just metadata (which is common in file systems).
68
69    A DELETE marks a key as deleted.
70
71    For a given key only the last entry regarding the key, which is called current (all other entries are called
72    superseded), is relevant: If there is no entry or the last entry is a DELETE then the key does not exist.
73    Otherwise the last PUT defines the value of the key.
74
75    By superseding a PUT (with either another PUT or a DELETE) the log entry becomes obsolete. A segment containing
76    such obsolete entries is called sparse, while a segment containing no such entries is called compact.
77
78    Sparse segments can be compacted and thereby disk space freed. This destroys the transaction for which the
79    superseded entries where current.
80
81    On disk layout:
82
83    dir/README
84    dir/config
85    dir/data/<X // SEGMENTS_PER_DIR>/<X>
86    dir/index.X
87    dir/hints.X
88
89    File system interaction
90    -----------------------
91
92    LoggedIO generally tries to rely on common behaviours across transactional file systems.
93
94    Segments that are deleted are truncated first, which avoids problems if the FS needs to
95    allocate space to delete the dirent of the segment. This mostly affects CoW file systems,
96    traditional journaling file systems have a fairly good grip on this problem.
97
98    Note that deletion, i.e. unlink(2), is atomic on every file system that uses inode reference
99    counts, which includes pretty much all of them. To remove a dirent the inodes refcount has
100    to be decreased, but you can't decrease the refcount before removing the dirent nor can you
101    decrease the refcount after removing the dirent. File systems solve this with a lock,
102    and by ensuring it all stays within the same FS transaction.
103
104    Truncation is generally not atomic in itself, and combining truncate(2) and unlink(2) is of
105    course never guaranteed to be atomic. Truncation in a classic extent-based FS is done in
106    roughly two phases, first the extents are removed then the inode is updated. (In practice
107    this is of course way more complex).
108
109    LoggedIO gracefully handles truncate/unlink splits as long as the truncate resulted in
110    a zero length file. Zero length segments are considered to not exist, while LoggedIO.cleanup()
111    will still get rid of them.
112    """
113
114    class DoesNotExist(Error):
115        """Repository {} does not exist."""
116
117    class AlreadyExists(Error):
118        """A repository already exists at {}."""
119
120    class PathAlreadyExists(Error):
121        """There is already something at {}."""
122
123    class ParentPathDoesNotExist(Error):
124        """The parent path of the repo directory [{}] does not exist."""
125
126    class InvalidRepository(Error):
127        """{} is not a valid repository. Check repo config."""
128
129    class InvalidRepositoryConfig(Error):
130        """{} does not have a valid configuration. Check repo config [{}]."""
131
132    class AtticRepository(Error):
133        """Attic repository detected. Please run "borg upgrade {}"."""
134
135    class CheckNeeded(ErrorWithTraceback):
136        """Inconsistency detected. Please run "borg check {}"."""
137
138    class ObjectNotFound(ErrorWithTraceback):
139        """Object with key {} not found in repository {}."""
140
141        def __init__(self, id, repo):
142            if isinstance(id, bytes):
143                id = bin_to_hex(id)
144            super().__init__(id, repo)
145
146    class InsufficientFreeSpaceError(Error):
147        """Insufficient free space to complete transaction (required: {}, available: {})."""
148
149    class StorageQuotaExceeded(Error):
150        """The storage quota ({}) has been exceeded ({}). Try deleting some archives."""
151
152    def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True,
153                 append_only=False, storage_quota=None, check_segment_magic=True,
154                 make_parent_dirs=False):
155        self.path = os.path.abspath(path)
156        self._location = Location('file://%s' % self.path)
157        self.io = None  # type: LoggedIO
158        self.lock = None
159        self.index = None
160        # This is an index of shadowed log entries during this transaction. Consider the following sequence:
161        # segment_n PUT A, segment_x DELETE A
162        # After the "DELETE A" in segment_x the shadow index will contain "A -> [n]".
163        self.shadow_index = {}
164        self._active_txn = False
165        self.lock_wait = lock_wait
166        self.do_lock = lock
167        self.do_create = create
168        self.created = False
169        self.exclusive = exclusive
170        self.append_only = append_only
171        self.storage_quota = storage_quota
172        self.storage_quota_use = 0
173        self.transaction_doomed = None
174        self.check_segment_magic = check_segment_magic
175        self.make_parent_dirs = make_parent_dirs
176
177    def __del__(self):
178        if self.lock:
179            self.close()
180            assert False, "cleanup happened in Repository.__del__"
181
182    def __repr__(self):
183        return '<%s %s>' % (self.__class__.__name__, self.path)
184
185    def __enter__(self):
186        if self.do_create:
187            self.do_create = False
188            self.create(self.path)
189            self.created = True
190        self.open(self.path, bool(self.exclusive), lock_wait=self.lock_wait, lock=self.do_lock)
191        return self
192
193    def __exit__(self, exc_type, exc_val, exc_tb):
194        if exc_type is not None:
195            no_space_left_on_device = exc_type is OSError and exc_val.errno == errno.ENOSPC
196            # The ENOSPC could have originated somewhere else besides the Repository. The cleanup is always safe, unless
197            # EIO or FS corruption ensues, which is why we specifically check for ENOSPC.
198            if self._active_txn and no_space_left_on_device:
199                logger.warning('No space left on device, cleaning up partial transaction to free space.')
200                cleanup = True
201            else:
202                cleanup = False
203            self._rollback(cleanup=cleanup)
204        self.close()
205
206    @property
207    def id_str(self):
208        return bin_to_hex(self.id)
209
210    @staticmethod
211    def is_repository(path):
212        """Check whether there is already a Borg repository at *path*."""
213        try:
214            # Use binary mode to avoid troubles if a README contains some stuff not in our locale
215            with open(os.path.join(path, 'README'), 'rb') as fd:
216                # Read only the first ~100 bytes (if any), in case some README file we stumble upon is large.
217                readme_head = fd.read(100)
218                # The first comparison captures our current variant (REPOSITORY_README), the second comparison
219                # is an older variant of the README file (used by 1.0.x).
220                return b'Borg Backup repository' in readme_head or b'Borg repository' in readme_head
221        except OSError:
222            # Ignore FileNotFound, PermissionError, ...
223            return False
224
225    def check_can_create_repository(self, path):
226        """
227        Raise an exception if a repository already exists at *path* or any parent directory.
228
229        Checking parent directories is done for two reasons:
230        (1) It's just a weird thing to do, and usually not intended. A Borg using the "parent" repository
231            may be confused, or we may accidentally put stuff into the "data/" or "data/<n>/" directories.
232        (2) When implementing repository quotas (which we currently don't), it's important to prohibit
233            folks from creating quota-free repositories. Since no one can create a repository within another
234            repository, user's can only use the quota'd repository, when their --restrict-to-path points
235            at the user's repository.
236        """
237        try:
238            st = os.stat(path)
239        except FileNotFoundError:
240            pass  # nothing there!
241        else:
242            # there is something already there!
243            if self.is_repository(path):
244                raise self.AlreadyExists(path)
245            if not stat.S_ISDIR(st.st_mode) or os.listdir(path):
246                raise self.PathAlreadyExists(path)
247            # an empty directory is acceptable for us.
248
249        while True:
250            # Check all parent directories for Borg's repository README
251            previous_path = path
252            # Thus, path = previous_path/..
253            path = os.path.abspath(os.path.join(previous_path, os.pardir))
254            if path == previous_path:
255                # We reached the root of the directory hierarchy (/.. = / and C:\.. = C:\).
256                break
257            if self.is_repository(path):
258                raise self.AlreadyExists(path)
259
260    def create(self, path):
261        """Create a new empty repository at `path`
262        """
263        self.check_can_create_repository(path)
264        if self.make_parent_dirs:
265            parent_path = os.path.join(path, os.pardir)
266            os.makedirs(parent_path, exist_ok=True)
267        if not os.path.exists(path):
268            try:
269                os.mkdir(path)
270            except FileNotFoundError as err:
271                raise self.ParentPathDoesNotExist(path) from err
272        with open(os.path.join(path, 'README'), 'w') as fd:
273            fd.write(REPOSITORY_README)
274        os.mkdir(os.path.join(path, 'data'))
275        config = ConfigParser(interpolation=None)
276        config.add_section('repository')
277        config.set('repository', 'version', '1')
278        config.set('repository', 'segments_per_dir', str(DEFAULT_SEGMENTS_PER_DIR))
279        config.set('repository', 'max_segment_size', str(DEFAULT_MAX_SEGMENT_SIZE))
280        config.set('repository', 'append_only', str(int(self.append_only)))
281        if self.storage_quota:
282            config.set('repository', 'storage_quota', str(self.storage_quota))
283        else:
284            config.set('repository', 'storage_quota', '0')
285        config.set('repository', 'additional_free_space', '0')
286        config.set('repository', 'id', bin_to_hex(os.urandom(32)))
287        self.save_config(path, config)
288
289    def save_config(self, path, config):
290        config_path = os.path.join(path, 'config')
291        old_config_path = os.path.join(path, 'config.old')
292
293        if os.path.isfile(old_config_path):
294            logger.warning("Old config file not securely erased on previous config update")
295            secure_erase(old_config_path)
296
297        if os.path.isfile(config_path):
298            link_error_msg = ("Failed to securely erase old repository config file (hardlinks not supported>). "
299                              "Old repokey data, if any, might persist on physical storage.")
300            try:
301                os.link(config_path, old_config_path)
302            except OSError as e:
303                if e.errno in (errno.EMLINK, errno.ENOSYS, errno.EPERM, errno.EACCES, errno.ENOTSUP, errno.EIO):
304                    logger.warning(link_error_msg)
305                else:
306                    raise
307            except AttributeError:
308                # some python ports have no os.link, see #4901
309                logger.warning(link_error_msg)
310
311        try:
312            with SaveFile(config_path) as fd:
313                config.write(fd)
314        except PermissionError as e:
315            # error is only a problem if we even had a lock
316            if self.do_lock:
317                raise
318            logger.warning("%s: Failed writing to '%s'. This is expected when working on "
319                           "read-only repositories." % (e.strerror, e.filename))
320
321        if os.path.isfile(old_config_path):
322            secure_erase(old_config_path)
323
324    def save_key(self, keydata):
325        assert self.config
326        keydata = keydata.decode('utf-8')  # remote repo: msgpack issue #99, getting bytes
327        self.config.set('repository', 'key', keydata)
328        self.save_config(self.path, self.config)
329
330    def load_key(self):
331        keydata = self.config.get('repository', 'key')
332        return keydata.encode('utf-8')  # remote repo: msgpack issue #99, returning bytes
333
334    def get_free_nonce(self):
335        if self.do_lock and not self.lock.got_exclusive_lock():
336            raise AssertionError("bug in code, exclusive lock should exist here")
337
338        nonce_path = os.path.join(self.path, 'nonce')
339        try:
340            with open(nonce_path, 'r') as fd:
341                return int.from_bytes(unhexlify(fd.read()), byteorder='big')
342        except FileNotFoundError:
343            return None
344
345    def commit_nonce_reservation(self, next_unreserved, start_nonce):
346        if self.do_lock and not self.lock.got_exclusive_lock():
347            raise AssertionError("bug in code, exclusive lock should exist here")
348
349        if self.get_free_nonce() != start_nonce:
350            raise Exception("nonce space reservation with mismatched previous state")
351        nonce_path = os.path.join(self.path, 'nonce')
352        try:
353            with SaveFile(nonce_path, binary=False) as fd:
354                fd.write(bin_to_hex(next_unreserved.to_bytes(8, byteorder='big')))
355        except PermissionError as e:
356            # error is only a problem if we even had a lock
357            if self.do_lock:
358                raise
359            logger.warning("%s: Failed writing to '%s'. This is expected when working on "
360                           "read-only repositories." % (e.strerror, e.filename))
361
362    def destroy(self):
363        """Destroy the repository at `self.path`
364        """
365        if self.append_only:
366            raise ValueError(self.path + " is in append-only mode")
367        self.close()
368        os.remove(os.path.join(self.path, 'config'))  # kill config first
369        shutil.rmtree(self.path)
370
371    def get_index_transaction_id(self):
372        indices = sorted(int(fn[6:])
373                         for fn in os.listdir(self.path)
374                         if fn.startswith('index.') and fn[6:].isdigit() and os.stat(os.path.join(self.path, fn)).st_size != 0)
375        if indices:
376            return indices[-1]
377        else:
378            return None
379
380    def check_transaction(self):
381        index_transaction_id = self.get_index_transaction_id()
382        segments_transaction_id = self.io.get_segments_transaction_id()
383        if index_transaction_id is not None and segments_transaction_id is None:
384            # we have a transaction id from the index, but we did not find *any*
385            # commit in the segment files (thus no segments transaction id).
386            # this can happen if a lot of segment files are lost, e.g. due to a
387            # filesystem or hardware malfunction. it means we have no identifiable
388            # valid (committed) state of the repo which we could use.
389            msg = '%s" - although likely this is "beyond repair' % self.path  # dirty hack
390            raise self.CheckNeeded(msg)
391        # Attempt to automatically rebuild index if we crashed between commit
392        # tag write and index save
393        if index_transaction_id != segments_transaction_id:
394            if index_transaction_id is not None and index_transaction_id > segments_transaction_id:
395                replay_from = None
396            else:
397                replay_from = index_transaction_id
398            self.replay_segments(replay_from, segments_transaction_id)
399
400    def get_transaction_id(self):
401        self.check_transaction()
402        return self.get_index_transaction_id()
403
404    def break_lock(self):
405        Lock(os.path.join(self.path, 'lock')).break_lock()
406
407    def migrate_lock(self, old_id, new_id):
408        # note: only needed for local repos
409        if self.lock is not None:
410            self.lock.migrate_lock(old_id, new_id)
411
412    def open(self, path, exclusive, lock_wait=None, lock=True):
413        self.path = path
414        try:
415            st = os.stat(path)
416        except FileNotFoundError:
417            raise self.DoesNotExist(path)
418        if not stat.S_ISDIR(st.st_mode):
419            raise self.InvalidRepository(path)
420        if lock:
421            self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait, kill_stale_locks=hostname_is_unique()).acquire()
422        else:
423            self.lock = None
424        self.config = ConfigParser(interpolation=None)
425        try:
426            with open(os.path.join(self.path, 'config')) as fd:
427                self.config.read_file(fd)
428        except FileNotFoundError:
429            self.close()
430            raise self.InvalidRepository(self.path)
431        if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1:
432            self.close()
433            raise self.InvalidRepository(path)
434        self.max_segment_size = parse_file_size(self.config.get('repository', 'max_segment_size'))
435        if self.max_segment_size >= MAX_SEGMENT_SIZE_LIMIT:
436            self.close()
437            raise self.InvalidRepositoryConfig(path, 'max_segment_size >= %d' % MAX_SEGMENT_SIZE_LIMIT)  # issue 3592
438        self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
439        self.additional_free_space = parse_file_size(self.config.get('repository', 'additional_free_space', fallback=0))
440        # append_only can be set in the constructor
441        # it shouldn't be overridden (True -> False) here
442        self.append_only = self.append_only or self.config.getboolean('repository', 'append_only', fallback=False)
443        if self.storage_quota is None:
444            # self.storage_quota is None => no explicit storage_quota was specified, use repository setting.
445            self.storage_quota = parse_file_size(self.config.get('repository', 'storage_quota', fallback=0))
446        self.id = unhexlify(self.config.get('repository', 'id').strip())
447        self.io = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
448        if self.check_segment_magic:
449            # read a segment and check whether we are dealing with a non-upgraded Attic repository
450            segment = self.io.get_latest_segment()
451            if segment is not None and self.io.get_segment_magic(segment) == ATTIC_MAGIC:
452                self.close()
453                raise self.AtticRepository(path)
454
455    def close(self):
456        if self.lock:
457            if self.io:
458                self.io.close()
459            self.io = None
460            self.lock.release()
461            self.lock = None
462
463    def commit(self, save_space=False):
464        """Commit transaction
465        """
466        # save_space is not used anymore, but stays for RPC/API compatibility.
467        if self.transaction_doomed:
468            exception = self.transaction_doomed
469            self.rollback()
470            raise exception
471        self.check_free_space()
472        self.log_storage_quota()
473        self.io.write_commit()
474        if not self.append_only:
475            self.compact_segments()
476        self.write_index()
477        self.rollback()
478
479    def _read_integrity(self, transaction_id, key):
480        integrity_file = 'integrity.%d' % transaction_id
481        integrity_path = os.path.join(self.path, integrity_file)
482        try:
483            with open(integrity_path, 'rb') as fd:
484                integrity = msgpack.unpack(fd)
485        except FileNotFoundError:
486            return
487        if integrity.get(b'version') != 2:
488            logger.warning('Unknown integrity data version %r in %s', integrity.get(b'version'), integrity_file)
489            return
490        return integrity[key].decode()
491
492    def open_index(self, transaction_id, auto_recover=True):
493        if transaction_id is None:
494            return NSIndex()
495        index_path = os.path.join(self.path, 'index.%d' % transaction_id)
496        integrity_data = self._read_integrity(transaction_id, b'index')
497        try:
498            with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
499                return NSIndex.read(fd)
500        except (ValueError, OSError, FileIntegrityError) as exc:
501            logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
502            os.unlink(index_path)
503            if not auto_recover:
504                raise
505            self.prepare_txn(self.get_transaction_id())
506            # don't leave an open transaction around
507            self.commit()
508            return self.open_index(self.get_transaction_id())
509
510    def prepare_txn(self, transaction_id, do_cleanup=True):
511        self._active_txn = True
512        if self.do_lock and not self.lock.got_exclusive_lock():
513            if self.exclusive is not None:
514                # self.exclusive is either True or False, thus a new client is active here.
515                # if it is False and we get here, the caller did not use exclusive=True although
516                # it is needed for a write operation. if it is True and we get here, something else
517                # went very wrong, because we should have a exclusive lock, but we don't.
518                raise AssertionError("bug in code, exclusive lock should exist here")
519            # if we are here, this is an old client talking to a new server (expecting lock upgrade).
520            # or we are replaying segments and might need a lock upgrade for that.
521            try:
522                self.lock.upgrade()
523            except (LockError, LockErrorT):
524                # if upgrading the lock to exclusive fails, we do not have an
525                # active transaction. this is important for "serve" mode, where
526                # the repository instance lives on - even if exceptions happened.
527                self._active_txn = False
528                raise
529        if not self.index or transaction_id is None:
530            try:
531                self.index = self.open_index(transaction_id, auto_recover=False)
532            except (ValueError, OSError, FileIntegrityError) as exc:
533                logger.warning('Checking repository transaction due to previous error: %s', exc)
534                self.check_transaction()
535                self.index = self.open_index(transaction_id, auto_recover=False)
536        if transaction_id is None:
537            self.segments = {}  # XXX bad name: usage_count_of_segment_x = self.segments[x]
538            self.compact = FreeSpace()  # XXX bad name: freeable_space_of_segment_x = self.compact[x]
539            self.storage_quota_use = 0
540            self.shadow_index.clear()
541        else:
542            if do_cleanup:
543                self.io.cleanup(transaction_id)
544            hints_path = os.path.join(self.path, 'hints.%d' % transaction_id)
545            index_path = os.path.join(self.path, 'index.%d' % transaction_id)
546            integrity_data = self._read_integrity(transaction_id, b'hints')
547            try:
548                with IntegrityCheckedFile(hints_path, write=False, integrity_data=integrity_data) as fd:
549                    hints = msgpack.unpack(fd)
550            except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError, FileIntegrityError) as e:
551                logger.warning('Repository hints file missing or corrupted, trying to recover: %s', e)
552                if not isinstance(e, FileNotFoundError):
553                    os.unlink(hints_path)
554                # index must exist at this point
555                os.unlink(index_path)
556                self.check_transaction()
557                self.prepare_txn(transaction_id)
558                return
559            if hints[b'version'] == 1:
560                logger.debug('Upgrading from v1 hints.%d', transaction_id)
561                self.segments = hints[b'segments']
562                self.compact = FreeSpace()
563                self.storage_quota_use = 0
564                for segment in sorted(hints[b'compact']):
565                    logger.debug('Rebuilding sparse info for segment %d', segment)
566                    self._rebuild_sparse(segment)
567                logger.debug('Upgrade to v2 hints complete')
568            elif hints[b'version'] != 2:
569                raise ValueError('Unknown hints file version: %d' % hints[b'version'])
570            else:
571                self.segments = hints[b'segments']
572                self.compact = FreeSpace(hints[b'compact'])
573                self.storage_quota_use = hints.get(b'storage_quota_use', 0)
574            self.log_storage_quota()
575            # Drop uncommitted segments in the shadow index
576            for key, shadowed_segments in self.shadow_index.items():
577                for segment in list(shadowed_segments):
578                    if segment > transaction_id:
579                        shadowed_segments.remove(segment)
580
581    def write_index(self):
582        def flush_and_sync(fd):
583            fd.flush()
584            os.fsync(fd.fileno())
585
586        def rename_tmp(file):
587            os.rename(file + '.tmp', file)
588
589        hints = {
590            b'version': 2,
591            b'segments': self.segments,
592            b'compact': self.compact,
593            b'storage_quota_use': self.storage_quota_use,
594        }
595        integrity = {
596            # Integrity version started at 2, the current hints version.
597            # Thus, integrity version == hints version, for now.
598            b'version': 2,
599        }
600        transaction_id = self.io.get_segments_transaction_id()
601        assert transaction_id is not None
602
603        # Log transaction in append-only mode
604        if self.append_only:
605            with open(os.path.join(self.path, 'transactions'), 'a') as log:
606                print('transaction %d, UTC time %s' % (
607                      transaction_id, datetime.utcnow().strftime(ISO_FORMAT)), file=log)
608
609        # Write hints file
610        hints_name = 'hints.%d' % transaction_id
611        hints_file = os.path.join(self.path, hints_name)
612        with IntegrityCheckedFile(hints_file + '.tmp', filename=hints_name, write=True) as fd:
613            msgpack.pack(hints, fd)
614            flush_and_sync(fd)
615        integrity[b'hints'] = fd.integrity_data
616
617        # Write repository index
618        index_name = 'index.%d' % transaction_id
619        index_file = os.path.join(self.path, index_name)
620        with IntegrityCheckedFile(index_file + '.tmp', filename=index_name, write=True) as fd:
621            # XXX: Consider using SyncFile for index write-outs.
622            self.index.write(fd)
623            flush_and_sync(fd)
624        integrity[b'index'] = fd.integrity_data
625
626        # Write integrity file, containing checksums of the hints and index files
627        integrity_name = 'integrity.%d' % transaction_id
628        integrity_file = os.path.join(self.path, integrity_name)
629        with open(integrity_file + '.tmp', 'wb') as fd:
630            msgpack.pack(integrity, fd)
631            flush_and_sync(fd)
632
633        # Rename the integrity file first
634        rename_tmp(integrity_file)
635        sync_dir(self.path)
636        # Rename the others after the integrity file is hypothetically on disk
637        rename_tmp(hints_file)
638        rename_tmp(index_file)
639        sync_dir(self.path)
640
641        # Remove old auxiliary files
642        current = '.%d' % transaction_id
643        for name in os.listdir(self.path):
644            if not name.startswith(('index.', 'hints.', 'integrity.')):
645                continue
646            if name.endswith(current):
647                continue
648            os.unlink(os.path.join(self.path, name))
649        self.index = None
650
651    def check_free_space(self):
652        """Pre-commit check for sufficient free space to actually perform the commit."""
653        # As a baseline we take four times the current (on-disk) index size.
654        # At this point the index may only be updated by compaction, which won't resize it.
655        # We still apply a factor of four so that a later, separate invocation can free space
656        # (journaling all deletes for all chunks is one index size) or still make minor additions
657        # (which may grow the index up to twice its current size).
658        # Note that in a subsequent operation the committed index is still on-disk, therefore we
659        # arrive at index_size * (1 + 2 + 1).
660        # In that order: journaled deletes (1), hashtable growth (2), persisted index (1).
661        required_free_space = self.index.size() * 4
662
663        # Conservatively estimate hints file size:
664        # 10 bytes for each segment-refcount pair, 10 bytes for each segment-space pair
665        # Assume maximum of 5 bytes per integer. Segment numbers will usually be packed more densely (1-3 bytes),
666        # as will refcounts and free space integers. For 5 MiB segments this estimate is good to ~20 PB repo size.
667        # Add 4K to generously account for constant format overhead.
668        hints_size = len(self.segments) * 10 + len(self.compact) * 10 + 4096
669        required_free_space += hints_size
670
671        required_free_space += self.additional_free_space
672        if not self.append_only:
673            full_segment_size = self.max_segment_size + MAX_OBJECT_SIZE
674            if len(self.compact) < 10:
675                # This is mostly for the test suite to avoid overestimated free space needs. This can be annoying
676                # if TMP is a small-ish tmpfs.
677                compact_working_space = 0
678                for segment, free in self.compact.items():
679                    try:
680                        compact_working_space += self.io.segment_size(segment) - free
681                    except FileNotFoundError:
682                        # looks like self.compact is referring to a non-existent segment file, ignore it.
683                        pass
684                logger.debug('check_free_space: few segments, not requiring a full free segment')
685                compact_working_space = min(compact_working_space, full_segment_size)
686                logger.debug('check_free_space: calculated working space for compact as %d bytes', compact_working_space)
687                required_free_space += compact_working_space
688            else:
689                # Keep one full worst-case segment free in non-append-only mode
690                required_free_space += full_segment_size
691        try:
692            st_vfs = os.statvfs(self.path)
693        except OSError as os_error:
694            logger.warning('Failed to check free space before committing: ' + str(os_error))
695            return
696        # f_bavail: even as root - don't touch the Federal Block Reserve!
697        free_space = st_vfs.f_bavail * st_vfs.f_frsize
698        logger.debug('check_free_space: required bytes {}, free bytes {}'.format(required_free_space, free_space))
699        if free_space < required_free_space:
700            if self.created:
701                logger.error('Not enough free space to initialize repository at this location.')
702                self.destroy()
703            else:
704                self._rollback(cleanup=True)
705            formatted_required = format_file_size(required_free_space)
706            formatted_free = format_file_size(free_space)
707            raise self.InsufficientFreeSpaceError(formatted_required, formatted_free)
708
709    def log_storage_quota(self):
710        if self.storage_quota:
711            logger.info('Storage quota: %s out of %s used.',
712                        format_file_size(self.storage_quota_use), format_file_size(self.storage_quota))
713
714    def compact_segments(self):
715        """Compact sparse segments by copying data into new segments
716        """
717        if not self.compact:
718            return
719        index_transaction_id = self.get_index_transaction_id()
720        segments = self.segments
721        unused = []  # list of segments, that are not used anymore
722        logger = create_logger('borg.debug.compact_segments')
723
724        def complete_xfer(intermediate=True):
725            # complete the current transfer (when some target segment is full)
726            nonlocal unused
727            # commit the new, compact, used segments
728            segment = self.io.write_commit(intermediate=intermediate)
729            logger.debug('complete_xfer: wrote %scommit at segment %d', 'intermediate ' if intermediate else '', segment)
730            # get rid of the old, sparse, unused segments. free space.
731            for segment in unused:
732                logger.debug('complete_xfer: deleting unused segment %d', segment)
733                count = self.segments.pop(segment)
734                assert count == 0, 'Corrupted segment reference count - corrupted index or hints'
735                self.io.delete_segment(segment)
736                del self.compact[segment]
737            unused = []
738
739        logger.debug('compaction started.')
740        pi = ProgressIndicatorPercent(total=len(self.compact), msg='Compacting segments %3.0f%%', step=1,
741                                      msgid='repository.compact_segments')
742        for segment, freeable_space in sorted(self.compact.items()):
743            if not self.io.segment_exists(segment):
744                logger.warning('segment %d not found, but listed in compaction data', segment)
745                del self.compact[segment]
746                pi.show()
747                continue
748            segment_size = self.io.segment_size(segment)
749            if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size:
750                logger.debug('not compacting segment %d (only %d bytes are sparse)', segment, freeable_space)
751                pi.show()
752                continue
753            segments.setdefault(segment, 0)
754            logger.debug('compacting segment %d with usage count %d and %d freeable bytes',
755                         segment, segments[segment], freeable_space)
756            for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
757                if tag == TAG_COMMIT:
758                    continue
759                in_index = self.index.get(key)
760                is_index_object = in_index == (segment, offset)
761                if tag == TAG_PUT and is_index_object:
762                    try:
763                        new_segment, offset = self.io.write_put(key, data, raise_full=True)
764                    except LoggedIO.SegmentFull:
765                        complete_xfer()
766                        new_segment, offset = self.io.write_put(key, data)
767                    self.index[key] = new_segment, offset
768                    segments.setdefault(new_segment, 0)
769                    segments[new_segment] += 1
770                    segments[segment] -= 1
771                elif tag == TAG_PUT and not is_index_object:
772                    # If this is a PUT shadowed by a later tag, then it will be gone when this segment is deleted after
773                    # this loop. Therefore it is removed from the shadow index.
774                    try:
775                        self.shadow_index[key].remove(segment)
776                    except (KeyError, ValueError):
777                        # do not remove entry with empty shadowed_segments list here,
778                        # it is needed for shadowed_put_exists code (see below)!
779                        pass
780                elif tag == TAG_DELETE and not in_index:
781                    # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
782                    # therefore we do not drop the delete, but write it to a current segment.
783                    shadowed_put_exists = key not in self.shadow_index or any(
784                        # If the key is in the shadow index and there is any segment with an older PUT of this
785                        # key, we have a shadowed put.
786                        shadowed < segment for shadowed in self.shadow_index[key])
787                    delete_is_not_stable = index_transaction_id is None or segment > index_transaction_id
788
789                    if shadowed_put_exists or delete_is_not_stable:
790                        # (introduced in 6425d16aa84be1eaaf88)
791                        # This is needed to avoid object un-deletion if we crash between the commit and the deletion
792                        # of old segments in complete_xfer().
793                        #
794                        # However, this only happens if the crash also affects the FS to the effect that file deletions
795                        # did not materialize consistently after journal recovery. If they always materialize in-order
796                        # then this is not a problem, because the old segment containing a deleted object would be deleted
797                        # before the segment containing the delete.
798                        #
799                        # Consider the following series of operations if we would not do this, ie. this entire if:
800                        # would be removed.
801                        # Columns are segments, lines are different keys (line 1 = some key, line 2 = some other key)
802                        # Legend: P=TAG_PUT, D=TAG_DELETE, c=commit, i=index is written for latest commit
803                        #
804                        # Segment | 1     | 2   | 3
805                        # --------+-------+-----+------
806                        # Key 1   | P     | D   |
807                        # Key 2   | P     |     | P
808                        # commits |   c i |   c |   c i
809                        # --------+-------+-----+------
810                        #                       ^- compact_segments starts
811                        #                           ^- complete_xfer commits, after that complete_xfer deletes
812                        #                              segments 1 and 2 (and then the index would be written).
813                        #
814                        # Now we crash. But only segment 2 gets deleted, while segment 1 is still around. Now key 1
815                        # is suddenly undeleted (because the delete in segment 2 is now missing).
816                        # Again, note the requirement here. We delete these in the correct order that this doesn't happen,
817                        # and only if the FS materialization of these deletes is reordered or parts dropped this can happen.
818                        # In this case it doesn't cause outright corruption, 'just' an index count mismatch, which will be
819                        # fixed by borg-check --repair.
820                        #
821                        # Note that in this check the index state is the proxy for a "most definitely settled" repository state,
822                        # ie. the assumption is that *all* operations on segments <= index state are completed and stable.
823                        try:
824                            new_segment, size = self.io.write_delete(key, raise_full=True)
825                        except LoggedIO.SegmentFull:
826                            complete_xfer()
827                            new_segment, size = self.io.write_delete(key)
828                        self.compact[new_segment] += size
829                        segments.setdefault(new_segment, 0)
830                    else:
831                        # we did not keep the delete tag for key (see if-branch)
832                        if not self.shadow_index[key]:
833                            # shadowed segments list is empty -> remove it
834                            del self.shadow_index[key]
835            assert segments[segment] == 0, 'Corrupted segment reference count - corrupted index or hints'
836            unused.append(segment)
837            pi.show()
838        pi.finish()
839        complete_xfer(intermediate=False)
840        logger.debug('compaction completed.')
841
842    def replay_segments(self, index_transaction_id, segments_transaction_id):
843        # fake an old client, so that in case we do not have an exclusive lock yet, prepare_txn will upgrade the lock:
844        remember_exclusive = self.exclusive
845        self.exclusive = None
846        self.prepare_txn(index_transaction_id, do_cleanup=False)
847        try:
848            segment_count = sum(1 for _ in self.io.segment_iterator())
849            pi = ProgressIndicatorPercent(total=segment_count, msg='Replaying segments %3.0f%%',
850                                          msgid='repository.replay_segments')
851            for i, (segment, filename) in enumerate(self.io.segment_iterator()):
852                pi.show(i)
853                if index_transaction_id is not None and segment <= index_transaction_id:
854                    continue
855                if segment > segments_transaction_id:
856                    break
857                objects = self.io.iter_objects(segment)
858                self._update_index(segment, objects)
859            pi.finish()
860            self.write_index()
861        finally:
862            self.exclusive = remember_exclusive
863            self.rollback()
864
865    def _update_index(self, segment, objects, report=None):
866        """some code shared between replay_segments and check"""
867        self.segments[segment] = 0
868        for tag, key, offset, size in objects:
869            if tag == TAG_PUT:
870                try:
871                    # If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
872                    s, _ = self.index[key]
873                    self.compact[s] += size
874                    self.segments[s] -= 1
875                except KeyError:
876                    pass
877                self.index[key] = segment, offset
878                self.segments[segment] += 1
879                self.storage_quota_use += size
880            elif tag == TAG_DELETE:
881                try:
882                    # if the deleted PUT is not in the index, there is nothing to clean up
883                    s, offset = self.index.pop(key)
884                except KeyError:
885                    pass
886                else:
887                    if self.io.segment_exists(s):
888                        # the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
889                        # is already gone, then it was already compacted.
890                        self.segments[s] -= 1
891                        size = self.io.read(s, offset, key, read_data=False)
892                        self.storage_quota_use -= size
893                        self.compact[s] += size
894            elif tag == TAG_COMMIT:
895                continue
896            else:
897                msg = 'Unexpected tag {} in segment {}'.format(tag, segment)
898                if report is None:
899                    raise self.CheckNeeded(msg)
900                else:
901                    report(msg)
902        if self.segments[segment] == 0:
903            self.compact[segment] += self.io.segment_size(segment)
904
905    def _rebuild_sparse(self, segment):
906        """Rebuild sparse bytes count for a single segment relative to the current index."""
907        try:
908            segment_size = self.io.segment_size(segment)
909        except FileNotFoundError:
910            # segment does not exist any more, remove it from the mappings
911            # note: no need to self.compact.pop(segment), as we start from empty mapping.
912            self.segments.pop(segment)
913            return
914
915        if self.segments[segment] == 0:
916            self.compact[segment] = segment_size
917            return
918
919        self.compact[segment] = 0
920        for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
921            if tag == TAG_PUT:
922                if self.index.get(key, (-1, -1)) != (segment, offset):
923                    # This PUT is superseded later
924                    self.compact[segment] += size
925            elif tag == TAG_DELETE:
926                # The outcome of the DELETE has been recorded in the PUT branch already
927                self.compact[segment] += size
928
929    def check(self, repair=False, save_space=False):
930        """Check repository consistency
931
932        This method verifies all segment checksums and makes sure
933        the index is consistent with the data stored in the segments.
934        """
935        if self.append_only and repair:
936            raise ValueError(self.path + " is in append-only mode")
937        error_found = False
938
939        def report_error(msg):
940            nonlocal error_found
941            error_found = True
942            logger.error(msg)
943
944        logger.info('Starting repository check')
945        assert not self._active_txn
946        try:
947            transaction_id = self.get_transaction_id()
948            current_index = self.open_index(transaction_id)
949            logger.debug('Read committed index of transaction %d', transaction_id)
950        except Exception as exc:
951            transaction_id = self.io.get_segments_transaction_id()
952            current_index = None
953            logger.debug('Failed to read committed index (%s)', exc)
954        if transaction_id is None:
955            logger.debug('No segments transaction found')
956            transaction_id = self.get_index_transaction_id()
957        if transaction_id is None:
958            logger.debug('No index transaction found, trying latest segment')
959            transaction_id = self.io.get_latest_segment()
960        if transaction_id is None:
961            report_error('This repository contains no valid data.')
962            return False
963        if repair:
964            self.io.cleanup(transaction_id)
965        segments_transaction_id = self.io.get_segments_transaction_id()
966        logger.debug('Segment transaction is    %s', segments_transaction_id)
967        logger.debug('Determined transaction is %s', transaction_id)
968        self.prepare_txn(None)  # self.index, self.compact, self.segments all empty now!
969        segment_count = sum(1 for _ in self.io.segment_iterator())
970        logger.debug('Found %d segments', segment_count)
971        pi = ProgressIndicatorPercent(total=segment_count, msg='Checking segments %3.1f%%', step=0.1,
972                                      msgid='repository.check')
973        for i, (segment, filename) in enumerate(self.io.segment_iterator()):
974            pi.show(i)
975            if segment > transaction_id:
976                continue
977            logger.debug('checking segment file %s...', filename)
978            try:
979                objects = list(self.io.iter_objects(segment))
980            except IntegrityError as err:
981                report_error(str(err))
982                objects = []
983                if repair:
984                    self.io.recover_segment(segment, filename)
985                    objects = list(self.io.iter_objects(segment))
986            self._update_index(segment, objects, report_error)
987        pi.finish()
988        # self.index, self.segments, self.compact now reflect the state of the segment files up to <transaction_id>
989        # We might need to add a commit tag if no committed segment is found
990        if repair and segments_transaction_id is None:
991            report_error('Adding commit tag to segment {}'.format(transaction_id))
992            self.io.segment = transaction_id + 1
993            self.io.write_commit()
994        logger.info('Starting repository index check')
995        if current_index and not repair:
996            # current_index = "as found on disk"
997            # self.index = "as rebuilt in-memory from segments"
998            if len(current_index) != len(self.index):
999                report_error('Index object count mismatch.')
1000                logger.error('committed index: %d objects', len(current_index))
1001                logger.error('rebuilt index:   %d objects', len(self.index))
1002            else:
1003                logger.info('Index object count match.')
1004            line_format = 'ID: %-64s rebuilt index: %-16s committed index: %-16s'
1005            not_found = '<not found>'
1006            for key, value in self.index.iteritems():
1007                current_value = current_index.get(key, not_found)
1008                if current_value != value:
1009                    logger.warning(line_format, bin_to_hex(key), value, current_value)
1010            for key, current_value in current_index.iteritems():
1011                if key in self.index:
1012                    continue
1013                value = self.index.get(key, not_found)
1014                if current_value != value:
1015                    logger.warning(line_format, bin_to_hex(key), value, current_value)
1016        if repair:
1017            self.compact_segments()
1018            self.write_index()
1019        self.rollback()
1020        if error_found:
1021            if repair:
1022                logger.info('Completed repository check, errors found and repaired.')
1023            else:
1024                logger.error('Completed repository check, errors found.')
1025        else:
1026            logger.info('Completed repository check, no problems found.')
1027        return not error_found or repair
1028
1029    def scan_low_level(self):
1030        """Very low level scan over all segment file entries.
1031
1032        It does NOT care about what's committed and what not.
1033        It does NOT care whether an object might be deleted or superseded later.
1034        It just yields anything it finds in the segment files.
1035
1036        This is intended as a last-resort way to get access to all repo contents of damaged repos,
1037        when there is uncommitted, but valuable data in there...
1038        """
1039        for segment, filename in self.io.segment_iterator():
1040            try:
1041                for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
1042                    yield key, data, tag, segment, offset
1043            except IntegrityError as err:
1044                logger.error('Segment %d (%s) has IntegrityError(s) [%s] - skipping.' % (segment, filename, str(err)))
1045
1046    def _rollback(self, *, cleanup):
1047        """
1048        """
1049        if cleanup:
1050            self.io.cleanup(self.io.get_segments_transaction_id())
1051        self.index = None
1052        self._active_txn = False
1053        self.transaction_doomed = None
1054
1055    def rollback(self):
1056        # note: when used in remote mode, this is time limited, see RemoteRepository.shutdown_time.
1057        self._rollback(cleanup=False)
1058
1059    def __len__(self):
1060        if not self.index:
1061            self.index = self.open_index(self.get_transaction_id())
1062        return len(self.index)
1063
1064    def __contains__(self, id):
1065        if not self.index:
1066            self.index = self.open_index(self.get_transaction_id())
1067        return id in self.index
1068
1069    def list(self, limit=None, marker=None):
1070        """
1071        list <limit> IDs starting from after id <marker> - in index (pseudo-random) order.
1072        """
1073        if not self.index:
1074            self.index = self.open_index(self.get_transaction_id())
1075        return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)]
1076
1077    def scan(self, limit=None, marker=None):
1078        """
1079        list <limit> IDs starting from after id <marker> - in on-disk order, so that a client
1080        fetching data in this order does linear reads and reuses stuff from disk cache.
1081
1082        We rely on repository.check() has run already (either now or some time before) and that:
1083
1084        - if we are called from a borg check command, self.index is a valid, fresh, in-sync repo index.
1085        - if we are called from elsewhere, either self.index or the on-disk index is valid and in-sync.
1086        - the repository segments are valid (no CRC errors).
1087          if we encounter CRC errors in segment entry headers, rest of segment is skipped.
1088        """
1089        if limit is not None and limit < 1:
1090            raise ValueError('please use limit > 0 or limit = None')
1091        if not self.index:
1092            transaction_id = self.get_transaction_id()
1093            self.index = self.open_index(transaction_id)
1094        at_start = marker is None
1095        # smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
1096        start_segment, start_offset = (0, 0) if at_start else self.index[marker]
1097        result = []
1098        for segment, filename in self.io.segment_iterator(start_segment):
1099            obj_iterator = self.io.iter_objects(segment, start_offset, read_data=False, include_data=False)
1100            while True:
1101                try:
1102                    tag, id, offset, size = next(obj_iterator)
1103                except (StopIteration, IntegrityError):
1104                    # either end-of-segment or an error - we can not seek to objects at
1105                    # higher offsets than one that has an error in the header fields
1106                    break
1107                if start_offset > 0:
1108                    # we are using a marker and the marker points to the last object we have already
1109                    # returned in the previous scan() call - thus, we need to skip this one object.
1110                    # also, for the next segment, we need to start at offset 0.
1111                    start_offset = 0
1112                    continue
1113                if tag == TAG_PUT and (segment, offset) == self.index.get(id):
1114                    # we have found an existing and current object
1115                    result.append(id)
1116                    if len(result) == limit:
1117                        return result
1118        return result
1119
1120    def get(self, id):
1121        if not self.index:
1122            self.index = self.open_index(self.get_transaction_id())
1123        try:
1124            segment, offset = self.index[id]
1125            return self.io.read(segment, offset, id)
1126        except KeyError:
1127            raise self.ObjectNotFound(id, self.path) from None
1128
1129    def get_many(self, ids, is_preloaded=False):
1130        for id_ in ids:
1131            yield self.get(id_)
1132
1133    def put(self, id, data, wait=True):
1134        """put a repo object
1135
1136        Note: when doing calls with wait=False this gets async and caller must
1137              deal with async results / exceptions later.
1138        """
1139        if not self._active_txn:
1140            self.prepare_txn(self.get_transaction_id())
1141        try:
1142            segment, offset = self.index[id]
1143        except KeyError:
1144            pass
1145        else:
1146            # note: doing a delete first will do some bookkeeping.
1147            # we do not want to update the shadow_index here, because
1148            # we know already that we will PUT to this id, so it will
1149            # be in the repo index (and we won't need it in the shadow_index).
1150            self._delete(id, segment, offset, update_shadow_index=False)
1151        segment, offset = self.io.write_put(id, data)
1152        self.storage_quota_use += len(data) + self.io.put_header_fmt.size
1153        self.segments.setdefault(segment, 0)
1154        self.segments[segment] += 1
1155        self.index[id] = segment, offset
1156        if self.storage_quota and self.storage_quota_use > self.storage_quota:
1157            self.transaction_doomed = self.StorageQuotaExceeded(
1158                format_file_size(self.storage_quota), format_file_size(self.storage_quota_use))
1159            raise self.transaction_doomed
1160
1161    def delete(self, id, wait=True):
1162        """delete a repo object
1163
1164        Note: when doing calls with wait=False this gets async and caller must
1165              deal with async results / exceptions later.
1166        """
1167        if not self._active_txn:
1168            self.prepare_txn(self.get_transaction_id())
1169        try:
1170            segment, offset = self.index.pop(id)
1171        except KeyError:
1172            raise self.ObjectNotFound(id, self.path) from None
1173        # if we get here, there is an object with this id in the repo,
1174        # we write a DEL here that shadows the respective PUT.
1175        # after the delete, the object is not in the repo index any more,
1176        # for the compaction code, we need to update the shadow_index in this case.
1177        self._delete(id, segment, offset, update_shadow_index=True)
1178
1179    def _delete(self, id, segment, offset, *, update_shadow_index):
1180        # common code used by put and delete
1181        if update_shadow_index:
1182            self.shadow_index.setdefault(id, []).append(segment)
1183        self.segments[segment] -= 1
1184        size = self.io.read(segment, offset, id, read_data=False)
1185        self.storage_quota_use -= size
1186        self.compact[segment] += size
1187        segment, size = self.io.write_delete(id)
1188        self.compact[segment] += size
1189        self.segments.setdefault(segment, 0)
1190
1191    def async_response(self, wait=True):
1192        """Get one async result (only applies to remote repositories).
1193
1194        async commands (== calls with wait=False, e.g. delete and put) have no results,
1195        but may raise exceptions. These async exceptions must get collected later via
1196        async_response() calls. Repeat the call until it returns None.
1197        The previous calls might either return one (non-None) result or raise an exception.
1198        If wait=True is given and there are outstanding responses, it will wait for them
1199        to arrive. With wait=False, it will only return already received responses.
1200        """
1201
1202    def preload(self, ids):
1203        """Preload objects (only applies to remote repositories)
1204        """
1205
1206
1207class LoggedIO:
1208
1209    class SegmentFull(Exception):
1210        """raised when a segment is full, before opening next"""
1211
1212    header_fmt = struct.Struct('<IIB')
1213    assert header_fmt.size == 9
1214    put_header_fmt = struct.Struct('<IIB32s')
1215    assert put_header_fmt.size == 41
1216    header_no_crc_fmt = struct.Struct('<IB')
1217    assert header_no_crc_fmt.size == 5
1218    crc_fmt = struct.Struct('<I')
1219    assert crc_fmt.size == 4
1220
1221    _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
1222    COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
1223
1224    def __init__(self, path, limit, segments_per_dir, capacity=90):
1225        self.path = path
1226        self.fds = LRUCache(capacity, dispose=self._close_fd)
1227        self.segment = 0
1228        self.limit = limit
1229        self.segments_per_dir = segments_per_dir
1230        self.offset = 0
1231        self._write_fd = None
1232        self._fds_cleaned = 0
1233
1234    def close(self):
1235        self.close_segment()
1236        self.fds.clear()
1237        self.fds = None  # Just to make sure we're disabled
1238
1239    def _close_fd(self, ts_fd):
1240        ts, fd = ts_fd
1241        safe_fadvise(fd.fileno(), 0, 0, 'DONTNEED')
1242        fd.close()
1243
1244    def segment_iterator(self, segment=None, reverse=False):
1245        if segment is None:
1246            segment = 0 if not reverse else 2 ** 32 - 1
1247        data_path = os.path.join(self.path, 'data')
1248        start_segment_dir = segment // self.segments_per_dir
1249        dirs = os.listdir(data_path)
1250        if not reverse:
1251            dirs = [dir for dir in dirs if dir.isdigit() and int(dir) >= start_segment_dir]
1252        else:
1253            dirs = [dir for dir in dirs if dir.isdigit() and int(dir) <= start_segment_dir]
1254        dirs = sorted(dirs, key=int, reverse=reverse)
1255        for dir in dirs:
1256            filenames = os.listdir(os.path.join(data_path, dir))
1257            if not reverse:
1258                filenames = [filename for filename in filenames if filename.isdigit() and int(filename) >= segment]
1259            else:
1260                filenames = [filename for filename in filenames if filename.isdigit() and int(filename) <= segment]
1261            filenames = sorted(filenames, key=int, reverse=reverse)
1262            for filename in filenames:
1263                # Note: Do not filter out logically deleted segments  (see "File system interaction" above),
1264                # since this is used by cleanup and txn state detection as well.
1265                yield int(filename), os.path.join(data_path, dir, filename)
1266
1267    def get_latest_segment(self):
1268        for segment, filename in self.segment_iterator(reverse=True):
1269            return segment
1270        return None
1271
1272    def get_segments_transaction_id(self):
1273        """Return the last committed segment.
1274        """
1275        for segment, filename in self.segment_iterator(reverse=True):
1276            if self.is_committed_segment(segment):
1277                return segment
1278        return None
1279
1280    def cleanup(self, transaction_id):
1281        """Delete segment files left by aborted transactions
1282        """
1283        self.segment = transaction_id + 1
1284        count = 0
1285        for segment, filename in self.segment_iterator(reverse=True):
1286            if segment > transaction_id:
1287                if segment in self.fds:
1288                    del self.fds[segment]
1289                truncate_and_unlink(filename)
1290                count += 1
1291            else:
1292                break
1293        logger.debug('Cleaned up %d uncommitted segment files (== everything after segment %d).',
1294                     count, transaction_id)
1295
1296    def is_committed_segment(self, segment):
1297        """Check if segment ends with a COMMIT_TAG tag
1298        """
1299        try:
1300            iterator = self.iter_objects(segment)
1301        except IntegrityError:
1302            return False
1303        with open(self.segment_filename(segment), 'rb') as fd:
1304            try:
1305                fd.seek(-self.header_fmt.size, os.SEEK_END)
1306            except OSError as e:
1307                # return False if segment file is empty or too small
1308                if e.errno == errno.EINVAL:
1309                    return False
1310                raise e
1311            if fd.read(self.header_fmt.size) != self.COMMIT:
1312                return False
1313        seen_commit = False
1314        while True:
1315            try:
1316                tag, key, offset, _ = next(iterator)
1317            except IntegrityError:
1318                return False
1319            except StopIteration:
1320                break
1321            if tag == TAG_COMMIT:
1322                seen_commit = True
1323                continue
1324            if seen_commit:
1325                return False
1326        return seen_commit
1327
1328    def segment_filename(self, segment):
1329        return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
1330
1331    def get_write_fd(self, no_new=False, raise_full=False):
1332        if not no_new and self.offset and self.offset > self.limit:
1333            if raise_full:
1334                raise self.SegmentFull
1335            self.close_segment()
1336        if not self._write_fd:
1337            if self.segment % self.segments_per_dir == 0:
1338                dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
1339                if not os.path.exists(dirname):
1340                    os.mkdir(dirname)
1341                    sync_dir(os.path.join(self.path, 'data'))
1342            self._write_fd = SyncFile(self.segment_filename(self.segment), binary=True)
1343            self._write_fd.write(MAGIC)
1344            self.offset = MAGIC_LEN
1345            if self.segment in self.fds:
1346                # we may have a cached fd for a segment file we already deleted and
1347                # we are writing now a new segment file to same file name. get rid of
1348                # of the cached fd that still refers to the old file, so it will later
1349                # get repopulated (on demand) with a fd that refers to the new file.
1350                del self.fds[self.segment]
1351        return self._write_fd
1352
1353    def get_fd(self, segment):
1354        # note: get_fd() returns a fd with undefined file pointer position,
1355        # so callers must always seek() to desired position afterwards.
1356        now = time.monotonic()
1357
1358        def open_fd():
1359            fd = open(self.segment_filename(segment), 'rb')
1360            self.fds[segment] = (now, fd)
1361            return fd
1362
1363        def clean_old():
1364            # we regularly get rid of all old FDs here:
1365            if now - self._fds_cleaned > FD_MAX_AGE // 8:
1366                self._fds_cleaned = now
1367                for k, ts_fd in list(self.fds.items()):
1368                    ts, fd = ts_fd
1369                    if now - ts > FD_MAX_AGE:
1370                        # we do not want to touch long-unused file handles to
1371                        # avoid ESTALE issues (e.g. on network filesystems).
1372                        del self.fds[k]
1373
1374        clean_old()
1375        try:
1376            ts, fd = self.fds[segment]
1377        except KeyError:
1378            fd = open_fd()
1379        else:
1380            # we only have fresh enough stuff here.
1381            # update the timestamp of the lru cache entry.
1382            self.fds.upd(segment, (now, fd))
1383        return fd
1384
1385    def close_segment(self):
1386        # set self._write_fd to None early to guard against reentry from error handling code paths:
1387        fd, self._write_fd = self._write_fd, None
1388        if fd is not None:
1389            self.segment += 1
1390            self.offset = 0
1391            fd.close()
1392
1393    def delete_segment(self, segment):
1394        if segment in self.fds:
1395            del self.fds[segment]
1396        try:
1397            truncate_and_unlink(self.segment_filename(segment))
1398        except FileNotFoundError:
1399            pass
1400
1401    def segment_exists(self, segment):
1402        filename = self.segment_filename(segment)
1403        # When deleting segments, they are first truncated. If truncate(2) and unlink(2) are split
1404        # across FS transactions, then logically deleted segments will show up as truncated.
1405        return os.path.exists(filename) and os.path.getsize(filename)
1406
1407    def segment_size(self, segment):
1408        return os.path.getsize(self.segment_filename(segment))
1409
1410    def get_segment_magic(self, segment):
1411        fd = self.get_fd(segment)
1412        fd.seek(0)
1413        return fd.read(MAGIC_LEN)
1414
1415    def iter_objects(self, segment, offset=0, include_data=False, read_data=True):
1416        """
1417        Return object iterator for *segment*.
1418
1419        If read_data is False then include_data must be False as well.
1420        Integrity checks are skipped: all data obtained from the iterator must be considered informational.
1421
1422        The iterator returns four-tuples of (tag, key, offset, data|size).
1423        """
1424        fd = self.get_fd(segment)
1425        fd.seek(offset)
1426        if offset == 0:
1427            # we are touching this segment for the first time, check the MAGIC.
1428            # Repository.scan() calls us with segment > 0 when it continues an ongoing iteration
1429            # from a marker position - but then we have checked the magic before already.
1430            if fd.read(MAGIC_LEN) != MAGIC:
1431                raise IntegrityError('Invalid segment magic [segment {}, offset {}]'.format(segment, 0))
1432            offset = MAGIC_LEN
1433        header = fd.read(self.header_fmt.size)
1434        while header:
1435            size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
1436                                              (TAG_PUT, TAG_DELETE, TAG_COMMIT),
1437                                              read_data=read_data)
1438            if include_data:
1439                yield tag, key, offset, data
1440            else:
1441                yield tag, key, offset, size
1442            offset += size
1443            # we must get the fd via get_fd() here again as we yielded to our caller and it might
1444            # have triggered closing of the fd we had before (e.g. by calling io.read() for
1445            # different segment(s)).
1446            # by calling get_fd() here again we also make our fd "recently used" so it likely
1447            # does not get kicked out of self.fds LRUcache.
1448            fd = self.get_fd(segment)
1449            fd.seek(offset)
1450            header = fd.read(self.header_fmt.size)
1451
1452    def recover_segment(self, segment, filename):
1453        logger.info('attempting to recover ' + filename)
1454        if segment in self.fds:
1455            del self.fds[segment]
1456        if os.path.getsize(filename) < MAGIC_LEN + self.header_fmt.size:
1457            # this is either a zero-byte file (which would crash mmap() below) or otherwise
1458            # just too small to be a valid non-empty segment file, so do a shortcut here:
1459            with SaveFile(filename, binary=True) as fd:
1460                fd.write(MAGIC)
1461            return
1462        with SaveFile(filename, binary=True) as dst_fd:
1463            with open(filename, 'rb') as src_fd:
1464                # note: file must not be 0 size or mmap() will crash.
1465                with mmap.mmap(src_fd.fileno(), 0, access=mmap.ACCESS_READ) as mm:
1466                    # memoryview context manager is problematic, see https://bugs.python.org/issue35686
1467                    data = memoryview(mm)
1468                    d = data
1469                    try:
1470                        dst_fd.write(MAGIC)
1471                        while len(d) >= self.header_fmt.size:
1472                            crc, size, tag = self.header_fmt.unpack(d[:self.header_fmt.size])
1473                            if size < self.header_fmt.size or size > len(d):
1474                                d = d[1:]
1475                                continue
1476                            if crc32(d[4:size]) & 0xffffffff != crc:
1477                                d = d[1:]
1478                                continue
1479                            dst_fd.write(d[:size])
1480                            d = d[size:]
1481                    finally:
1482                        del d
1483                        data.release()
1484
1485    def read(self, segment, offset, id, read_data=True):
1486        """
1487        Read entry from *segment* at *offset* with *id*.
1488
1489        If read_data is False the size of the entry is returned instead and integrity checks are skipped.
1490        The return value should thus be considered informational.
1491        """
1492        if segment == self.segment and self._write_fd:
1493            self._write_fd.sync()
1494        fd = self.get_fd(segment)
1495        fd.seek(offset)
1496        header = fd.read(self.put_header_fmt.size)
1497        size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data)
1498        if id != key:
1499            raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
1500                segment, offset))
1501        return data if read_data else size
1502
1503    def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True):
1504        # some code shared by read() and iter_objects()
1505        try:
1506            hdr_tuple = fmt.unpack(header)
1507        except struct.error as err:
1508            raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
1509                segment, offset, err)) from None
1510        if fmt is self.put_header_fmt:
1511            crc, size, tag, key = hdr_tuple
1512        elif fmt is self.header_fmt:
1513            crc, size, tag = hdr_tuple
1514            key = None
1515        else:
1516            raise TypeError("_read called with unsupported format")
1517        if size > MAX_OBJECT_SIZE:
1518            # if you get this on an archive made with borg < 1.0.7 and millions of files and
1519            # you need to restore it, you can disable this check by using "if False:" above.
1520            raise IntegrityError('Invalid segment entry size {} - too big [segment {}, offset {}]'.format(
1521                size, segment, offset))
1522        if size < fmt.size:
1523            raise IntegrityError('Invalid segment entry size {} - too small [segment {}, offset {}]'.format(
1524                size, segment, offset))
1525        length = size - fmt.size
1526        if read_data:
1527            data = fd.read(length)
1528            if len(data) != length:
1529                raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
1530                    segment, offset, length, len(data)))
1531            if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
1532                raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
1533                    segment, offset))
1534            if key is None and tag in (TAG_PUT, TAG_DELETE):
1535                key, data = data[:32], data[32:]
1536        else:
1537            if key is None and tag in (TAG_PUT, TAG_DELETE):
1538                key = fd.read(32)
1539                length -= 32
1540                if len(key) != 32:
1541                    raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
1542                        segment, offset, 32, len(key)))
1543            oldpos = fd.tell()
1544            seeked = fd.seek(length, os.SEEK_CUR) - oldpos
1545            data = None
1546            if seeked != length:
1547                raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
1548                        segment, offset, length, seeked))
1549        if tag not in acceptable_tags:
1550            raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
1551                segment, offset))
1552        return size, tag, key, data
1553
1554    def write_put(self, id, data, raise_full=False):
1555        data_size = len(data)
1556        if data_size > MAX_DATA_SIZE:
1557            # this would push the segment entry size beyond MAX_OBJECT_SIZE.
1558            raise IntegrityError('More than allowed put data [{} > {}]'.format(data_size, MAX_DATA_SIZE))
1559        fd = self.get_write_fd(raise_full=raise_full)
1560        size = data_size + self.put_header_fmt.size
1561        offset = self.offset
1562        header = self.header_no_crc_fmt.pack(size, TAG_PUT)
1563        crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
1564        fd.write(b''.join((crc, header, id, data)))
1565        self.offset += size
1566        return self.segment, offset
1567
1568    def write_delete(self, id, raise_full=False):
1569        fd = self.get_write_fd(raise_full=raise_full)
1570        header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
1571        crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
1572        fd.write(b''.join((crc, header, id)))
1573        self.offset += self.put_header_fmt.size
1574        return self.segment, self.put_header_fmt.size
1575
1576    def write_commit(self, intermediate=False):
1577        if intermediate:
1578            # Intermediate commits go directly into the current segment - this makes checking their validity more
1579            # expensive, but is faster and reduces clobber.
1580            fd = self.get_write_fd()
1581            fd.sync()
1582        else:
1583            self.close_segment()
1584            fd = self.get_write_fd()
1585        header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
1586        crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
1587        fd.write(b''.join((crc, header)))
1588        self.close_segment()
1589        return self.segment - 1  # close_segment() increments it
1590
1591
1592assert LoggedIO.put_header_fmt.size == 41  # see constants.MAX_OBJECT_SIZE
1593