import io
import logging
import os
import shutil
import sys
import tempfile
from unittest.mock import patch

import pytest

from ..hashindex import NSIndex
from ..helpers import Location
from ..helpers import IntegrityError
from ..helpers import msgpack
from ..locking import Lock, LockFailed
from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line
from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT, TAG_COMMIT
from . import BaseTestCase
from .hashindex import H


UNSPECIFIED = object()  # for default values where we can't use None


class RepositoryTestCaseBase(BaseTestCase):
    """Shared fixture: each test gets a fresh local Repository in a temp directory."""
    key_size = 32
    exclusive = True

    def open(self, create=False, exclusive=UNSPECIFIED):
        """Open the test repository.

        `exclusive` falls back to the class-level default when left UNSPECIFIED
        (the sentinel is needed because None is a meaningful value here).
        """
        if exclusive is UNSPECIFIED:
            exclusive = self.exclusive
        return Repository(os.path.join(self.tmppath, 'repository'), exclusive=exclusive, create=create)

    def setUp(self):
        # Create the repo and enter its context manager for the duration of the test.
        self.tmppath = tempfile.mkdtemp()
        self.repository = self.open(create=True)
        self.repository.__enter__()

    def tearDown(self):
        self.repository.close()
        shutil.rmtree(self.tmppath)

    def reopen(self, exclusive=UNSPECIFIED):
        """Close and re-open the repository (e.g. to force a fresh transaction/lock)."""
        if self.repository:
            self.repository.close()
        self.repository = self.open(exclusive=exclusive)

    def add_keys(self):
        """Populate the repo with a committed and an uncommitted batch of objects."""
        self.repository.put(H(0), b'foo')
        self.repository.put(H(1), b'bar')
        self.repository.put(H(3), b'bar')
        self.repository.commit()
        self.repository.put(H(1), b'bar2')
        self.repository.put(H(2), b'boo')
        self.repository.delete(H(3))

    def repo_dump(self, label=None):
        """Print a human-readable dump of all segment entries (debugging aid)."""
        label = label + ': ' if label is not None else ''
        H_trans = {H(i): i for i in range(10)}
        H_trans[None] = -1  # key == None appears in commits
        tag_trans = {TAG_PUT: 'put', TAG_DELETE: 'del', TAG_COMMIT: 'comm'}
        for segment, fn in self.repository.io.segment_iterator():
            for tag, key, offset, size in self.repository.io.iter_objects(segment):
                print("%s%s H(%d) -> %s[%d..+%d]" % (label, tag_trans[tag], H_trans[key], fn, offset, size))
        print()


class RepositoryTestCase(RepositoryTestCaseBase):
    """Basic put/get/delete/commit/rollback behaviour of a repository."""

    def test1(self):
        for x in range(100):
            self.repository.put(H(x), b'SOMEDATA')
        key50 = H(50)
        self.assert_equal(self.repository.get(key50), b'SOMEDATA')
        self.repository.delete(key50)
        self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(key50))
        self.repository.commit()
        self.repository.close()
        with self.open() as repository2:
            self.assert_raises(Repository.ObjectNotFound, lambda: repository2.get(key50))
            for x in range(100):
                if x == 50:
                    continue
                self.assert_equal(repository2.get(H(x)), b'SOMEDATA')

    def test2(self):
        """Test multiple sequential transactions
        """
        self.repository.put(H(0), b'foo')
        self.repository.put(H(1), b'foo')
        self.repository.commit()
        self.repository.delete(H(0))
        self.repository.put(H(1), b'bar')
        self.repository.commit()
        self.assert_equal(self.repository.get(H(1)), b'bar')

    def test_consistency(self):
        """Test cache consistency
        """
        self.repository.put(H(0), b'foo')
        self.assert_equal(self.repository.get(H(0)), b'foo')
        self.repository.put(H(0), b'foo2')
        self.assert_equal(self.repository.get(H(0)), b'foo2')
        self.repository.put(H(0), b'bar')
        self.assert_equal(self.repository.get(H(0)), b'bar')
        self.repository.delete(H(0))
        self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(H(0)))

    def test_consistency2(self):
        """Test cache consistency2
        """
        self.repository.put(H(0), b'foo')
        self.assert_equal(self.repository.get(H(0)), b'foo')
        self.repository.commit()
        self.repository.put(H(0), b'foo2')
        self.assert_equal(self.repository.get(H(0)), b'foo2')
        self.repository.rollback()
        self.assert_equal(self.repository.get(H(0)), b'foo')

    def test_overwrite_in_same_transaction(self):
        """The last PUT of a key within one transaction wins after commit."""
        self.repository.put(H(0), b'foo')
        self.repository.put(H(0), b'foo2')
        self.repository.commit()
        self.assert_equal(self.repository.get(H(0)), b'foo2')

    def test_single_kind_transactions(self):
        # put
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        self.repository.close()
        # replace
        self.repository = self.open()
        with self.repository:
            self.repository.put(H(0), b'bar')
            self.repository.commit()
        # delete
        self.repository = self.open()
        with self.repository:
            self.repository.delete(H(0))
            self.repository.commit()

    def test_list(self):
        for x in range(100):
            self.repository.put(H(x), b'SOMEDATA')
        self.repository.commit()
        all = self.repository.list()
        self.assert_equal(len(all), 100)
        first_half = self.repository.list(limit=50)
        self.assert_equal(len(first_half), 50)
        self.assert_equal(first_half, all[:50])
        second_half = self.repository.list(marker=first_half[-1])
        self.assert_equal(len(second_half), 50)
        self.assert_equal(second_half, all[50:])
        self.assert_equal(len(self.repository.list(limit=50)), 50)

    def test_scan(self):
        for x in range(100):
            self.repository.put(H(x), b'SOMEDATA')
        self.repository.commit()
        all = self.repository.scan()
        assert len(all) == 100
        first_half = self.repository.scan(limit=50)
        assert len(first_half) == 50
        assert first_half == all[:50]
        second_half = self.repository.scan(marker=first_half[-1])
        assert len(second_half) == 50
        assert second_half == all[50:]
        assert len(self.repository.scan(limit=50)) == 50
        # check result order == on-disk order (which is hash order)
        for x in range(100):
            assert all[x] == H(x)

    def test_max_data_size(self):
        # Objects up to MAX_DATA_SIZE are accepted; one byte more raises IntegrityError.
        max_data = b'x' * MAX_DATA_SIZE
        self.repository.put(H(0), max_data)
        self.assert_equal(self.repository.get(H(0)), max_data)
        self.assert_raises(IntegrityError,
                           lambda: self.repository.put(H(1), max_data + b'x'))


class LocalRepositoryTestCase(RepositoryTestCaseBase):
    # test case that doesn't work with remote repositories

    def _assert_sparse(self):
        # The superseded 123456... PUT
        assert self.repository.compact[0] == 41 + 9
        # The DELETE issued by the superseding PUT (or issued directly)
        assert self.repository.compact[2] == 41
        self.repository._rebuild_sparse(0)
        assert self.repository.compact[0] == 41 + 9

    def test_sparse1(self):
        self.repository.put(H(0), b'foo')
        self.repository.put(H(1), b'123456789')
        self.repository.commit()
        self.repository.put(H(1), b'bar')
        self._assert_sparse()

    def test_sparse2(self):
        self.repository.put(H(0), b'foo')
        self.repository.put(H(1), b'123456789')
        self.repository.commit()
        self.repository.delete(H(1))
        self._assert_sparse()

    def test_sparse_delete(self):
        self.repository.put(H(0), b'1245')
        self.repository.delete(H(0))
        self.repository.io._write_fd.sync()

        # The on-line tracking works on a per-object basis...
        assert self.repository.compact[0] == 41 + 41 + 4
        self.repository._rebuild_sparse(0)
        # ...while _rebuild_sparse can mark whole segments as completely sparse (which then includes the segment magic)
        assert self.repository.compact[0] == 41 + 41 + 4 + len(MAGIC)

        self.repository.commit()
        assert 0 not in [segment for segment, _ in self.repository.io.segment_iterator()]

    def test_uncommitted_garbage(self):
        # uncommitted garbage should be no problem, it is cleaned up automatically.
        # we just have to be careful with invalidation of cached FDs in LoggedIO.
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        # write some crap to a uncommitted segment file
        last_segment = self.repository.io.get_latest_segment()
        with open(self.repository.io.segment_filename(last_segment + 1), 'wb') as f:
            f.write(MAGIC + b'crapcrapcrap')
        self.repository.close()
        # usually, opening the repo and starting a transaction should trigger a cleanup.
        self.repository = self.open()
        with self.repository:
            self.repository.put(H(0), b'bar')  # this may trigger compact_segments()
            self.repository.commit()
        # the point here is that nothing blows up with an exception.
class RepositoryCommitTestCase(RepositoryTestCaseBase):
    """Crash/replay scenarios around commit(): failures are simulated by clobbering
    a method with None (-> TypeError mid-commit), then the repo is reopened and checked."""

    def test_replay_of_missing_index(self):
        self.add_keys()
        for name in os.listdir(self.repository.path):
            if name.startswith('index.'):
                os.unlink(os.path.join(self.repository.path, name))
        self.reopen()
        with self.repository:
            self.assert_equal(len(self.repository), 3)
            self.assert_equal(self.repository.check(), True)

    def test_crash_before_compact_segments(self):
        self.add_keys()
        # Sabotage: calling None raises TypeError, simulating a crash before compaction.
        self.repository.compact_segments = None
        try:
            self.repository.commit()
        except TypeError:
            pass
        self.reopen()
        with self.repository:
            self.assert_equal(len(self.repository), 3)
            self.assert_equal(self.repository.check(), True)

    def test_crash_before_write_index(self):
        self.add_keys()
        # Sabotage: simulate a crash before the index is written.
        self.repository.write_index = None
        try:
            self.repository.commit()
        except TypeError:
            pass
        self.reopen()
        with self.repository:
            self.assert_equal(len(self.repository), 3)
            self.assert_equal(self.repository.check(), True)

    def test_replay_lock_upgrade_old(self):
        self.add_keys()
        for name in os.listdir(self.repository.path):
            if name.startswith('index.'):
                os.unlink(os.path.join(self.repository.path, name))
        with patch.object(Lock, 'upgrade', side_effect=LockFailed) as upgrade:
            self.reopen(exclusive=None)  # simulate old client that always does lock upgrades
            with self.repository:
                # the repo is only locked by a shared read lock, but to replay segments,
                # we need an exclusive write lock - check if the lock gets upgraded.
                self.assert_raises(LockFailed, lambda: len(self.repository))
                upgrade.assert_called_once_with()

    def test_replay_lock_upgrade(self):
        self.add_keys()
        for name in os.listdir(self.repository.path):
            if name.startswith('index.'):
                os.unlink(os.path.join(self.repository.path, name))
        with patch.object(Lock, 'upgrade', side_effect=LockFailed) as upgrade:
            self.reopen(exclusive=False)  # current client usually does not do lock upgrade, except for replay
            with self.repository:
                # the repo is only locked by a shared read lock, but to replay segments,
                # we need an exclusive write lock - check if the lock gets upgraded.
                self.assert_raises(LockFailed, lambda: len(self.repository))
                upgrade.assert_called_once_with()

    def test_crash_before_deleting_compacted_segments(self):
        self.add_keys()
        # Sabotage: simulate a crash before compacted segments get deleted.
        self.repository.io.delete_segment = None
        try:
            self.repository.commit()
        except TypeError:
            pass
        self.reopen()
        with self.repository:
            self.assert_equal(len(self.repository), 3)
            self.assert_equal(self.repository.check(), True)
            self.assert_equal(len(self.repository), 3)

    def test_ignores_commit_tag_in_data(self):
        # Object *data* that happens to look like a commit record must not be
        # mistaken for a real commit tag.
        self.repository.put(H(0), LoggedIO.COMMIT)
        self.reopen()
        with self.repository:
            io = self.repository.io
            assert not io.is_committed_segment(io.get_latest_segment())

    def test_moved_deletes_are_tracked(self):
        self.repository.put(H(1), b'1')
        self.repository.put(H(2), b'2')
        self.repository.commit()
        self.repo_dump('p1 p2 cc')
        self.repository.delete(H(1))
        self.repository.commit()
        self.repo_dump('d1 cc')
        last_segment = self.repository.io.get_latest_segment() - 1
        num_deletes = 0
        for tag, key, offset, size in self.repository.io.iter_objects(last_segment):
            if tag == TAG_DELETE:
                assert key == H(1)
                num_deletes += 1
        assert num_deletes == 1
        assert last_segment in self.repository.compact
        self.repository.put(H(3), b'3')
        self.repository.commit()
        self.repo_dump('p3 cc')
        assert last_segment not in self.repository.compact
        assert not self.repository.io.segment_exists(last_segment)
        for segment, _ in self.repository.io.segment_iterator():
            for tag, key, offset, size in self.repository.io.iter_objects(segment):
                assert tag != TAG_DELETE
                assert key != H(1)
        # after compaction, there should be no empty shadowed_segments lists left over.
        # we have no put or del any more for H(1), so we lost knowledge about H(1).
        assert H(1) not in self.repository.shadow_index

    def test_shadowed_entries_are_preserved(self):
        get_latest_segment = self.repository.io.get_latest_segment
        self.repository.put(H(1), b'1')
        # This is the segment with our original PUT of interest
        put_segment = get_latest_segment()
        self.repository.commit()

        # We now delete H(1), and force this segment to not be compacted, which can happen
        # if it's not sparse enough (symbolized by H(2) here).
        self.repository.delete(H(1))
        self.repository.put(H(2), b'1')
        delete_segment = get_latest_segment()

        # We pretend these are mostly dense (not sparse) and won't be compacted
        del self.repository.compact[put_segment]
        del self.repository.compact[delete_segment]

        self.repository.commit()

        # Now we perform an unrelated operation on the segment containing the DELETE,
        # causing it to be compacted.
        self.repository.delete(H(2))
        self.repository.commit()

        assert self.repository.io.segment_exists(put_segment)
        assert not self.repository.io.segment_exists(delete_segment)

        # Basic case, since the index survived this must be ok
        assert H(1) not in self.repository
        # Nuke index, force replay
        os.unlink(os.path.join(self.repository.path, 'index.%d' % get_latest_segment()))
        # Must not reappear
        assert H(1) not in self.repository

    def test_shadow_index_rollback(self):
        self.repository.put(H(1), b'1')
        self.repository.delete(H(1))
        assert self.repository.shadow_index[H(1)] == [0]
        self.repository.commit()
        self.repo_dump('p1 d1 cc')
        # note how an empty list means that nothing is shadowed for sure
        assert self.repository.shadow_index[H(1)] == []  # because the delete is considered unstable
        self.repository.put(H(1), b'1')
        self.repository.delete(H(1))
        self.repo_dump('p1 d1')
        # 0 put/delete; 1 commit; 2 compacted; 3 commit; 4 put/delete
        assert self.repository.shadow_index[H(1)] == [4]
        self.repository.rollback()
        self.repo_dump('r')
        self.repository.put(H(2), b'1')
        # After the rollback segment 4 shouldn't be considered anymore
        assert self.repository.shadow_index[H(1)] == []  # because the delete is considered unstable


class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
    """Behaviour of repositories opened with append_only=True (no compaction)."""

    def open(self, create=False):
        return Repository(os.path.join(self.tmppath, 'repository'), exclusive=True, create=create, append_only=True)

    def test_destroy_append_only(self):
        # Can't destroy append only repo (via the API)
        with self.assert_raises(ValueError):
            self.repository.destroy()
        assert self.repository.append_only

    def test_append_only(self):
        def segments_in_repository():
            return len(list(self.repository.io.segment_iterator()))
        self.repository.put(H(0), b'foo')
        self.repository.commit()

        self.repository.append_only = False
        assert segments_in_repository() == 2
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        # normal: compact squashes the data together, only one segment
        assert segments_in_repository() == 4

        self.repository.append_only = True
        assert segments_in_repository() == 4
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        # append only: does not compact, only new segments written
        assert segments_in_repository() == 6
class RepositoryFreeSpaceTestCase(RepositoryTestCaseBase):
    """Enforcement of the additional_free_space reserve."""

    def test_additional_free_space(self):
        self.add_keys()
        # 1000T of required free space can never be satisfied -> commit must fail.
        self.repository.config.set('repository', 'additional_free_space', '1000T')
        self.repository.save_key(b'shortcut to save_config')
        self.reopen()

        with self.repository:
            self.repository.put(H(0), b'foobar')
            with pytest.raises(Repository.InsufficientFreeSpaceError):
                self.repository.commit()
        # The repository itself must survive the failed commit.
        assert os.path.exists(self.repository.path)

    def test_create_free_space(self):
        self.repository.additional_free_space = 1e20
        with pytest.raises(Repository.InsufficientFreeSpaceError):
            self.add_keys()
        assert not os.path.exists(self.repository.path)


class QuotaTestCase(RepositoryTestCaseBase):
    """Tracking and enforcement of the storage quota."""

    def test_tracking(self):
        assert self.repository.storage_quota_use == 0
        # NOTE: 41 is presumably the per-entry on-disk overhead added to each
        # object's payload size — confirm against repository.py if it changes.
        self.repository.put(H(1), bytes(1234))
        assert self.repository.storage_quota_use == 1234 + 41
        self.repository.put(H(2), bytes(5678))
        assert self.repository.storage_quota_use == 1234 + 5678 + 2 * 41
        self.repository.delete(H(1))
        assert self.repository.storage_quota_use == 5678 + 41
        self.repository.commit()
        self.reopen()
        with self.repository:
            # Open new transaction; hints and thus quota data is not loaded unless needed.
            self.repository.put(H(3), b'')
            self.repository.delete(H(3))
            assert self.repository.storage_quota_use == 5678 + 41

    def test_exceed_quota(self):
        assert self.repository.storage_quota_use == 0
        self.repository.storage_quota = 50
        self.repository.put(H(1), b'')
        assert self.repository.storage_quota_use == 41
        self.repository.commit()
        with pytest.raises(Repository.StorageQuotaExceeded):
            self.repository.put(H(2), b'')
        assert self.repository.storage_quota_use == 82
        with pytest.raises(Repository.StorageQuotaExceeded):
            self.repository.commit()
        assert self.repository.storage_quota_use == 82
        self.reopen()
        with self.repository:
            self.repository.storage_quota = 50
            # Open new transaction; hints and thus quota data is not loaded unless needed.
            self.repository.put(H(1), b'')
            assert self.repository.storage_quota_use == 41


class NonceReservation(RepositoryTestCaseBase):
    """get_free_nonce / commit_nonce_reservation behaviour, incl. the exclusive-lock assertion."""

    def test_get_free_nonce_asserts(self):
        # Nonce operations require an exclusively locked repository.
        self.reopen(exclusive=False)
        with pytest.raises(AssertionError):
            with self.repository:
                self.repository.get_free_nonce()

    def test_get_free_nonce(self):
        with self.repository:
            assert self.repository.get_free_nonce() is None

            with open(os.path.join(self.repository.path, "nonce"), "w") as fd:
                fd.write("0000000000000000")
            assert self.repository.get_free_nonce() == 0

            with open(os.path.join(self.repository.path, "nonce"), "w") as fd:
                fd.write("5000000000000000")
            assert self.repository.get_free_nonce() == 0x5000000000000000

    def test_commit_nonce_reservation_asserts(self):
        self.reopen(exclusive=False)
        with pytest.raises(AssertionError):
            with self.repository:
                self.repository.commit_nonce_reservation(0x200, 0x100)

    def test_commit_nonce_reservation(self):
        with self.repository:
            # Committing against a stale start_nonce must fail.
            with pytest.raises(Exception):
                self.repository.commit_nonce_reservation(0x200, 15)

            self.repository.commit_nonce_reservation(0x200, None)
            with open(os.path.join(self.repository.path, "nonce"), "r") as fd:
                assert fd.read() == "0000000000000200"

            with pytest.raises(Exception):
                self.repository.commit_nonce_reservation(0x200, 15)

            self.repository.commit_nonce_reservation(0x400, 0x200)
            with open(os.path.join(self.repository.path, "nonce"), "r") as fd:
                assert fd.read() == "0000000000000400"
class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
    """Corruption/removal of auxiliary files (hints.*, index.*, integrity.*) must be
    either tolerated (rebuild) or detected; the data segments stay untouched."""

    def setUp(self):
        # One committed object, repo closed: tests then damage auxiliary files on disk.
        super().setUp()
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        self.repository.close()

    def do_commit(self):
        with self.repository:
            self.repository.put(H(0), b'fox')
            self.repository.commit()

    def test_corrupted_hints(self):
        with open(os.path.join(self.repository.path, 'hints.1'), 'ab') as fd:
            fd.write(b'123456789')
        self.do_commit()

    def test_deleted_hints(self):
        os.unlink(os.path.join(self.repository.path, 'hints.1'))
        self.do_commit()

    def test_deleted_index(self):
        os.unlink(os.path.join(self.repository.path, 'index.1'))
        self.do_commit()

    def test_unreadable_hints(self):
        # Replace the hints file with a directory so opening it raises OSError.
        hints = os.path.join(self.repository.path, 'hints.1')
        os.unlink(hints)
        os.mkdir(hints)
        with self.assert_raises(OSError):
            self.do_commit()

    def test_index(self):
        with open(os.path.join(self.repository.path, 'index.1'), 'wb') as fd:
            fd.write(b'123456789')
        self.do_commit()

    def test_index_outside_transaction(self):
        with open(os.path.join(self.repository.path, 'index.1'), 'wb') as fd:
            fd.write(b'123456789')
        with self.repository:
            assert len(self.repository) == 1

    def _corrupt_index(self):
        # HashIndex is able to detect incorrect headers and file lengths,
        # but on its own it can't tell if the data is correct.
        index_path = os.path.join(self.repository.path, 'index.1')
        with open(index_path, 'r+b') as fd:
            index_data = fd.read()
            # Flip one bit in a key stored in the index
            corrupted_key = (int.from_bytes(H(0), 'little') ^ 1).to_bytes(32, 'little')
            corrupted_index_data = index_data.replace(H(0), corrupted_key)
            assert corrupted_index_data != index_data
            assert len(corrupted_index_data) == len(index_data)
            fd.seek(0)
            fd.write(corrupted_index_data)

    def test_index_corrupted(self):
        # HashIndex is able to detect incorrect headers and file lengths,
        # but on its own it can't tell if the data itself is correct.
        self._corrupt_index()
        with self.repository:
            # Data corruption is detected due to mismatching checksums
            # and fixed by rebuilding the index.
            assert len(self.repository) == 1
            assert self.repository.get(H(0)) == b'foo'

    def test_index_corrupted_without_integrity(self):
        self._corrupt_index()
        integrity_path = os.path.join(self.repository.path, 'integrity.1')
        os.unlink(integrity_path)
        with self.repository:
            # Since the corrupted key is not noticed, the repository still thinks
            # it contains one key...
            assert len(self.repository) == 1
            with pytest.raises(Repository.ObjectNotFound):
                # ... but the real, uncorrupted key is not found in the corrupted index.
                self.repository.get(H(0))

    def test_unreadable_index(self):
        # Same trick as test_unreadable_hints, but for the index file.
        index = os.path.join(self.repository.path, 'index.1')
        os.unlink(index)
        os.mkdir(index)
        with self.assert_raises(OSError):
            self.do_commit()

    def test_unknown_integrity_version(self):
        # For now an unknown integrity data version is ignored and not an error.
        integrity_path = os.path.join(self.repository.path, 'integrity.1')
        with open(integrity_path, 'r+b') as fd:
            msgpack.pack({
                # Borg only understands version 2
                b'version': 4.7,
            }, fd)
            fd.truncate()
        with self.repository:
            # No issues accessing the repository
            assert len(self.repository) == 1
            assert self.repository.get(H(0)) == b'foo'

    def _subtly_corrupted_hints_setup(self):
        with self.repository:
            self.repository.append_only = True
            assert len(self.repository) == 1
            assert self.repository.get(H(0)) == b'foo'
            self.repository.put(H(1), b'bar')
            self.repository.put(H(2), b'baz')
            self.repository.commit()
            self.repository.put(H(2), b'bazz')
            self.repository.commit()

        hints_path = os.path.join(self.repository.path, 'hints.5')
        with open(hints_path, 'r+b') as fd:
            hints = msgpack.unpack(fd)
            fd.seek(0)
            # Corrupt segment refcount
            assert hints[b'segments'][2] == 1
            hints[b'segments'][2] = 0
            msgpack.pack(hints, fd)
            fd.truncate()

    def test_subtly_corrupted_hints(self):
        self._subtly_corrupted_hints_setup()
        with self.repository:
            self.repository.append_only = False
            self.repository.put(H(3), b'1234')
            # Do a compaction run. Succeeds, since the failed checksum prompted a rebuild of the index+hints.
            self.repository.commit()

            assert len(self.repository) == 4
            assert self.repository.get(H(0)) == b'foo'
            assert self.repository.get(H(1)) == b'bar'
            assert self.repository.get(H(2)) == b'bazz'

    def test_subtly_corrupted_hints_without_integrity(self):
        self._subtly_corrupted_hints_setup()
        integrity_path = os.path.join(self.repository.path, 'integrity.5')
        os.unlink(integrity_path)
        with self.repository:
            self.repository.append_only = False
            self.repository.put(H(3), b'1234')
            # Do a compaction run. Fails, since the corrupted refcount was not detected and leads to an assertion failure.
            with pytest.raises(AssertionError) as exc_info:
                self.repository.commit()
            assert 'Corrupted segment reference count' in str(exc_info.value)
class RepositoryCheckTestCase(RepositoryTestCaseBase):
    """check()/check(repair=True) behaviour against corrupted or missing segments/indices."""

    def list_indices(self):
        # All on-disk index files of the test repository.
        return [name for name in os.listdir(os.path.join(self.tmppath, 'repository')) if name.startswith('index.')]

    def check(self, repair=False, status=True):
        self.assert_equal(self.repository.check(repair=repair), status)
        # Make sure no tmp files are left behind
        self.assert_equal([name for name in os.listdir(os.path.join(self.tmppath, 'repository')) if 'tmp' in name], [], 'Found tmp files')

    def get_objects(self, *ids):
        for id_ in ids:
            self.repository.get(H(id_))

    def add_objects(self, segments):
        # Each inner list becomes one committed batch of objects.
        for ids in segments:
            for id_ in ids:
                self.repository.put(H(id_), b'data')
            self.repository.commit()

    def get_head(self):
        # Highest (latest) segment number in the data directory.
        return sorted(int(n) for n in os.listdir(os.path.join(self.tmppath, 'repository', 'data', '0')) if n.isdigit())[-1]

    def open_index(self):
        return NSIndex.read(os.path.join(self.tmppath, 'repository', 'index.{}'.format(self.get_head())))

    def corrupt_object(self, id_):
        # Overwrite 4 bytes of the object's on-disk data in its segment file.
        idx = self.open_index()
        segment, offset = idx[H(id_)]
        with open(os.path.join(self.tmppath, 'repository', 'data', '0', str(segment)), 'r+b') as fd:
            fd.seek(offset)
            fd.write(b'BOOM')

    def delete_segment(self, segment):
        os.unlink(os.path.join(self.tmppath, 'repository', 'data', '0', str(segment)))

    def delete_index(self):
        os.unlink(os.path.join(self.tmppath, 'repository', 'index.{}'.format(self.get_head())))

    def rename_index(self, new_name):
        os.rename(os.path.join(self.tmppath, 'repository', 'index.{}'.format(self.get_head())),
                  os.path.join(self.tmppath, 'repository', new_name))

    def list_objects(self):
        return set(int(key) for key in self.repository.list())

    def test_repair_corrupted_segment(self):
        self.add_objects([[1, 2, 3], [4, 5], [6]])
        self.assert_equal(set([1, 2, 3, 4, 5, 6]), self.list_objects())
        self.check(status=True)
        self.corrupt_object(5)
        self.assert_raises(IntegrityError, lambda: self.get_objects(5))
        self.repository.rollback()
        # Make sure a regular check does not repair anything
        self.check(status=False)
        self.check(status=False)
        # Make sure a repair actually repairs the repo
        self.check(repair=True, status=True)
        self.get_objects(4)
        self.check(status=True)
        self.assert_equal(set([1, 2, 3, 4, 6]), self.list_objects())

    def test_repair_missing_segment(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        self.assert_equal(set([1, 2, 3, 4, 5, 6]), self.list_objects())
        self.check(status=True)
        self.delete_segment(2)
        self.repository.rollback()
        self.check(repair=True, status=True)
        self.assert_equal(set([1, 2, 3]), self.list_objects())

    def test_repair_missing_commit_segment(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        self.delete_segment(3)
        self.assert_raises(Repository.ObjectNotFound, lambda: self.get_objects(4))
        self.assert_equal(set([1, 2, 3]), self.list_objects())

    def test_repair_corrupted_commit_segment(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        # Damage the last byte of the commit segment.
        with open(os.path.join(self.tmppath, 'repository', 'data', '0', '3'), 'r+b') as fd:
            fd.seek(-1, os.SEEK_END)
            fd.write(b'X')
        self.assert_raises(Repository.ObjectNotFound, lambda: self.get_objects(4))
        self.check(status=True)
        self.get_objects(3)
        self.assert_equal(set([1, 2, 3]), self.list_objects())

    def test_repair_no_commits(self):
        self.add_objects([[1, 2, 3]])
        with open(os.path.join(self.tmppath, 'repository', 'data', '0', '1'), 'r+b') as fd:
            fd.seek(-1, os.SEEK_END)
            fd.write(b'X')
        self.assert_raises(Repository.CheckNeeded, lambda: self.get_objects(4))
        self.check(status=False)
        self.check(status=False)
        self.assert_equal(self.list_indices(), ['index.1'])
        self.check(repair=True, status=True)
        self.assert_equal(self.list_indices(), ['index.3'])
        self.check(status=True)
        self.get_objects(3)
        self.assert_equal(set([1, 2, 3]), self.list_objects())

    def test_repair_missing_index(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        self.delete_index()
        self.check(status=True)
        self.get_objects(4)
        self.assert_equal(set([1, 2, 3, 4, 5, 6]), self.list_objects())

    def test_repair_index_too_new(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        self.assert_equal(self.list_indices(), ['index.3'])
        self.rename_index('index.100')
        self.check(status=True)
        self.assert_equal(self.list_indices(), ['index.3'])
        self.get_objects(4)
        self.assert_equal(set([1, 2, 3, 4, 5, 6]), self.list_objects())

    def test_crash_before_compact(self):
        self.repository.put(H(0), b'data')
        self.repository.put(H(0), b'data2')
        # Simulate a crash before compact
        with patch.object(Repository, 'compact_segments') as compact:
            self.repository.commit()
            compact.assert_called_once_with()
        self.reopen()
        with self.repository:
            self.check(repair=True)
            self.assert_equal(self.repository.get(H(0)), b'data2')


class RepositoryHintsTestCase(RepositoryTestCaseBase):
    """shadow_index / compact / segments bookkeeping after commits and compaction."""

    def test_hints_behaviour(self):
        self.repository.put(H(0), b'data')
        self.assert_equal(self.repository.shadow_index, {})
        self.assert_true(len(self.repository.compact) == 0)
        self.repository.delete(H(0))
        self.repository.commit()
        # this is to make the previous delete "stable", see delete_is_not_stable:
        self.repository.put(H(1), b'data')
        self.repository.commit()
        # in 1.1-maint, commit() always implicitly compacts.
        # nothing to compact any more! no info left about stuff that does not exist any more:
        self.assert_not_in(H(0), self.repository.shadow_index)
        # segment 0 was compacted away, no info about it left:
        self.assert_not_in(0, self.repository.compact)
        self.assert_not_in(0, self.repository.segments)
# ...no info left about stuff that does not exist any more
# (tail of a test method whose definition begins above this chunk):
        self.assert_not_in(H(0), self.repository.shadow_index)
        # segment 0 was compacted away, no info about it left:
        self.assert_not_in(0, self.repository.compact)
        self.assert_not_in(0, self.repository.segments)


class RemoteRepositoryTestCase(RepositoryTestCase):
    """Re-run the full RepositoryTestCase suite against a RemoteRepository.

    Only open() is overridden, so every inherited test exercises the RPC
    transport via the '__testsuite__:' pseudo-host instead of a local
    Repository.  Additional tests below cover RPC-only concerns
    (exception marshalling, ssh/serve command-line construction).
    """
    repository = None  # type: RemoteRepository

    def open(self, create=False):
        # '__testsuite__:' makes RemoteRepository spawn an in-test borg
        # serve process instead of using a real ssh connection.
        return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')),
                                exclusive=True, create=create)

    def _get_mock_args(self):
        """Return a minimal stand-in for the parsed argparse namespace
        that borg_cmd()/ssh_cmd() consume."""
        class MockArgs:
            remote_path = 'borg'
            umask = 0o077
            debug_topics = []
            rsh = None

            def __contains__(self, item):
                # To behave like argparse.Namespace
                return hasattr(self, item)

        return MockArgs()

    def test_invalid_rpc(self):
        # calling a method that is not whitelisted for RPC must fail
        self.assert_raises(InvalidRPCMethod, lambda: self.repository.call('__init__', {}))

    def test_rpc_exception_transport(self):
        """Server-side exceptions must arrive client-side as the same
        exception class with the expected args.

        The server's 'inject_exception' test hook raises the requested
        exception kind; the except blocks then verify how it was
        reconstructed on the client.

        NOTE(review): if a call unexpectedly does NOT raise, its except
        block (and asserts) are silently skipped and the test still
        passes — consider pytest.raises for a stricter check.
        """
        s1 = 'test string'  # payload the server embeds in injected exceptions

        try:
            self.repository.call('inject_exception', {'kind': 'DoesNotExist'})
        except Repository.DoesNotExist as e:
            assert len(e.args) == 1
            # repo-level errors carry the repository location
            assert e.args[0] == self.repository.location.orig

        try:
            self.repository.call('inject_exception', {'kind': 'AlreadyExists'})
        except Repository.AlreadyExists as e:
            assert len(e.args) == 1
            assert e.args[0] == self.repository.location.orig

        try:
            self.repository.call('inject_exception', {'kind': 'CheckNeeded'})
        except Repository.CheckNeeded as e:
            assert len(e.args) == 1
            assert e.args[0] == self.repository.location.orig

        try:
            self.repository.call('inject_exception', {'kind': 'IntegrityError'})
        except IntegrityError as e:
            assert len(e.args) == 1
            assert e.args[0] == s1

        try:
            self.repository.call('inject_exception', {'kind': 'PathNotAllowed'})
        except PathNotAllowed as e:
            assert len(e.args) == 1
            # server injects the literal path 'foo'
            assert e.args[0] == 'foo'

        try:
            self.repository.call('inject_exception', {'kind': 'ObjectNotFound'})
        except Repository.ObjectNotFound as e:
            # ObjectNotFound carries (key, repo location)
            assert len(e.args) == 2
            assert e.args[0] == s1
            assert e.args[1] == self.repository.location.orig

        try:
            self.repository.call('inject_exception', {'kind': 'InvalidRPCMethod'})
        except InvalidRPCMethod as e:
            assert len(e.args) == 1
            assert e.args[0] == s1

        try:
            # a non-borg exception (ZeroDivisionError) must come through
            # as a generic RPCError with full traceback info attached
            self.repository.call('inject_exception', {'kind': 'divide'})
        except RemoteRepository.RPCError as e:
            assert e.unpacked
            assert e.get_message() == 'ZeroDivisionError: integer division or modulo by zero\n'
            assert e.exception_class == 'ZeroDivisionError'
            assert len(e.exception_full) > 0

    def test_ssh_cmd(self):
        """ssh_cmd() must build the ssh argv from the Location
        (host, user, port) and honor BORG_RSH.

        NOTE(review): sets os.environ['BORG_RSH'] without restoring it,
        so this leaks into tests that run afterwards — consider
        monkeypatch/patch.dict.
        """
        args = self._get_mock_args()
        self.repository._args = args
        assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', 'example.com']
        assert self.repository.ssh_cmd(Location('ssh://example.com/foo')) == ['ssh', 'example.com']
        assert self.repository.ssh_cmd(Location('ssh://user@example.com/foo')) == ['ssh', 'user@example.com']
        assert self.repository.ssh_cmd(Location('ssh://user@example.com:1234/foo')) == ['ssh', '-p', '1234', 'user@example.com']
        os.environ['BORG_RSH'] = 'ssh --foo'
        assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', '--foo', 'example.com']

    def test_borg_cmd(self):
        """borg_cmd() must build the remote 'borg serve' argv from the
        args namespace (remote_path, umask, debug topics, storage quota)."""
        # testing=True: run the local source tree via the interpreter
        assert self.repository.borg_cmd(None, testing=True) == [sys.executable, '-m', 'borg.archiver', 'serve']
        args = self._get_mock_args()
        # XXX without next line we get spurious test fails when using pytest-xdist, root cause unknown:
        logging.getLogger().setLevel(logging.INFO)
        # note: test logger is on info log level, so --info gets added automagically
        assert self.repository.borg_cmd(args, testing=False) == ['borg', 'serve', '--umask=077', '--info']
        args.remote_path = 'borg-0.28.2'
        assert self.repository.borg_cmd(args, testing=False) == ['borg-0.28.2', 'serve', '--umask=077', '--info']
        # only borg.debug.* topics are forwarded; client-side topics are dropped
        args.debug_topics = ['something_client_side', 'repository_compaction']
        assert self.repository.borg_cmd(args, testing=False) == ['borg-0.28.2', 'serve', '--umask=077', '--info',
                                                                 '--debug-topic=borg.debug.repository_compaction']
        args = self._get_mock_args()
        args.storage_quota = 0
        # quota 0 means "no quota" -> no --storage-quota flag emitted
        assert self.repository.borg_cmd(args, testing=False) == ['borg', 'serve', '--umask=077', '--info']
        args.storage_quota = 314159265
        assert self.repository.borg_cmd(args, testing=False) == ['borg', 'serve', '--umask=077', '--info',
                                                                 '--storage-quota=314159265']
        # args.rsh overrides the default 'ssh' command, shlex-style split
        args.rsh = 'ssh -i foo'
        self.repository._args = args
        assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', '-i', 'foo', 'example.com']


class RemoteLegacyFree(RepositoryTestCaseBase):
    # Keep testing this so we can someday safely remove the legacy tuple format.

    def open(self, create=False):
        # Force the connection to negotiate the legacy dict-format flag.
        # NOTE(review): the patch context only covers construction — the
        # flag is presumably captured at connect time; verify in
        # RemoteRepository if that assumption ever changes.
        with patch.object(RemoteRepository, 'dictFormat', True):
            return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')),
                                    exclusive=True, create=create)

    def test_legacy_free(self):
        """put / replace / delete each across a fresh legacy-format
        connection, committing every time."""
        # put
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        self.repository.close()
        # replace
        self.repository = self.open()
        with self.repository:
            self.repository.put(H(0), b'bar')
            self.repository.commit()
        # delete
        self.repository = self.open()
        with self.repository:
            self.repository.delete(H(0))
            self.repository.commit()


class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase):
    """Re-run the repository check tests over the RPC transport."""

    def open(self, create=False):
        return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')),
                                exclusive=True, create=create)

    def test_crash_before_compact(self):
        # skip this test, we can't mock-patch a Repository class in another process!
        pass


class RemoteLoggerTestCase(BaseTestCase):
    """Tests for handle_remote_line(): routing of the remote side's
    stderr output into local loggers / local stderr."""

    def setUp(self):
        # root logger writes into self.stream so log routing is observable
        self.stream = io.StringIO()
        self.handler = logging.StreamHandler(self.stream)
        logging.getLogger().handlers[:] = [self.handler]
        # child loggers start with no handlers -> propagate to root
        logging.getLogger('borg.repository').handlers[:] = []
        logging.getLogger('borg.repository.foo').handlers[:] = []
        # capture stderr
        sys.stderr.flush()
        self.old_stderr = sys.stderr
        self.stderr = sys.stderr = io.StringIO()

    def tearDown(self):
        # NOTE(review): only stderr is restored; the replaced logging
        # handlers are left in place for subsequent tests.
        sys.stderr = self.old_stderr

    def test_stderr_messages(self):
        # lines without the $LOG protocol prefix go straight to stderr
        handle_remote_line("unstructured stderr message\n")
        self.assert_equal(self.stream.getvalue(), '')
        # stderr messages don't get an implicit newline
        self.assert_equal(self.stderr.getvalue(), 'Remote: unstructured stderr message\n')

    def test_stderr_progress_messages(self):
        # \r-terminated progress lines are passed through verbatim as well
        handle_remote_line("unstructured stderr progress message\r")
        self.assert_equal(self.stream.getvalue(), '')
        # stderr messages don't get an implicit newline
        self.assert_equal(self.stderr.getvalue(), 'Remote: unstructured stderr progress message\r')

    def test_pre11_format_messages(self):
        self.handler.setLevel(logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)

        # pre-1.1 format: "$LOG <level> <message>" (no logger name field)
        handle_remote_line("$LOG INFO Remote: borg < 1.1 format message\n")
        self.assert_equal(self.stream.getvalue(), 'Remote: borg < 1.1 format message\n')
        self.assert_equal(self.stderr.getvalue(), '')

    def test_post11_format_messages(self):
        self.handler.setLevel(logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)

        # 1.1+ format: "$LOG <level> <logger name> <message>"
        handle_remote_line("$LOG INFO borg.repository Remote: borg >= 1.1 format message\n")
        self.assert_equal(self.stream.getvalue(), 'Remote: borg >= 1.1 format message\n')
        self.assert_equal(self.stderr.getvalue(), '')

    def test_remote_messages_screened(self):
        # default borg config for root logger
        self.handler.setLevel(logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)

        # INFO-level remote message must be filtered out at WARNING level
        handle_remote_line("$LOG INFO borg.repository Remote: new format info message\n")
        self.assert_equal(self.stream.getvalue(), '')
        self.assert_equal(self.stderr.getvalue(), '')

    def test_info_to_correct_local_child(self):
        """A remote message named for a child logger must reach exactly
        that child's handler — not its sub-child, not the root."""
        logging.getLogger('borg.repository').setLevel(logging.INFO)
        logging.getLogger('borg.repository.foo').setLevel(logging.INFO)
        # default borg config for root logger
        self.handler.setLevel(logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)

        child_stream = io.StringIO()
        child_handler = logging.StreamHandler(child_stream)
        child_handler.setLevel(logging.INFO)
        logging.getLogger('borg.repository').handlers[:] = [child_handler]
        foo_stream = io.StringIO()
        foo_handler = logging.StreamHandler(foo_stream)
        foo_handler.setLevel(logging.INFO)
        logging.getLogger('borg.repository.foo').handlers[:] = [foo_handler]

        handle_remote_line("$LOG INFO borg.repository Remote: new format child message\n")
        self.assert_equal(foo_stream.getvalue(), '')
        self.assert_equal(child_stream.getvalue(), 'Remote: new format child message\n')
        self.assert_equal(self.stream.getvalue(), '')
        self.assert_equal(self.stderr.getvalue(), '')