1import errno 2import mmap 3import os 4import shutil 5import stat 6import struct 7import time 8from binascii import hexlify, unhexlify 9from collections import defaultdict 10from configparser import ConfigParser 11from datetime import datetime 12from functools import partial 13from itertools import islice 14 15from .constants import * # NOQA 16from .hashindex import NSIndex 17from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size 18from .helpers import Location 19from .helpers import ProgressIndicatorPercent 20from .helpers import bin_to_hex 21from .helpers import hostname_is_unique 22from .helpers import secure_erase, truncate_and_unlink 23from .helpers import msgpack 24from .locking import Lock, LockError, LockErrorT 25from .logger import create_logger 26from .lrucache import LRUCache 27from .platform import SaveFile, SyncFile, sync_dir, safe_fadvise 28from .algorithms.checksums import crc32 29from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError 30 31logger = create_logger(__name__) 32 33MAGIC = b'BORG_SEG' 34MAGIC_LEN = len(MAGIC) 35ATTIC_MAGIC = b'ATTICSEG' 36assert len(ATTIC_MAGIC) == MAGIC_LEN 37TAG_PUT = 0 38TAG_DELETE = 1 39TAG_COMMIT = 2 40 41FreeSpace = partial(defaultdict, int) 42 43 44class Repository: 45 """ 46 Filesystem based transactional key value store 47 48 Transactionality is achieved by using a log (aka journal) to record changes. The log is a series of numbered files 49 called segments. Each segment is a series of log entries. The segment number together with the offset of each 50 entry relative to its segment start establishes an ordering of the log entries. This is the "definition" of 51 time for the purposes of the log. 52 53 Log entries are either PUT, DELETE or COMMIT. 54 55 A COMMIT is always the final log entry in a segment and marks all data from the beginning of the log until the 56 segment ending with the COMMIT as committed and consistent. 
The segment number of a segment ending with a COMMIT 57 is called the transaction ID of that commit, and a segment ending with a COMMIT is called committed. 58 59 When reading from a repository it is first checked whether the last segment is committed. If it is not, then 60 all segments after the last committed segment are deleted; they contain log entries whose consistency is not 61 established by a COMMIT. 62 63 Note that the COMMIT can't establish consistency by itself, but only manages to do so with proper support from 64 the platform (including the hardware). See platform.base.SyncFile for details. 65 66 A PUT inserts a key-value pair. The value is stored in the log entry, hence the repository implements 67 full data logging, meaning that all data is consistent, not just metadata (which is common in file systems). 68 69 A DELETE marks a key as deleted. 70 71 For a given key only the last entry regarding the key, which is called current (all other entries are called 72 superseded), is relevant: If there is no entry or the last entry is a DELETE then the key does not exist. 73 Otherwise the last PUT defines the value of the key. 74 75 By superseding a PUT (with either another PUT or a DELETE) the log entry becomes obsolete. A segment containing 76 such obsolete entries is called sparse, while a segment containing no such entries is called compact. 77 78 Sparse segments can be compacted and thereby disk space freed. This destroys the transaction for which the 79 superseded entries were current. 80 81 On disk layout: 82 83 dir/README 84 dir/config 85 dir/data/<X // SEGMENTS_PER_DIR>/<X> 86 dir/index.X 87 dir/hints.X 88 89 File system interaction 90 ----------------------- 91 92 LoggedIO generally tries to rely on common behaviours across transactional file systems. 93 94 Segments that are deleted are truncated first, which avoids problems if the FS needs to 95 allocate space to delete the dirent of the segment. 
This mostly affects CoW file systems; 96 traditional journaling file systems have a fairly good grip on this problem. 97 98 Note that deletion, i.e. unlink(2), is atomic on every file system that uses inode reference 99 counts, which includes pretty much all of them. To remove a dirent the inode's refcount has 100 to be decreased, but you can't decrease the refcount before removing the dirent nor can you 101 decrease the refcount after removing the dirent. File systems solve this with a lock, 102 and by ensuring it all stays within the same FS transaction. 103 104 Truncation is generally not atomic in itself, and combining truncate(2) and unlink(2) is of 105 course never guaranteed to be atomic. Truncation in a classic extent-based FS is done in 106 roughly two phases, first the extents are removed then the inode is updated. (In practice 107 this is of course way more complex). 108 109 LoggedIO gracefully handles truncate/unlink splits as long as the truncate resulted in 110 a zero length file. Zero length segments are considered to not exist, while LoggedIO.cleanup() 111 will still get rid of them. 
Please run "borg check {}".""" 137 138 class ObjectNotFound(ErrorWithTraceback): 139 """Object with key {} not found in repository {}.""" 140 141 def __init__(self, id, repo): 142 if isinstance(id, bytes): 143 id = bin_to_hex(id) 144 super().__init__(id, repo) 145 146 class InsufficientFreeSpaceError(Error): 147 """Insufficient free space to complete transaction (required: {}, available: {}).""" 148 149 class StorageQuotaExceeded(Error): 150 """The storage quota ({}) has been exceeded ({}). Try deleting some archives.""" 151 152 def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True, 153 append_only=False, storage_quota=None, check_segment_magic=True, 154 make_parent_dirs=False): 155 self.path = os.path.abspath(path) 156 self._location = Location('file://%s' % self.path) 157 self.io = None # type: LoggedIO 158 self.lock = None 159 self.index = None 160 # This is an index of shadowed log entries during this transaction. Consider the following sequence: 161 # segment_n PUT A, segment_x DELETE A 162 # After the "DELETE A" in segment_x the shadow index will contain "A -> [n]". 
@staticmethod
def is_repository(path):
    """Check whether there is already a Borg repository at *path*.

    The decision is based solely on the first 100 bytes of ``<path>/README``:
    the directory counts as a repository if that head contains one of the
    known README marker phrases. Any OSError while opening or reading the
    file (missing file, missing permissions, ...) yields False.
    """
    # The first marker is found in our current README variant (REPOSITORY_README),
    # the second one in the older variant written by 1.0.x.
    markers = (b'Borg Backup repository', b'Borg repository')
    readme_path = os.path.join(path, 'README')
    try:
        # Binary mode avoids trouble if a README contains bytes that are not valid in our locale.
        with open(readme_path, 'rb') as readme:
            # Only look at the head, some README we stumble upon might be large.
            head = readme.read(100)
    except OSError:
        return False
    return any(marker in head for marker in markers)
def create(self, path):
    """Create a new empty repository at `path`

    Steps, in order: verify that a repository may be created there
    (``check_can_create_repository``), optionally create the parent
    directories, create the repository directory unless something already
    exists at `path`, write the README, create the ``data`` directory and
    finally persist the initial ``[repository]`` config via ``save_config``.

    Raises ParentPathDoesNotExist if the repository directory can not be
    created because its parent is missing.
    """
    self.check_can_create_repository(path)
    if self.make_parent_dirs:
        os.makedirs(os.path.join(path, os.pardir), exist_ok=True)
    if not os.path.exists(path):
        try:
            os.mkdir(path)
        except FileNotFoundError as err:
            raise self.ParentPathDoesNotExist(path) from err
    readme_path = os.path.join(path, 'README')
    with open(readme_path, 'w') as readme:
        readme.write(REPOSITORY_README)
    os.mkdir(os.path.join(path, 'data'))

    # A falsy quota (None or 0) is stored as '0'.
    quota = str(self.storage_quota) if self.storage_quota else '0'
    # Option order is kept as-is, ConfigParser writes options in insertion order.
    settings = (
        ('version', '1'),
        ('segments_per_dir', str(DEFAULT_SEGMENTS_PER_DIR)),
        ('max_segment_size', str(DEFAULT_MAX_SEGMENT_SIZE)),
        ('append_only', str(int(self.append_only))),
        ('storage_quota', quota),
        ('additional_free_space', '0'),
        ('id', bin_to_hex(os.urandom(32))),
    )
    config = ConfigParser(interpolation=None)
    config.add_section('repository')
    for option, value in settings:
        config.set('repository', option, value)
    self.save_config(path, config)
def get_free_nonce(self):
    """Return the next free nonce recorded in the repository's ``nonce`` file.

    The file holds the nonce as a big-endian integer in hexadecimal text form.

    Returns:
        The nonce as int, or None if there is no ``nonce`` file yet.

    Raises:
        AssertionError: locking is enabled, but we do not hold the exclusive lock.
    """
    if self.do_lock and not self.lock.got_exclusive_lock():
        raise AssertionError("bug in code, exclusive lock should exist here")

    nonce_path = os.path.join(self.path, 'nonce')
    try:
        with open(nonce_path, 'r') as fd:
            nonce_hex = fd.read()
    except FileNotFoundError:
        return None
    # Tolerate surrounding whitespace (e.g. a trailing newline added by an editor or a
    # copy tool), same as it is done for the repository id when opening the repository.
    # unhexlify would otherwise reject the file content.
    return int.from_bytes(unhexlify(nonce_hex.strip()), byteorder='big')
def get_index_transaction_id(self):
    """Return the highest transaction id that has a non-empty index file.

    Scans the repository directory for files named ``index.<digits>`` and
    ignores zero-length ones. Returns None if no such file exists.
    """
    prefix = 'index.'
    newest = None
    for name in os.listdir(self.path):
        if not name.startswith(prefix):
            continue
        suffix = name[len(prefix):]
        if not suffix.isdigit():
            continue
        # empty index files do not count
        if os.stat(os.path.join(self.path, name)).st_size == 0:
            continue
        transaction_id = int(suffix)
        if newest is None or transaction_id > newest:
            newest = transaction_id
    return newest
def migrate_lock(self, old_id, new_id):
    """Hand the currently held lock over from *old_id* to *new_id*.

    Does nothing if this repository object does not hold a lock object.
    """
    # note: only needed for local repos
    lock = self.lock
    if lock is None:
        return
    lock.migrate_lock(old_id, new_id)
def close(self):
    """Close the segment I/O object and release the repository lock.

    This is a no-op when no lock is held. Otherwise the I/O object (if any)
    is closed and forgotten first, then the lock is released and forgotten.
    """
    held_lock = self.lock
    if not held_lock:
        return
    segment_io = self.io
    if segment_io:
        segment_io.close()
    self.io = None
    held_lock.release()
    self.lock = None
def open_index(self, transaction_id, auto_recover=True):
    """Load and return the repository index belonging to *transaction_id*.

    :param transaction_id: id of the transaction whose ``index.<id>`` file shall be read;
        None yields a fresh, empty NSIndex.
    :param auto_recover: if True (default), a missing/corrupted index file is deleted and
        recovery is attempted via prepare_txn() + commit(), after which the index is opened
        again. If False, the index file is deleted and the original exception is re-raised.
    :return: the NSIndex read from disk (or an empty one, see above).
    :raises ValueError, OSError, FileIntegrityError: reading failed and auto_recover is False.
    """
    if transaction_id is None:
        return NSIndex()
    index_path = os.path.join(self.path, 'index.%d' % transaction_id)
    # integrity data may be None, see _read_integrity.
    integrity_data = self._read_integrity(transaction_id, b'index')
    try:
        with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
            return NSIndex.read(fd)
    except (ValueError, OSError, FileIntegrityError) as exc:
        logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
        # get rid of the unusable index file, so it is not picked up again.
        os.unlink(index_path)
        if not auto_recover:
            raise
        # get_transaction_id() runs check_transaction() first, then reports the
        # transaction id of the newest remaining index file.
        self.prepare_txn(self.get_transaction_id())
        # don't leave an open transaction around
        self.commit()
        # the transaction id is queried again, it may differ from the one we were called with.
        return self.open_index(self.get_transaction_id())
if it is True and we get here, something else 517 # went very wrong, because we should have a exclusive lock, but we don't. 518 raise AssertionError("bug in code, exclusive lock should exist here") 519 # if we are here, this is an old client talking to a new server (expecting lock upgrade). 520 # or we are replaying segments and might need a lock upgrade for that. 521 try: 522 self.lock.upgrade() 523 except (LockError, LockErrorT): 524 # if upgrading the lock to exclusive fails, we do not have an 525 # active transaction. this is important for "serve" mode, where 526 # the repository instance lives on - even if exceptions happened. 527 self._active_txn = False 528 raise 529 if not self.index or transaction_id is None: 530 try: 531 self.index = self.open_index(transaction_id, auto_recover=False) 532 except (ValueError, OSError, FileIntegrityError) as exc: 533 logger.warning('Checking repository transaction due to previous error: %s', exc) 534 self.check_transaction() 535 self.index = self.open_index(transaction_id, auto_recover=False) 536 if transaction_id is None: 537 self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x] 538 self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x] 539 self.storage_quota_use = 0 540 self.shadow_index.clear() 541 else: 542 if do_cleanup: 543 self.io.cleanup(transaction_id) 544 hints_path = os.path.join(self.path, 'hints.%d' % transaction_id) 545 index_path = os.path.join(self.path, 'index.%d' % transaction_id) 546 integrity_data = self._read_integrity(transaction_id, b'hints') 547 try: 548 with IntegrityCheckedFile(hints_path, write=False, integrity_data=integrity_data) as fd: 549 hints = msgpack.unpack(fd) 550 except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError, FileIntegrityError) as e: 551 logger.warning('Repository hints file missing or corrupted, trying to recover: %s', e) 552 if not isinstance(e, FileNotFoundError): 553 os.unlink(hints_path) 554 # index 
must exist at this point 555 os.unlink(index_path) 556 self.check_transaction() 557 self.prepare_txn(transaction_id) 558 return 559 if hints[b'version'] == 1: 560 logger.debug('Upgrading from v1 hints.%d', transaction_id) 561 self.segments = hints[b'segments'] 562 self.compact = FreeSpace() 563 self.storage_quota_use = 0 564 for segment in sorted(hints[b'compact']): 565 logger.debug('Rebuilding sparse info for segment %d', segment) 566 self._rebuild_sparse(segment) 567 logger.debug('Upgrade to v2 hints complete') 568 elif hints[b'version'] != 2: 569 raise ValueError('Unknown hints file version: %d' % hints[b'version']) 570 else: 571 self.segments = hints[b'segments'] 572 self.compact = FreeSpace(hints[b'compact']) 573 self.storage_quota_use = hints.get(b'storage_quota_use', 0) 574 self.log_storage_quota() 575 # Drop uncommitted segments in the shadow index 576 for key, shadowed_segments in self.shadow_index.items(): 577 for segment in list(shadowed_segments): 578 if segment > transaction_id: 579 shadowed_segments.remove(segment) 580 581 def write_index(self): 582 def flush_and_sync(fd): 583 fd.flush() 584 os.fsync(fd.fileno()) 585 586 def rename_tmp(file): 587 os.rename(file + '.tmp', file) 588 589 hints = { 590 b'version': 2, 591 b'segments': self.segments, 592 b'compact': self.compact, 593 b'storage_quota_use': self.storage_quota_use, 594 } 595 integrity = { 596 # Integrity version started at 2, the current hints version. 597 # Thus, integrity version == hints version, for now. 
598 b'version': 2, 599 } 600 transaction_id = self.io.get_segments_transaction_id() 601 assert transaction_id is not None 602 603 # Log transaction in append-only mode 604 if self.append_only: 605 with open(os.path.join(self.path, 'transactions'), 'a') as log: 606 print('transaction %d, UTC time %s' % ( 607 transaction_id, datetime.utcnow().strftime(ISO_FORMAT)), file=log) 608 609 # Write hints file 610 hints_name = 'hints.%d' % transaction_id 611 hints_file = os.path.join(self.path, hints_name) 612 with IntegrityCheckedFile(hints_file + '.tmp', filename=hints_name, write=True) as fd: 613 msgpack.pack(hints, fd) 614 flush_and_sync(fd) 615 integrity[b'hints'] = fd.integrity_data 616 617 # Write repository index 618 index_name = 'index.%d' % transaction_id 619 index_file = os.path.join(self.path, index_name) 620 with IntegrityCheckedFile(index_file + '.tmp', filename=index_name, write=True) as fd: 621 # XXX: Consider using SyncFile for index write-outs. 622 self.index.write(fd) 623 flush_and_sync(fd) 624 integrity[b'index'] = fd.integrity_data 625 626 # Write integrity file, containing checksums of the hints and index files 627 integrity_name = 'integrity.%d' % transaction_id 628 integrity_file = os.path.join(self.path, integrity_name) 629 with open(integrity_file + '.tmp', 'wb') as fd: 630 msgpack.pack(integrity, fd) 631 flush_and_sync(fd) 632 633 # Rename the integrity file first 634 rename_tmp(integrity_file) 635 sync_dir(self.path) 636 # Rename the others after the integrity file is hypothetically on disk 637 rename_tmp(hints_file) 638 rename_tmp(index_file) 639 sync_dir(self.path) 640 641 # Remove old auxiliary files 642 current = '.%d' % transaction_id 643 for name in os.listdir(self.path): 644 if not name.startswith(('index.', 'hints.', 'integrity.')): 645 continue 646 if name.endswith(current): 647 continue 648 os.unlink(os.path.join(self.path, name)) 649 self.index = None 650 651 def check_free_space(self): 652 """Pre-commit check for sufficient free space 
to actually perform the commit.""" 653 # As a baseline we take four times the current (on-disk) index size. 654 # At this point the index may only be updated by compaction, which won't resize it. 655 # We still apply a factor of four so that a later, separate invocation can free space 656 # (journaling all deletes for all chunks is one index size) or still make minor additions 657 # (which may grow the index up to twice its current size). 658 # Note that in a subsequent operation the committed index is still on-disk, therefore we 659 # arrive at index_size * (1 + 2 + 1). 660 # In that order: journaled deletes (1), hashtable growth (2), persisted index (1). 661 required_free_space = self.index.size() * 4 662 663 # Conservatively estimate hints file size: 664 # 10 bytes for each segment-refcount pair, 10 bytes for each segment-space pair 665 # Assume maximum of 5 bytes per integer. Segment numbers will usually be packed more densely (1-3 bytes), 666 # as will refcounts and free space integers. For 5 MiB segments this estimate is good to ~20 PB repo size. 667 # Add 4K to generously account for constant format overhead. 668 hints_size = len(self.segments) * 10 + len(self.compact) * 10 + 4096 669 required_free_space += hints_size 670 671 required_free_space += self.additional_free_space 672 if not self.append_only: 673 full_segment_size = self.max_segment_size + MAX_OBJECT_SIZE 674 if len(self.compact) < 10: 675 # This is mostly for the test suite to avoid overestimated free space needs. This can be annoying 676 # if TMP is a small-ish tmpfs. 677 compact_working_space = 0 678 for segment, free in self.compact.items(): 679 try: 680 compact_working_space += self.io.segment_size(segment) - free 681 except FileNotFoundError: 682 # looks like self.compact is referring to a non-existent segment file, ignore it. 
def log_storage_quota(self):
    """Log (at info level) how much of the storage quota is in use.

    Nothing is logged if no storage quota is configured (quota is None or 0).
    """
    quota = self.storage_quota
    if not quota:
        return
    used = format_file_size(self.storage_quota_use)
    limit = format_file_size(quota)
    logger.info('Storage quota: %s out of %s used.', used, limit)
segment = self.io.write_commit(intermediate=intermediate) 729 logger.debug('complete_xfer: wrote %scommit at segment %d', 'intermediate ' if intermediate else '', segment) 730 # get rid of the old, sparse, unused segments. free space. 731 for segment in unused: 732 logger.debug('complete_xfer: deleting unused segment %d', segment) 733 count = self.segments.pop(segment) 734 assert count == 0, 'Corrupted segment reference count - corrupted index or hints' 735 self.io.delete_segment(segment) 736 del self.compact[segment] 737 unused = [] 738 739 logger.debug('compaction started.') 740 pi = ProgressIndicatorPercent(total=len(self.compact), msg='Compacting segments %3.0f%%', step=1, 741 msgid='repository.compact_segments') 742 for segment, freeable_space in sorted(self.compact.items()): 743 if not self.io.segment_exists(segment): 744 logger.warning('segment %d not found, but listed in compaction data', segment) 745 del self.compact[segment] 746 pi.show() 747 continue 748 segment_size = self.io.segment_size(segment) 749 if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size: 750 logger.debug('not compacting segment %d (only %d bytes are sparse)', segment, freeable_space) 751 pi.show() 752 continue 753 segments.setdefault(segment, 0) 754 logger.debug('compacting segment %d with usage count %d and %d freeable bytes', 755 segment, segments[segment], freeable_space) 756 for tag, key, offset, data in self.io.iter_objects(segment, include_data=True): 757 if tag == TAG_COMMIT: 758 continue 759 in_index = self.index.get(key) 760 is_index_object = in_index == (segment, offset) 761 if tag == TAG_PUT and is_index_object: 762 try: 763 new_segment, offset = self.io.write_put(key, data, raise_full=True) 764 except LoggedIO.SegmentFull: 765 complete_xfer() 766 new_segment, offset = self.io.write_put(key, data) 767 self.index[key] = new_segment, offset 768 segments.setdefault(new_segment, 0) 769 segments[new_segment] += 1 770 segments[segment] -= 1 771 
elif tag == TAG_PUT and not is_index_object: 772 # If this is a PUT shadowed by a later tag, then it will be gone when this segment is deleted after 773 # this loop. Therefore it is removed from the shadow index. 774 try: 775 self.shadow_index[key].remove(segment) 776 except (KeyError, ValueError): 777 # do not remove entry with empty shadowed_segments list here, 778 # it is needed for shadowed_put_exists code (see below)! 779 pass 780 elif tag == TAG_DELETE and not in_index: 781 # If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag, 782 # therefore we do not drop the delete, but write it to a current segment. 783 shadowed_put_exists = key not in self.shadow_index or any( 784 # If the key is in the shadow index and there is any segment with an older PUT of this 785 # key, we have a shadowed put. 786 shadowed < segment for shadowed in self.shadow_index[key]) 787 delete_is_not_stable = index_transaction_id is None or segment > index_transaction_id 788 789 if shadowed_put_exists or delete_is_not_stable: 790 # (introduced in 6425d16aa84be1eaaf88) 791 # This is needed to avoid object un-deletion if we crash between the commit and the deletion 792 # of old segments in complete_xfer(). 793 # 794 # However, this only happens if the crash also affects the FS to the effect that file deletions 795 # did not materialize consistently after journal recovery. If they always materialize in-order 796 # then this is not a problem, because the old segment containing a deleted object would be deleted 797 # before the segment containing the delete. 798 # 799 # Consider the following series of operations if we would not do this, ie. this entire if: 800 # would be removed. 
801 # Columns are segments, lines are different keys (line 1 = some key, line 2 = some other key) 802 # Legend: P=TAG_PUT, D=TAG_DELETE, c=commit, i=index is written for latest commit 803 # 804 # Segment | 1 | 2 | 3 805 # --------+-------+-----+------ 806 # Key 1 | P | D | 807 # Key 2 | P | | P 808 # commits | c i | c | c i 809 # --------+-------+-----+------ 810 # ^- compact_segments starts 811 # ^- complete_xfer commits, after that complete_xfer deletes 812 # segments 1 and 2 (and then the index would be written). 813 # 814 # Now we crash. But only segment 2 gets deleted, while segment 1 is still around. Now key 1 815 # is suddenly undeleted (because the delete in segment 2 is now missing). 816 # Again, note the requirement here. We delete these in the correct order that this doesn't happen, 817 # and only if the FS materialization of these deletes is reordered or parts dropped this can happen. 818 # In this case it doesn't cause outright corruption, 'just' an index count mismatch, which will be 819 # fixed by borg-check --repair. 820 # 821 # Note that in this check the index state is the proxy for a "most definitely settled" repository state, 822 # ie. the assumption is that *all* operations on segments <= index state are completed and stable. 
def replay_segments(self, index_transaction_id, segments_transaction_id):
    """Replay segments that are newer than the index and write the updated index.

    Segments with a number <= *index_transaction_id* are skipped (unless it is None),
    replaying stops at the first segment > *segments_transaction_id*. Afterwards the
    index is written and the transaction is rolled back via self.rollback().
    """
    # fake an old client, so that in case we do not have an exclusive lock yet, prepare_txn will upgrade the lock:
    saved_exclusive, self.exclusive = self.exclusive, None
    self.prepare_txn(index_transaction_id, do_cleanup=False)
    try:
        total = sum(1 for _ in self.io.segment_iterator())
        progress = ProgressIndicatorPercent(total=total, msg='Replaying segments %3.0f%%',
                                            msgid='repository.replay_segments')
        for position, (segment, filename) in enumerate(self.io.segment_iterator()):
            progress.show(position)
            covered_by_index = index_transaction_id is not None and segment <= index_transaction_id
            if covered_by_index:
                continue
            if segment > segments_transaction_id:
                # everything from here on is newer than the last committed segment
                break
            self._update_index(segment, self.io.iter_objects(segment))
        progress.finish()
        self.write_index()
    finally:
        # always restore the caller's lock mode, even if replaying failed
        self.exclusive = saved_exclusive
    self.rollback()
def _rebuild_sparse(self, segment):
    """Recompute the sparse (freeable) byte count of one segment against the current index.

    The result is stored in self.compact[segment]. If the segment file is gone, the segment
    is dropped from self.segments instead.
    """
    try:
        total_size = self.io.segment_size(segment)
    except FileNotFoundError:
        # the segment file vanished: forget about it.
        # self.compact needs no cleanup, it is rebuilt starting from an empty mapping.
        self.segments.pop(segment)
        return

    if self.segments[segment] == 0:
        # nothing in this segment is referenced any more, all of it can be freed
        self.compact[segment] = total_size
        return

    self.compact[segment] = 0
    for tag, key, offset, size in self.io.iter_objects(segment, read_data=False):
        # A DELETE entry is always sparse (its effect is already accounted for by the PUT it
        # shadows); a PUT is sparse when the index does not point at exactly this entry.
        is_sparse = tag == TAG_DELETE or (
            tag == TAG_PUT and self.index.get(key, (-1, -1)) != (segment, offset))
        if is_sparse:
            self.compact[segment] += size
segments_transaction_id) 967 logger.debug('Determined transaction is %s', transaction_id) 968 self.prepare_txn(None) # self.index, self.compact, self.segments all empty now! 969 segment_count = sum(1 for _ in self.io.segment_iterator()) 970 logger.debug('Found %d segments', segment_count) 971 pi = ProgressIndicatorPercent(total=segment_count, msg='Checking segments %3.1f%%', step=0.1, 972 msgid='repository.check') 973 for i, (segment, filename) in enumerate(self.io.segment_iterator()): 974 pi.show(i) 975 if segment > transaction_id: 976 continue 977 logger.debug('checking segment file %s...', filename) 978 try: 979 objects = list(self.io.iter_objects(segment)) 980 except IntegrityError as err: 981 report_error(str(err)) 982 objects = [] 983 if repair: 984 self.io.recover_segment(segment, filename) 985 objects = list(self.io.iter_objects(segment)) 986 self._update_index(segment, objects, report_error) 987 pi.finish() 988 # self.index, self.segments, self.compact now reflect the state of the segment files up to <transaction_id> 989 # We might need to add a commit tag if no committed segment is found 990 if repair and segments_transaction_id is None: 991 report_error('Adding commit tag to segment {}'.format(transaction_id)) 992 self.io.segment = transaction_id + 1 993 self.io.write_commit() 994 logger.info('Starting repository index check') 995 if current_index and not repair: 996 # current_index = "as found on disk" 997 # self.index = "as rebuilt in-memory from segments" 998 if len(current_index) != len(self.index): 999 report_error('Index object count mismatch.') 1000 logger.error('committed index: %d objects', len(current_index)) 1001 logger.error('rebuilt index: %d objects', len(self.index)) 1002 else: 1003 logger.info('Index object count match.') 1004 line_format = 'ID: %-64s rebuilt index: %-16s committed index: %-16s' 1005 not_found = '<not found>' 1006 for key, value in self.index.iteritems(): 1007 current_value = current_index.get(key, not_found) 1008 if 
def scan_low_level(self):
    """Very low level scan over all segment file entries.

    It does NOT care about what's committed and what not.
    It does NOT care whether an object might be deleted or superseded later.
    It just yields anything it finds in the segment files.

    This is intended as a last-resort way to get access to all repo contents of damaged repos,
    when there is uncommitted, but valuable data in there...

    Yields (key, data, tag, segment, offset) tuples. If iterating a segment raises
    IntegrityError, the error is logged and the remainder of that segment is skipped.
    """
    for segment, filename in self.io.segment_iterator():
        try:
            for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
                yield key, data, tag, segment, offset
        except IntegrityError as err:
            # pass the values as logging arguments, so interpolation is left to the logging machinery
            logger.error('Segment %d (%s) has IntegrityError(s) [%s] - skipping.', segment, filename, err)


def _rollback(self, *, cleanup):
    """Drop the in-memory transaction state.

    If *cleanup* is true, first ask the I/O layer to clean up everything after the last
    committed segment (self.io.cleanup() with the segments transaction id).
    In any case the in-memory index is discarded, the transaction is marked inactive and
    a pending "doomed transaction" exception is cleared.
    """
    if cleanup:
        self.io.cleanup(self.io.get_segments_transaction_id())
    self.index = None
    self._active_txn = False
    self.transaction_doomed = None
def __len__(self):
    """Return the number of objects in the repository index.

    The committed index is loaded first if no index is loaded yet.
    """
    # Compare against None: a loaded index that is merely empty is falsy as well and
    # must not be replaced by the committed on-disk index.
    if self.index is None:
        self.index = self.open_index(self.get_transaction_id())
    return len(self.index)


def __contains__(self, id):
    """Return True if *id* is present in the repository index.

    The committed index is loaded first if no index is loaded yet.
    """
    if self.index is None:  # see __len__ for why this is not "if not self.index"
        self.index = self.open_index(self.get_transaction_id())
    return id in self.index


def list(self, limit=None, marker=None):
    """
    list <limit> IDs starting from after id <marker> - in index (pseudo-random) order.

    The committed index is loaded first if no index is loaded yet.
    """
    if self.index is None:  # an empty, but loaded index must be kept
        self.index = self.open_index(self.get_transaction_id())
    return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)]
def get(self, id):
    """Return the data stored for *id*.

    The committed index is loaded first if no index is loaded yet.
    Raises self.ObjectNotFound if a KeyError occurs while looking up or reading the object.
    """
    # Compare against None: a loaded index that is merely empty is falsy as well and
    # must not be replaced by the committed on-disk index.
    if self.index is None:
        self.index = self.open_index(self.get_transaction_id())
    try:
        segment, offset = self.index[id]
        return self.io.read(segment, offset, id)
    except KeyError:
        raise self.ObjectNotFound(id, self.path) from None


def get_many(self, ids, is_preloaded=False):
    """Yield the data of each object in *ids*, in the given order, via self.get().

    *is_preloaded* is accepted, but not used by this implementation.
    """
    for id_ in ids:
        yield self.get(id_)
def delete(self, id, wait=True):
    """delete a repo object

    Note: when doing calls with wait=False this gets async and caller must
          deal with async results / exceptions later.

    Raises self.ObjectNotFound if *id* is not in the index.
    """
    if not self._active_txn:
        self.prepare_txn(self.get_transaction_id())
    try:
        location = self.index.pop(id)
    except KeyError:
        raise self.ObjectNotFound(id, self.path) from None
    else:
        segment, offset = location
    # the object existed and is gone from the repo index now. the DEL written below shadows
    # the respective PUT, so the compaction code needs to learn about it via the shadow_index.
    self._delete(id, segment, offset, update_shadow_index=True)


def _delete(self, id, segment, offset, *, update_shadow_index):
    """Bookkeeping shared by put and delete: account for the old PUT and write a DELETE entry.

    *segment* / *offset* locate the PUT entry that gets superseded.
    """
    if update_shadow_index:
        shadowed_segments = self.shadow_index.setdefault(id, [])
        shadowed_segments.append(segment)
    # the old PUT is not referenced any more: its bytes become freeable
    self.segments[segment] -= 1
    put_size = self.io.read(segment, offset, id, read_data=False)
    self.storage_quota_use -= put_size
    self.compact[segment] += put_size
    # the DELETE entry itself is counted as freeable space of the segment it is written to
    delete_segment, delete_size = self.io.write_delete(id)
    self.compact[delete_segment] += delete_size
    self.segments.setdefault(delete_segment, 0)
class LoggedIO:
    """Segment file I/O: appends PUT / DELETE / COMMIT log entries to numbered segment files
    below <path>/data and reads them back.

    Entry layout (little endian): crc32 (4 bytes), size (4 bytes), tag (1 byte), followed by
    a 32 byte key for PUT and DELETE and by the data for PUT. *size* covers the whole entry.
    """

    class SegmentFull(Exception):
        """raised when a segment is full, before opening next"""

    header_fmt = struct.Struct('<IIB')
    assert header_fmt.size == 9
    put_header_fmt = struct.Struct('<IIB32s')
    assert put_header_fmt.size == 41
    header_no_crc_fmt = struct.Struct('<IB')
    assert header_no_crc_fmt.size == 5
    crc_fmt = struct.Struct('<I')
    assert crc_fmt.size == 4

    # the complete on-disk representation of a COMMIT entry
    _commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
    COMMIT = crc_fmt.pack(crc32(_commit)) + _commit

    def __init__(self, path, limit, segments_per_dir, capacity=90):
        """
        :param path: repository directory (segments live below <path>/data)
        :param limit: write offset after which a new segment gets started
        :param segments_per_dir: number of segment files per data subdirectory
        :param capacity: capacity of the LRU cache of read file objects
        """
        self.path = path
        self.fds = LRUCache(capacity, dispose=self._close_fd)
        self.segment = 0
        self.limit = limit
        self.segments_per_dir = segments_per_dir
        self.offset = 0
        self._write_fd = None
        self._fds_cleaned = 0

    def close(self):
        """Close the segment being written (if any) and drop all cached read file objects."""
        self.close_segment()
        self.fds.clear()
        self.fds = None  # Just to make sure we're disabled

    def _close_fd(self, ts_fd):
        """Dispose callback of the fd cache: *ts_fd* is a (timestamp, file object) tuple."""
        ts, fd = ts_fd
        safe_fadvise(fd.fileno(), 0, 0, 'DONTNEED')
        fd.close()

    def segment_iterator(self, segment=None, reverse=False):
        """Yield (segment number, filename) for the segment files below <path>/data.

        Iterates in ascending order starting at *segment* (default: 0) or, if *reverse* is
        true, in descending order starting at *segment* (default: 2 ** 32 - 1).
        Directory entries whose name is not all digits are ignored.
        """
        if segment is None:
            segment = 0 if not reverse else 2 ** 32 - 1
        data_path = os.path.join(self.path, 'data')
        start_segment_dir = segment // self.segments_per_dir
        dirnames = os.listdir(data_path)
        if not reverse:
            dirnames = [name for name in dirnames if name.isdigit() and int(name) >= start_segment_dir]
        else:
            dirnames = [name for name in dirnames if name.isdigit() and int(name) <= start_segment_dir]
        dirnames = sorted(dirnames, key=int, reverse=reverse)
        for dirname in dirnames:
            filenames = os.listdir(os.path.join(data_path, dirname))
            if not reverse:
                filenames = [filename for filename in filenames if filename.isdigit() and int(filename) >= segment]
            else:
                filenames = [filename for filename in filenames if filename.isdigit() and int(filename) <= segment]
            filenames = sorted(filenames, key=int, reverse=reverse)
            for filename in filenames:
                # Note: Do not filter out logically deleted segments (see "File system interaction" above),
                # since this is used by cleanup and txn state detection as well.
                yield int(filename), os.path.join(data_path, dirname, filename)

    def get_latest_segment(self):
        """Return the highest segment number found on disk or None if there is no segment."""
        for segment, filename in self.segment_iterator(reverse=True):
            return segment
        return None

    def get_segments_transaction_id(self):
        """Return the last committed segment.
        """
        for segment, filename in self.segment_iterator(reverse=True):
            if self.is_committed_segment(segment):
                return segment
        return None

    def cleanup(self, transaction_id):
        """Delete segment files left by aborted transactions
        """
        self.segment = transaction_id + 1
        count = 0
        for segment, filename in self.segment_iterator(reverse=True):
            if segment > transaction_id:
                if segment in self.fds:
                    del self.fds[segment]
                truncate_and_unlink(filename)
                count += 1
            else:
                break
        logger.debug('Cleaned up %d uncommitted segment files (== everything after segment %d).',
                     count, transaction_id)

    def is_committed_segment(self, segment):
        """Check if segment ends with a COMMIT entry
        """
        try:
            iterator = self.iter_objects(segment)
        except IntegrityError:
            return False
        with open(self.segment_filename(segment), 'rb') as fd:
            try:
                fd.seek(-self.header_fmt.size, os.SEEK_END)
            except OSError as e:
                # return False if segment file is empty or too small
                if e.errno == errno.EINVAL:
                    return False
                raise
            if fd.read(self.header_fmt.size) != self.COMMIT:
                return False
        # the file ends with the bytes of a COMMIT entry - now make sure that walking the
        # entries from the start also finds a commit and nothing but commits after it.
        seen_commit = False
        while True:
            try:
                tag, key, offset, _ = next(iterator)
            except IntegrityError:
                return False
            except StopIteration:
                break
            if tag == TAG_COMMIT:
                seen_commit = True
                continue
            if seen_commit:
                return False
        return seen_commit

    def segment_filename(self, segment):
        """Return the path of the file of *segment*: <path>/data/<segment // segments_per_dir>/<segment>."""
        return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))

    def get_write_fd(self, no_new=False, raise_full=False):
        """Return the file object of the segment being written, opening a new segment if needed.

        If the current segment has grown beyond the limit (and *no_new* is false), it is closed
        first - or SegmentFull is raised instead, if *raise_full* is true.
        """
        if not no_new and self.offset and self.offset > self.limit:
            if raise_full:
                raise self.SegmentFull
            self.close_segment()
        if not self._write_fd:
            if self.segment % self.segments_per_dir == 0:
                dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
                if not os.path.exists(dirname):
                    os.mkdir(dirname)
                    sync_dir(os.path.join(self.path, 'data'))
            self._write_fd = SyncFile(self.segment_filename(self.segment), binary=True)
            self._write_fd.write(MAGIC)
            self.offset = MAGIC_LEN
            if self.segment in self.fds:
                # we may have a cached fd for a segment file we already deleted and
                # we are writing now a new segment file to same file name. get rid of
                # of the cached fd that still refers to the old file, so it will later
                # get repopulated (on demand) with a fd that refers to the new file.
                del self.fds[self.segment]
        return self._write_fd

    def get_fd(self, segment):
        """Return a (cached) file object opened for reading the file of *segment*."""
        # note: get_fd() returns a fd with undefined file pointer position,
        # so callers must always seek() to desired position afterwards.
        now = time.monotonic()

        def open_fd():
            fd = open(self.segment_filename(segment), 'rb')
            self.fds[segment] = (now, fd)
            return fd

        def clean_old():
            # we regularly get rid of all old FDs here:
            if now - self._fds_cleaned > FD_MAX_AGE // 8:
                self._fds_cleaned = now
                for k, ts_fd in list(self.fds.items()):
                    ts, fd = ts_fd
                    if now - ts > FD_MAX_AGE:
                        # we do not want to touch long-unused file handles to
                        # avoid ESTALE issues (e.g. on network filesystems).
                        del self.fds[k]

        clean_old()
        try:
            ts, fd = self.fds[segment]
        except KeyError:
            fd = open_fd()
        else:
            # we only have fresh enough stuff here.
            # update the timestamp of the lru cache entry.
            self.fds.upd(segment, (now, fd))
        return fd

    def close_segment(self):
        """Close the segment being written (if any) and advance to the next segment number."""
        # set self._write_fd to None early to guard against reentry from error handling code paths:
        fd, self._write_fd = self._write_fd, None
        if fd is not None:
            self.segment += 1
            self.offset = 0
            fd.close()

    def delete_segment(self, segment):
        """Delete the file of *segment* (a file that is already missing is not an error)."""
        if segment in self.fds:
            del self.fds[segment]
        try:
            truncate_and_unlink(self.segment_filename(segment))
        except FileNotFoundError:
            pass

    def segment_exists(self, segment):
        """Return True if the file of *segment* exists and is not empty, False otherwise."""
        filename = self.segment_filename(segment)
        # When deleting segments, they are first truncated. If truncate(2) and unlink(2) are split
        # across FS transactions, then logically deleted segments will show up as truncated.
        try:
            # a single getsize() instead of exists() + getsize(): no window in which the file
            # can vanish between the two calls, and callers always get a real bool.
            return os.path.getsize(filename) > 0
        except OSError:
            return False

    def segment_size(self, segment):
        """Return the size in bytes of the file of *segment*."""
        return os.path.getsize(self.segment_filename(segment))

    def get_segment_magic(self, segment):
        """Return the first MAGIC_LEN bytes of the file of *segment*."""
        fd = self.get_fd(segment)
        fd.seek(0)
        return fd.read(MAGIC_LEN)

    def iter_objects(self, segment, offset=0, include_data=False, read_data=True):
        """
        Return object iterator for *segment*.

        If read_data is False then include_data must be False as well.
        With read_data=False, integrity checks are skipped: all data obtained from the iterator
        must be considered informational.

        The iterator returns four-tuples of (tag, key, offset, data|size).
        """
        fd = self.get_fd(segment)
        fd.seek(offset)
        if offset == 0:
            # we are touching this segment for the first time, check the MAGIC.
            # Repository.scan() calls us with offset > 0 when it continues an ongoing iteration
            # from a marker position - but then we have checked the magic before already.
            if fd.read(MAGIC_LEN) != MAGIC:
                raise IntegrityError('Invalid segment magic [segment {}, offset {}]'.format(segment, 0))
            offset = MAGIC_LEN
        header = fd.read(self.header_fmt.size)
        while header:
            size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
                                              (TAG_PUT, TAG_DELETE, TAG_COMMIT),
                                              read_data=read_data)
            if include_data:
                yield tag, key, offset, data
            else:
                yield tag, key, offset, size
            offset += size
            # we must get the fd via get_fd() here again as we yielded to our caller and it might
            # have triggered closing of the fd we had before (e.g. by calling io.read() for
            # different segment(s)).
            # by calling get_fd() here again we also make our fd "recently used" so it likely
            # does not get kicked out of self.fds LRUcache.
            fd = self.get_fd(segment)
            fd.seek(offset)
            header = fd.read(self.header_fmt.size)

    def recover_segment(self, segment, filename):
        """Rewrite the segment file *filename*, keeping only entries with plausible size and valid crc32."""
        logger.info('attempting to recover ' + filename)
        if segment in self.fds:
            del self.fds[segment]
        if os.path.getsize(filename) < MAGIC_LEN + self.header_fmt.size:
            # this is either a zero-byte file (which would crash mmap() below) or otherwise
            # just too small to be a valid non-empty segment file, so do a shortcut here:
            with SaveFile(filename, binary=True) as fd:
                fd.write(MAGIC)
            return
        with SaveFile(filename, binary=True) as dst_fd:
            with open(filename, 'rb') as src_fd:
                # note: file must not be 0 size or mmap() will crash.
                with mmap.mmap(src_fd.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                    # memoryview context manager is problematic, see https://bugs.python.org/issue35686
                    data = memoryview(mm)
                    d = data
                    try:
                        dst_fd.write(MAGIC)
                        while len(d) >= self.header_fmt.size:
                            crc, size, tag = self.header_fmt.unpack(d[:self.header_fmt.size])
                            if size < self.header_fmt.size or size > len(d):
                                # no valid entry starts here, retry one byte further
                                d = d[1:]
                                continue
                            if crc32(d[4:size]) & 0xffffffff != crc:
                                d = d[1:]
                                continue
                            dst_fd.write(d[:size])
                            d = d[size:]
                    finally:
                        del d
                        data.release()

    def read(self, segment, offset, id, read_data=True):
        """
        Read entry from *segment* at *offset* with *id*.

        If read_data is False the size of the entry is returned instead and integrity checks are skipped.
        The return value should thus be considered informational.
        """
        if segment == self.segment and self._write_fd:
            self._write_fd.sync()
        fd = self.get_fd(segment)
        fd.seek(offset)
        header = fd.read(self.put_header_fmt.size)
        size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data)
        if id != key:
            raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
                segment, offset))
        return data if read_data else size

    def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True):
        """Parse one entry whose *header* was already read from *fd* and read or skip its payload.

        Returns (size, tag, key, data); data is None if *read_data* is false.
        Raises IntegrityError for malformed entries or a tag not in *acceptable_tags*.
        """
        # some code shared by read() and iter_objects()
        try:
            hdr_tuple = fmt.unpack(header)
        except struct.error as err:
            raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
                segment, offset, err)) from None
        if fmt is self.put_header_fmt:
            crc, size, tag, key = hdr_tuple
        elif fmt is self.header_fmt:
            crc, size, tag = hdr_tuple
            key = None
        else:
            raise TypeError("_read called with unsupported format")
        if size > MAX_OBJECT_SIZE:
            # if you get this on an archive made with borg < 1.0.7 and millions of files and
            # you need to restore it, you can disable this check by using "if False:" above.
            raise IntegrityError('Invalid segment entry size {} - too big [segment {}, offset {}]'.format(
                size, segment, offset))
        if size < fmt.size:
            raise IntegrityError('Invalid segment entry size {} - too small [segment {}, offset {}]'.format(
                size, segment, offset))
        length = size - fmt.size
        if read_data:
            data = fd.read(length)
            if len(data) != length:
                raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
                    segment, offset, length, len(data)))
            if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
                raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
                    segment, offset))
            if key is None and tag in (TAG_PUT, TAG_DELETE):
                key, data = data[:32], data[32:]
        else:
            if key is None and tag in (TAG_PUT, TAG_DELETE):
                key = fd.read(32)
                length -= 32
                if len(key) != 32:
                    raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
                        segment, offset, 32, len(key)))
            oldpos = fd.tell()
            seeked = fd.seek(length, os.SEEK_CUR) - oldpos
            data = None
            if seeked != length:
                raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
                    segment, offset, length, seeked))
        if tag not in acceptable_tags:
            raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
                segment, offset))
        return size, tag, key, data

    def write_put(self, id, data, raise_full=False):
        """Append a PUT entry for *id* / *data*, return (segment, offset) of the entry."""
        data_size = len(data)
        if data_size > MAX_DATA_SIZE:
            # this would push the segment entry size beyond MAX_OBJECT_SIZE.
            raise IntegrityError('More than allowed put data [{} > {}]'.format(data_size, MAX_DATA_SIZE))
        fd = self.get_write_fd(raise_full=raise_full)
        size = data_size + self.put_header_fmt.size
        offset = self.offset
        header = self.header_no_crc_fmt.pack(size, TAG_PUT)
        crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
        fd.write(b''.join((crc, header, id, data)))
        self.offset += size
        return self.segment, offset

    def write_delete(self, id, raise_full=False):
        """Append a DELETE entry for *id*, return (segment, size of the entry)."""
        fd = self.get_write_fd(raise_full=raise_full)
        header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
        crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
        fd.write(b''.join((crc, header, id)))
        self.offset += self.put_header_fmt.size
        return self.segment, self.put_header_fmt.size

    def write_commit(self, intermediate=False):
        """Write a COMMIT entry, close the segment and return the number of the committed segment."""
        if intermediate:
            # Intermediate commits go directly into the current segment - this makes checking their validity more
            # expensive, but is faster and reduces clobber.
            fd = self.get_write_fd()
            fd.sync()
        else:
            self.close_segment()
            fd = self.get_write_fd()
        header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
        crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
        fd.write(b''.join((crc, header)))
        self.close_segment()
        return self.segment - 1  # close_segment() increments it


assert LoggedIO.put_header_fmt.size == 41  # see constants.MAX_OBJECT_SIZE