1import io
2import logging
3import os
4import shutil
5import sys
6import tempfile
7from unittest.mock import patch
8
9import pytest
10
11from ..hashindex import NSIndex
12from ..helpers import Location
13from ..helpers import IntegrityError
14from ..helpers import msgpack
15from ..locking import Lock, LockFailed
16from ..remote import RemoteRepository, InvalidRPCMethod, PathNotAllowed, ConnectionClosedWithHint, handle_remote_line
17from ..repository import Repository, LoggedIO, MAGIC, MAX_DATA_SIZE, TAG_DELETE, TAG_PUT, TAG_COMMIT
18from . import BaseTestCase
19from .hashindex import H
20
21
UNSPECIFIED = object()  # sentinel for "argument not given" where None is itself a meaningful value (e.g. exclusive=None)
23
24
class RepositoryTestCaseBase(BaseTestCase):
    """Common fixture for the repository tests.

    Every test gets a freshly created repository below a temporary directory.
    Helpers are provided to (re)open the repository, to fill it with a few
    keys and to print the contents of its segments for debugging.
    """
    key_size = 32
    exclusive = True  # default for the ``exclusive`` argument of open()

    def open(self, create=False, exclusive=UNSPECIFIED):
        """Return a Repository object for ``<tmppath>/repository`` (not entered yet).

        :param create: passed through to Repository.
        :param exclusive: passed through to Repository. When left UNSPECIFIED,
            the class attribute ``exclusive`` is used; None is a valid explicit
            value, which is why a separate sentinel is needed.
        """
        if exclusive is UNSPECIFIED:
            exclusive = self.exclusive
        return Repository(os.path.join(self.tmppath, 'repository'), exclusive=exclusive, create=create)

    def setUp(self):
        """Create the temporary directory and enter a newly created repository."""
        self.tmppath = tempfile.mkdtemp()
        self.repository = self.open(create=True)
        self.repository.__enter__()

    def tearDown(self):
        """Close the current repository and always remove the temporary directory."""
        try:
            self.repository.close()
        finally:
            # do not leak the temporary directory if closing the repository fails
            shutil.rmtree(self.tmppath)

    def reopen(self, exclusive=UNSPECIFIED):
        """Close the current repository and replace it by a new, not yet entered one.

        :param exclusive: see open().
        """
        # Compare against None explicitly: the repository object supports len(),
        # so a plain truthiness test could depend on the number of stored objects
        # (and skip the close() for an empty repository) instead of on "is there
        # a repository object".
        if self.repository is not None:
            self.repository.close()
        self.repository = self.open(exclusive=exclusive)

    def add_keys(self):
        """Store H(0), H(1), H(3), commit, then leave an uncommitted transaction.

        The uncommitted part overwrites H(1), adds H(2) and deletes H(3).
        """
        self.repository.put(H(0), b'foo')
        self.repository.put(H(1), b'bar')
        self.repository.put(H(3), b'bar')
        self.repository.commit()
        self.repository.put(H(1), b'bar2')
        self.repository.put(H(2), b'boo')
        self.repository.delete(H(3))

    def repo_dump(self, label=None):
        """Print one line per object found in the repository segments (debugging aid).

        :param label: optional prefix for every printed line.
        """
        label = label + ': ' if label is not None else ''
        H_trans = {H(i): i for i in range(10)}
        H_trans[None] = -1  # key == None appears in commits
        tag_trans = {TAG_PUT: 'put', TAG_DELETE: 'del', TAG_COMMIT: 'comm'}
        for segment, fn in self.repository.io.segment_iterator():
            for tag, key, offset, size in self.repository.io.iter_objects(segment):
                print("%s%s H(%d) -> %s[%d..+%d]" % (label, tag_trans[tag], H_trans[key], fn, offset, size))
        print()
66
67
class RepositoryTestCase(RepositoryTestCaseBase):
    """Basic put/get/delete, commit/rollback, list and scan tests."""

    def test1(self):
        """Puts and a delete made before a commit are visible after reopening."""
        for x in range(100):
            self.repository.put(H(x), b'SOMEDATA')
        key50 = H(50)
        self.assert_equal(self.repository.get(key50), b'SOMEDATA')
        self.repository.delete(key50)
        self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(key50))
        self.repository.commit()
        self.repository.close()
        with self.open() as repository2:
            self.assert_raises(Repository.ObjectNotFound, lambda: repository2.get(key50))
            for x in range(100):
                if x == 50:
                    continue
                self.assert_equal(repository2.get(H(x)), b'SOMEDATA')

    def test2(self):
        """Test multiple sequential transactions
        """
        self.repository.put(H(0), b'foo')
        self.repository.put(H(1), b'foo')
        self.repository.commit()
        self.repository.delete(H(0))
        self.repository.put(H(1), b'bar')
        self.repository.commit()
        self.assert_equal(self.repository.get(H(1)), b'bar')

    def test_consistency(self):
        """Test cache consistency
        """
        self.repository.put(H(0), b'foo')
        self.assert_equal(self.repository.get(H(0)), b'foo')
        self.repository.put(H(0), b'foo2')
        self.assert_equal(self.repository.get(H(0)), b'foo2')
        self.repository.put(H(0), b'bar')
        self.assert_equal(self.repository.get(H(0)), b'bar')
        self.repository.delete(H(0))
        self.assert_raises(Repository.ObjectNotFound, lambda: self.repository.get(H(0)))

    def test_consistency2(self):
        """Test cache consistency2
        """
        self.repository.put(H(0), b'foo')
        self.assert_equal(self.repository.get(H(0)), b'foo')
        self.repository.commit()
        self.repository.put(H(0), b'foo2')
        self.assert_equal(self.repository.get(H(0)), b'foo2')
        self.repository.rollback()
        self.assert_equal(self.repository.get(H(0)), b'foo')

    def test_overwrite_in_same_transaction(self):
        """Test that the last put wins when a key is written twice in one transaction
        """
        self.repository.put(H(0), b'foo')
        self.repository.put(H(0), b'foo2')
        self.repository.commit()
        self.assert_equal(self.repository.get(H(0)), b'foo2')

    def test_single_kind_transactions(self):
        """Transactions consisting of only a put, only a replace or only a delete work."""
        # put
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        self.repository.close()
        # replace
        self.repository = self.open()
        with self.repository:
            self.repository.put(H(0), b'bar')
            self.repository.commit()
        # delete
        self.repository = self.open()
        with self.repository:
            self.repository.delete(H(0))
            self.repository.commit()

    def test_list(self):
        """list() honours ``limit`` and ``marker``."""
        for x in range(100):
            self.repository.put(H(x), b'SOMEDATA')
        self.repository.commit()
        all_ids = self.repository.list()  # not named ``all``: that would shadow the builtin
        self.assert_equal(len(all_ids), 100)
        first_half = self.repository.list(limit=50)
        self.assert_equal(len(first_half), 50)
        self.assert_equal(first_half, all_ids[:50])
        second_half = self.repository.list(marker=first_half[-1])
        self.assert_equal(len(second_half), 50)
        self.assert_equal(second_half, all_ids[50:])
        self.assert_equal(len(self.repository.list(limit=50)), 50)

    def test_scan(self):
        """scan() honours ``limit`` and ``marker`` and yields ids in on-disk order."""
        for x in range(100):
            self.repository.put(H(x), b'SOMEDATA')
        self.repository.commit()
        all_ids = self.repository.scan()  # not named ``all``: that would shadow the builtin
        assert len(all_ids) == 100
        first_half = self.repository.scan(limit=50)
        assert len(first_half) == 50
        assert first_half == all_ids[:50]
        second_half = self.repository.scan(marker=first_half[-1])
        assert len(second_half) == 50
        assert second_half == all_ids[50:]
        assert len(self.repository.scan(limit=50)) == 50
        # check result order == on-disk order (which is hash order)
        for x in range(100):
            assert all_ids[x] == H(x)

    def test_max_data_size(self):
        """Exactly MAX_DATA_SIZE bytes can be stored, one byte more raises IntegrityError."""
        max_data = b'x' * MAX_DATA_SIZE
        self.repository.put(H(0), max_data)
        self.assert_equal(self.repository.get(H(0)), max_data)
        self.assert_raises(IntegrityError,
                           lambda: self.repository.put(H(1), max_data + b'x'))
181
182
class LocalRepositoryTestCase(RepositoryTestCaseBase):
    """Tests that poke at repository internals; they don't work with remote repositories."""

    def _assert_sparse(self):
        """Check the ``compact`` bookkeeping after H(1)'s committed PUT got superseded."""
        # The superseded 123456... PUT
        superseded_put_size = 41 + 9
        # The DELETE issued by the superseding PUT (or issued directly)
        delete_size = 41
        assert self.repository.compact[0] == superseded_put_size
        assert self.repository.compact[2] == delete_size
        # rebuilding the information for segment 0 must give the same number
        self.repository._rebuild_sparse(0)
        assert self.repository.compact[0] == superseded_put_size

    def test_sparse1(self):
        """A committed PUT is superseded by another PUT of the same key."""
        self.repository.put(H(0), b'foo')
        self.repository.put(H(1), b'123456789')
        self.repository.commit()
        self.repository.put(H(1), b'bar')
        self._assert_sparse()

    def test_sparse2(self):
        """A committed PUT is superseded by a DELETE of the same key."""
        self.repository.put(H(0), b'foo')
        self.repository.put(H(1), b'123456789')
        self.repository.commit()
        self.repository.delete(H(1))
        self._assert_sparse()

    def test_sparse_delete(self):
        """PUT and DELETE of the same key within one segment make it completely sparse."""
        self.repository.put(H(0), b'1245')
        self.repository.delete(H(0))
        self.repository.io._write_fd.sync()

        # The on-line tracking works on a per-object basis...
        tracked_per_object = 41 + 41 + 4
        assert self.repository.compact[0] == tracked_per_object
        self.repository._rebuild_sparse(0)
        # ...while _rebuild_sparse can mark whole segments as completely sparse (which then includes the segment magic)
        assert self.repository.compact[0] == tracked_per_object + len(MAGIC)

        self.repository.commit()
        remaining_segments = [segment for segment, _ in self.repository.io.segment_iterator()]
        assert 0 not in remaining_segments

    def test_uncommitted_garbage(self):
        """Garbage in an uncommitted segment file must not break the next transaction."""
        # uncommitted garbage should be no problem, it is cleaned up automatically.
        # we just have to be careful with invalidation of cached FDs in LoggedIO.
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        # write some crap to a uncommitted segment file
        garbage_segment = self.repository.io.get_latest_segment() + 1
        garbage_path = self.repository.io.segment_filename(garbage_segment)
        with open(garbage_path, 'wb') as fd:
            fd.write(MAGIC + b'crapcrapcrap')
        self.repository.close()
        # usually, opening the repo and starting a transaction should trigger a cleanup.
        self.repository = self.open()
        with self.repository:
            self.repository.put(H(0), b'bar')  # this may trigger compact_segments()
            self.repository.commit()
        # the point here is that nothing blows up with an exception.
238
239
class RepositoryCommitTestCase(RepositoryTestCaseBase):
    """Tests around commit(): simulated crashes, missing indexes, lock upgrades and shadowing."""

    def _delete_index_files(self):
        """Remove every ``index.*`` file from the repository directory."""
        repo_path = self.repository.path
        for name in os.listdir(repo_path):
            if name.startswith('index.'):
                os.unlink(os.path.join(repo_path, name))

    def _commit_with_simulated_crash(self):
        """Run commit() and swallow the TypeError caused by a method replaced with None.

        The tests simulate a crash by setting a repository method to None;
        calling it then raises TypeError somewhere inside commit().
        """
        try:
            self.repository.commit()
        except TypeError:
            pass

    def _assert_three_consistent_objects(self):
        """Assert that the repository holds 3 objects and passes check()."""
        self.assert_equal(len(self.repository), 3)
        self.assert_equal(self.repository.check(), True)

    def test_replay_of_missing_index(self):
        self.add_keys()
        self._delete_index_files()
        self.reopen()
        with self.repository:
            self._assert_three_consistent_objects()

    def test_crash_before_compact_segments(self):
        self.add_keys()
        self.repository.compact_segments = None
        self._commit_with_simulated_crash()
        self.reopen()
        with self.repository:
            self._assert_three_consistent_objects()

    def test_crash_before_write_index(self):
        self.add_keys()
        self.repository.write_index = None
        self._commit_with_simulated_crash()
        self.reopen()
        with self.repository:
            self._assert_three_consistent_objects()

    def test_replay_lock_upgrade_old(self):
        self.add_keys()
        self._delete_index_files()
        with patch.object(Lock, 'upgrade', side_effect=LockFailed) as upgrade:
            self.reopen(exclusive=None)  # simulate old client that always does lock upgrades
            with self.repository:
                # the repo is only locked by a shared read lock, but to replay segments,
                # we need an exclusive write lock - check if the lock gets upgraded.
                self.assert_raises(LockFailed, lambda: len(self.repository))
                upgrade.assert_called_once_with()

    def test_replay_lock_upgrade(self):
        self.add_keys()
        self._delete_index_files()
        with patch.object(Lock, 'upgrade', side_effect=LockFailed) as upgrade:
            self.reopen(exclusive=False)  # current client usually does not do lock upgrade, except for replay
            with self.repository:
                # the repo is only locked by a shared read lock, but to replay segments,
                # we need an exclusive write lock - check if the lock gets upgraded.
                self.assert_raises(LockFailed, lambda: len(self.repository))
                upgrade.assert_called_once_with()

    def test_crash_before_deleting_compacted_segments(self):
        self.add_keys()
        self.repository.io.delete_segment = None
        self._commit_with_simulated_crash()
        self.reopen()
        with self.repository:
            self._assert_three_consistent_objects()
            # the object count must be unchanged after the check
            self.assert_equal(len(self.repository), 3)

    def test_ignores_commit_tag_in_data(self):
        """Object data that equals the commit entry must not make a segment look committed."""
        self.repository.put(H(0), LoggedIO.COMMIT)
        self.reopen()
        with self.repository:
            io = self.repository.io
            assert not io.is_committed_segment(io.get_latest_segment())

    def test_moved_deletes_are_tracked(self):
        self.repository.put(H(1), b'1')
        self.repository.put(H(2), b'2')
        self.repository.commit()
        self.repo_dump('p1 p2 cc')
        self.repository.delete(H(1))
        self.repository.commit()
        self.repo_dump('d1 cc')
        last_segment = self.repository.io.get_latest_segment() - 1
        num_deletes = 0
        for tag, key, offset, size in self.repository.io.iter_objects(last_segment):
            if tag == TAG_DELETE:
                assert key == H(1)
                num_deletes += 1
        assert num_deletes == 1
        assert last_segment in self.repository.compact
        self.repository.put(H(3), b'3')
        self.repository.commit()
        self.repo_dump('p3 cc')
        assert last_segment not in self.repository.compact
        assert not self.repository.io.segment_exists(last_segment)
        for segment, _ in self.repository.io.segment_iterator():
            for tag, key, offset, size in self.repository.io.iter_objects(segment):
                assert tag != TAG_DELETE
                assert key != H(1)
        # after compaction, there should be no empty shadowed_segments lists left over.
        # we have no put or del any more for H(1), so we lost knowledge about H(1).
        assert H(1) not in self.repository.shadow_index

    def test_shadowed_entries_are_preserved(self):
        get_latest_segment = self.repository.io.get_latest_segment
        self.repository.put(H(1), b'1')
        # This is the segment with our original PUT of interest
        put_segment = get_latest_segment()
        self.repository.commit()

        # We now delete H(1), and force this segment to not be compacted, which can happen
        # if it's not sparse enough (symbolized by H(2) here).
        self.repository.delete(H(1))
        self.repository.put(H(2), b'1')
        delete_segment = get_latest_segment()

        # We pretend these are mostly dense (not sparse) and won't be compacted
        del self.repository.compact[put_segment]
        del self.repository.compact[delete_segment]

        self.repository.commit()

        # Now we perform an unrelated operation on the segment containing the DELETE,
        # causing it to be compacted.
        self.repository.delete(H(2))
        self.repository.commit()

        assert self.repository.io.segment_exists(put_segment)
        assert not self.repository.io.segment_exists(delete_segment)

        # Basic case, since the index survived this must be ok
        assert H(1) not in self.repository
        # Nuke index, force replay
        os.unlink(os.path.join(self.repository.path, 'index.%d' % get_latest_segment()))
        # Must not reappear
        assert H(1) not in self.repository

    def test_shadow_index_rollback(self):
        self.repository.put(H(1), b'1')
        self.repository.delete(H(1))
        assert self.repository.shadow_index[H(1)] == [0]
        self.repository.commit()
        self.repo_dump('p1 d1 cc')
        # note how an empty list means that nothing is shadowed for sure
        assert self.repository.shadow_index[H(1)] == []  # because the delete is considered unstable
        self.repository.put(H(1), b'1')
        self.repository.delete(H(1))
        self.repo_dump('p1 d1')
        # 0 put/delete; 1 commit; 2 compacted; 3 commit; 4 put/delete
        assert self.repository.shadow_index[H(1)] == [4]
        self.repository.rollback()
        self.repo_dump('r')
        self.repository.put(H(2), b'1')
        # After the rollback segment 4 shouldn't be considered anymore
        assert self.repository.shadow_index[H(1)] == []  # because the delete is considered unstable
403
404
class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
    """Tests for repositories opened with ``append_only=True``."""

    def open(self, create=False, exclusive=UNSPECIFIED):
        """Return an append-only Repository for ``<tmppath>/repository`` (not entered yet).

        :param create: passed through to Repository.
        :param exclusive: passed through to Repository; defaults to True when left
            UNSPECIFIED. The parameter exists so that the signature stays compatible
            with the base class, whose reopen() calls ``self.open(exclusive=...)``.
        """
        if exclusive is UNSPECIFIED:
            exclusive = True
        return Repository(os.path.join(self.tmppath, 'repository'), exclusive=exclusive, create=create,
                          append_only=True)

    def test_destroy_append_only(self):
        # Can't destroy append only repo (via the API)
        with self.assert_raises(ValueError):
            self.repository.destroy()
        assert self.repository.append_only

    def test_append_only(self):
        """Count segment files after commits with append_only switched off and on."""
        def segments_in_repository():
            return len(list(self.repository.io.segment_iterator()))
        self.repository.put(H(0), b'foo')
        self.repository.commit()

        self.repository.append_only = False
        assert segments_in_repository() == 2
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        # normal mode: 4 segments are expected after this commit.
        # NOTE(review): the previous comment said compaction leaves "only one segment",
        # which does not match the asserted count - confirm the intended explanation.
        assert segments_in_repository() == 4

        self.repository.append_only = True
        assert segments_in_repository() == 4
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        # append only: does not compact, only new segments written
        assert segments_in_repository() == 6
434
435
class RepositoryFreeSpaceTestCase(RepositoryTestCaseBase):
    """Tests for the ``additional_free_space`` safety margin."""

    def test_additional_free_space(self):
        """A commit fails when the configured free space margin can't be satisfied."""
        self.add_keys()
        config = self.repository.config
        config.set('repository', 'additional_free_space', '1000T')
        self.repository.save_key(b'shortcut to save_config')
        self.reopen()

        with self.repository:
            self.repository.put(H(0), b'foobar')
            with pytest.raises(Repository.InsufficientFreeSpaceError):
                self.repository.commit()
        # the existing repository must still be there afterwards
        repo_path = self.repository.path
        assert os.path.exists(repo_path)

    def test_create_free_space(self):
        """A repository that hits the error within add_keys() is gone afterwards."""
        self.repository.additional_free_space = 1e20
        with pytest.raises(Repository.InsufficientFreeSpaceError):
            self.add_keys()
        repo_path = self.repository.path
        assert not os.path.exists(repo_path)
454
455
class QuotaTestCase(RepositoryTestCaseBase):
    """Tests for storage quota accounting and enforcement."""

    def test_tracking(self):
        """storage_quota_use follows puts and deletes and survives a reopen."""
        per_entry_overhead = 41  # every stored object is expected to cost 41 bytes on top of its data
        small, big = 1234, 5678
        assert self.repository.storage_quota_use == 0
        self.repository.put(H(1), bytes(small))
        assert self.repository.storage_quota_use == small + per_entry_overhead
        self.repository.put(H(2), bytes(big))
        assert self.repository.storage_quota_use == small + big + 2 * per_entry_overhead
        self.repository.delete(H(1))
        assert self.repository.storage_quota_use == big + per_entry_overhead
        self.repository.commit()
        self.reopen()
        with self.repository:
            # Open new transaction; hints and thus quota data is not loaded unless needed.
            self.repository.put(H(3), b'')
            self.repository.delete(H(3))
            assert self.repository.storage_quota_use == big + per_entry_overhead

    def test_exceed_quota(self):
        """put() and commit() raise StorageQuotaExceeded once the quota is exceeded."""
        quota = 50
        one_empty_object = 41
        two_empty_objects = 82
        assert self.repository.storage_quota_use == 0
        self.repository.storage_quota = quota
        self.repository.put(H(1), b'')
        assert self.repository.storage_quota_use == one_empty_object
        self.repository.commit()
        with pytest.raises(Repository.StorageQuotaExceeded):
            self.repository.put(H(2), b'')
        assert self.repository.storage_quota_use == two_empty_objects
        with pytest.raises(Repository.StorageQuotaExceeded):
            self.repository.commit()
        assert self.repository.storage_quota_use == two_empty_objects
        self.reopen()
        with self.repository:
            self.repository.storage_quota = quota
            # Open new transaction; hints and thus quota data is not loaded unless needed.
            self.repository.put(H(1), b'')
            assert self.repository.storage_quota_use == one_empty_object
491
492
class NonceReservation(RepositoryTestCaseBase):
    """Tests for get_free_nonce() and commit_nonce_reservation()."""

    def test_get_free_nonce_asserts(self):
        """get_free_nonce() fails an assertion when the repository is opened with exclusive=False."""
        self.reopen(exclusive=False)
        with pytest.raises(AssertionError):
            with self.repository:
                self.repository.get_free_nonce()

    def test_get_free_nonce(self):
        """get_free_nonce() is None without a nonce file, else the hex number stored in it."""
        with self.repository:
            nonce_path = os.path.join(self.repository.path, "nonce")
            assert self.repository.get_free_nonce() is None

            expectations = (
                ("0000000000000000", 0),
                ("5000000000000000", 0x5000000000000000),
            )
            for file_content, expected_nonce in expectations:
                with open(nonce_path, "w") as fd:
                    fd.write(file_content)
                assert self.repository.get_free_nonce() == expected_nonce

    def test_commit_nonce_reservation_asserts(self):
        """commit_nonce_reservation() fails an assertion when the repository is opened with exclusive=False."""
        self.reopen(exclusive=False)
        with pytest.raises(AssertionError):
            with self.repository:
                self.repository.commit_nonce_reservation(0x200, 0x100)

    def test_commit_nonce_reservation(self):
        """A reservation is written only if the given start nonce matches the stored state."""
        with self.repository:
            nonce_path = os.path.join(self.repository.path, "nonce")

            # no nonce stored yet, so a start nonce of 15 does not match
            with pytest.raises(Exception):
                self.repository.commit_nonce_reservation(0x200, 15)

            self.repository.commit_nonce_reservation(0x200, None)
            with open(nonce_path, "r") as fd:
                stored = fd.read()
            assert stored == "0000000000000200"

            # 0x200 is stored now, 15 still does not match
            with pytest.raises(Exception):
                self.repository.commit_nonce_reservation(0x200, 15)

            self.repository.commit_nonce_reservation(0x400, 0x200)
            with open(nonce_path, "r") as fd:
                stored = fd.read()
            assert stored == "0000000000000400"
533
534
class RepositoryAuxiliaryCorruptionTestCase(RepositoryTestCaseBase):
    """Tests for damaged or missing auxiliary files (hints, index, integrity)."""

    def setUp(self):
        """Leave behind a closed repository holding one committed object, H(0) -> b'foo'."""
        super().setUp()
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        self.repository.close()

    def _aux_file(self, name):
        """Return the path of the file ``name`` inside the repository directory."""
        return os.path.join(self.repository.path, name)

    def do_commit(self):
        """Enter the repository, overwrite H(0) and commit."""
        with self.repository:
            self.repository.put(H(0), b'fox')
            self.repository.commit()

    def test_corrupted_hints(self):
        hints_path = self._aux_file('hints.1')
        with open(hints_path, 'ab') as fd:
            fd.write(b'123456789')
        self.do_commit()

    def test_deleted_hints(self):
        os.unlink(self._aux_file('hints.1'))
        self.do_commit()

    def test_deleted_index(self):
        os.unlink(self._aux_file('index.1'))
        self.do_commit()

    def test_unreadable_hints(self):
        # a directory in place of the hints file can't be read as a file
        hints_path = self._aux_file('hints.1')
        os.unlink(hints_path)
        os.mkdir(hints_path)
        with self.assert_raises(OSError):
            self.do_commit()

    def test_index(self):
        index_path = self._aux_file('index.1')
        with open(index_path, 'wb') as fd:
            fd.write(b'123456789')
        self.do_commit()

    def test_index_outside_transaction(self):
        index_path = self._aux_file('index.1')
        with open(index_path, 'wb') as fd:
            fd.write(b'123456789')
        with self.repository:
            assert len(self.repository) == 1

    def _corrupt_index(self):
        """Flip one bit of the key H(0) as stored in ``index.1``, keeping the file length."""
        # HashIndex is able to detect incorrect headers and file lengths,
        # but on its own it can't tell if the data is correct.
        original_key = H(0)
        # Flip one bit in a key stored in the index
        flipped_key = (int.from_bytes(original_key, 'little') ^ 1).to_bytes(32, 'little')
        with open(self._aux_file('index.1'), 'r+b') as fd:
            intact = fd.read()
            damaged = intact.replace(original_key, flipped_key)
            assert damaged != intact
            assert len(damaged) == len(intact)
            fd.seek(0)
            fd.write(damaged)

    def test_index_corrupted(self):
        # HashIndex is able to detect incorrect headers and file lengths,
        # but on its own it can't tell if the data itself is correct.
        self._corrupt_index()
        with self.repository:
            # Data corruption is detected due to mismatching checksums
            # and fixed by rebuilding the index.
            assert len(self.repository) == 1
            assert self.repository.get(H(0)) == b'foo'

    def test_index_corrupted_without_integrity(self):
        self._corrupt_index()
        os.unlink(self._aux_file('integrity.1'))
        with self.repository:
            # Since the corrupted key is not noticed, the repository still thinks
            # it contains one key...
            assert len(self.repository) == 1
            with pytest.raises(Repository.ObjectNotFound):
                # ... but the real, uncorrupted key is not found in the corrupted index.
                self.repository.get(H(0))

    def test_unreadable_index(self):
        # a directory in place of the index file can't be read as a file
        index_path = self._aux_file('index.1')
        os.unlink(index_path)
        os.mkdir(index_path)
        with self.assert_raises(OSError):
            self.do_commit()

    def test_unknown_integrity_version(self):
        # For now an unknown integrity data version is ignored and not an error.
        bogus_integrity_data = {
            # Borg only understands version 2
            b'version': 4.7,
        }
        with open(self._aux_file('integrity.1'), 'r+b') as fd:
            msgpack.pack(bogus_integrity_data, fd)
            fd.truncate()
        with self.repository:
            # No issues accessing the repository
            assert len(self.repository) == 1
            assert self.repository.get(H(0)) == b'foo'

    def _subtly_corrupted_hints_setup(self):
        """Add two more commits in append-only mode, then zero a refcount in ``hints.5``."""
        with self.repository:
            self.repository.append_only = True
            assert len(self.repository) == 1
            assert self.repository.get(H(0)) == b'foo'
            self.repository.put(H(1), b'bar')
            self.repository.put(H(2), b'baz')
            self.repository.commit()
            self.repository.put(H(2), b'bazz')
            self.repository.commit()

        with open(self._aux_file('hints.5'), 'r+b') as fd:
            hints = msgpack.unpack(fd)
            fd.seek(0)
            # Corrupt segment refcount
            segment_refcounts = hints[b'segments']
            assert segment_refcounts[2] == 1
            segment_refcounts[2] = 0
            msgpack.pack(hints, fd)
            fd.truncate()

    def test_subtly_corrupted_hints(self):
        self._subtly_corrupted_hints_setup()
        with self.repository:
            self.repository.append_only = False
            self.repository.put(H(3), b'1234')
            # Do a compaction run. Succeeds, since the failed checksum prompted a rebuild of the index+hints.
            self.repository.commit()

            assert len(self.repository) == 4
            assert self.repository.get(H(0)) == b'foo'
            assert self.repository.get(H(1)) == b'bar'
            assert self.repository.get(H(2)) == b'bazz'

    def test_subtly_corrupted_hints_without_integrity(self):
        self._subtly_corrupted_hints_setup()
        os.unlink(self._aux_file('integrity.5'))
        with self.repository:
            self.repository.append_only = False
            self.repository.put(H(3), b'1234')
            # Do a compaction run. Fails, since the corrupted refcount was not detected and leads to an assertion failure.
            with pytest.raises(AssertionError) as exc_info:
                self.repository.commit()
            assert 'Corrupted segment reference count' in str(exc_info.value)
680
681
class RepositoryCheckTestCase(RepositoryTestCaseBase):
    """Tests for Repository.check() and its repair mode on damaged repositories."""

    def _repo_file(self, *parts):
        """Return the path of ``parts`` below the repository directory (the directory itself if empty)."""
        return os.path.join(self.tmppath, 'repository', *parts)

    def list_indices(self):
        """Return the names of all ``index.*`` files in the repository directory."""
        return [name for name in os.listdir(self._repo_file()) if name.startswith('index.')]

    def check(self, repair=False, status=True):
        """Run check() and assert its result as well as the absence of tmp files.

        :param repair: passed through to Repository.check().
        :param status: the result check() is expected to return.
        """
        self.assert_equal(self.repository.check(repair=repair), status)
        # Make sure no tmp files are left behind
        leftovers = [name for name in os.listdir(self._repo_file()) if 'tmp' in name]
        self.assert_equal(leftovers, [], 'Found tmp files')

    def get_objects(self, *ids):
        """Fetch H(id) for every given id; the data is discarded, only errors matter."""
        for id_ in ids:
            self.repository.get(H(id_))

    def add_objects(self, segments):
        """Store the objects of each id list in ``segments`` and commit after each list."""
        for ids in segments:
            for id_ in ids:
                self.repository.put(H(id_), b'data')
            self.repository.commit()

    def get_head(self):
        """Return the highest numeric file name found in ``data/0``."""
        return max(int(n) for n in os.listdir(self._repo_file('data', '0')) if n.isdigit())

    def open_index(self):
        """Read the index file belonging to the current head segment."""
        return NSIndex.read(self._repo_file('index.{}'.format(self.get_head())))

    def corrupt_object(self, id_):
        """Overwrite the first bytes of the stored entry of H(id_) with b'BOOM'."""
        idx = self.open_index()
        segment, offset = idx[H(id_)]
        with open(self._repo_file('data', '0', str(segment)), 'r+b') as fd:
            fd.seek(offset)
            fd.write(b'BOOM')

    def delete_segment(self, segment):
        """Delete the file of the given segment number from ``data/0``."""
        os.unlink(self._repo_file('data', '0', str(segment)))

    def delete_index(self):
        """Delete the index file belonging to the current head segment."""
        os.unlink(self._repo_file('index.{}'.format(self.get_head())))

    def rename_index(self, new_name):
        """Rename the index file of the current head segment to ``new_name``."""
        os.rename(self._repo_file('index.{}'.format(self.get_head())),
                  self._repo_file(new_name))

    def list_objects(self):
        """Return the set of integer ids of all objects listed by the repository."""
        return {int(key) for key in self.repository.list()}

    def test_repair_corrupted_segment(self):
        self.add_objects([[1, 2, 3], [4, 5], [6]])
        self.assert_equal({1, 2, 3, 4, 5, 6}, self.list_objects())
        self.check(status=True)
        self.corrupt_object(5)
        self.assert_raises(IntegrityError, lambda: self.get_objects(5))
        self.repository.rollback()
        # Make sure a regular check does not repair anything
        self.check(status=False)
        self.check(status=False)
        # Make sure a repair actually repairs the repo
        self.check(repair=True, status=True)
        self.get_objects(4)
        self.check(status=True)
        self.assert_equal({1, 2, 3, 4, 6}, self.list_objects())

    def test_repair_missing_segment(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        self.assert_equal({1, 2, 3, 4, 5, 6}, self.list_objects())
        self.check(status=True)
        self.delete_segment(2)
        self.repository.rollback()
        self.check(repair=True, status=True)
        self.assert_equal({1, 2, 3}, self.list_objects())

    def test_repair_missing_commit_segment(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        self.delete_segment(3)
        self.assert_raises(Repository.ObjectNotFound, lambda: self.get_objects(4))
        self.assert_equal({1, 2, 3}, self.list_objects())

    def test_repair_corrupted_commit_segment(self):
        self.add_objects([[1, 2, 3], [4, 5, 6]])
        # damage the last byte of segment 3
        with open(self._repo_file('data', '0', '3'), 'r+b') as fd:
            fd.seek(-1, os.SEEK_END)
            fd.write(b'X')
        self.assert_raises(Repository.ObjectNotFound, lambda: self.get_objects(4))
        self.check(status=True)
        self.get_objects(3)
        self.assert_equal({1, 2, 3}, self.list_objects())

    def test_repair_no_commits(self):
        self.add_objects([[1, 2, 3]])
        # damage the last byte of segment 1
        with open(self._repo_file('data', '0', '1'), 'r+b') as fd:
            fd.seek(-1, os.SEEK_END)
            fd.write(b'X')
        self.assert_raises(Repository.CheckNeeded, lambda: self.get_objects(4))
        self.check(status=False)
        self.check(status=False)
        self.assert_equal(self.list_indices(), ['index.1'])
        self.check(repair=True, status=True)
        self.assert_equal(self.list_indices(), ['index.3'])
        self.check(status=True)
        self.get_objects(3)
        self.assert_equal({1, 2, 3}, self.list_objects())
783
784    def test_repair_missing_index(self):
785        self.add_objects([[1, 2, 3], [4, 5, 6]])
786        self.delete_index()
787        self.check(status=True)
788        self.get_objects(4)
789        self.assert_equal(set([1, 2, 3, 4, 5, 6]), self.list_objects())
790
791    def test_repair_index_too_new(self):
792        self.add_objects([[1, 2, 3], [4, 5, 6]])
793        self.assert_equal(self.list_indices(), ['index.3'])
794        self.rename_index('index.100')
795        self.check(status=True)
796        self.assert_equal(self.list_indices(), ['index.3'])
797        self.get_objects(4)
798        self.assert_equal(set([1, 2, 3, 4, 5, 6]), self.list_objects())
799
800    def test_crash_before_compact(self):
801        self.repository.put(H(0), b'data')
802        self.repository.put(H(0), b'data2')
803        # Simulate a crash before compact
804        with patch.object(Repository, 'compact_segments') as compact:
805            self.repository.commit()
806            compact.assert_called_once_with()
807        self.reopen()
808        with self.repository:
809            self.check(repair=True)
810            self.assert_equal(self.repository.get(H(0)), b'data2')
811
812
class RepositoryHintsTestCase(RepositoryTestCaseBase):
    # Checks the shadow_index / compact / segments attributes of the repository
    # after a put + delete sequence has been committed.

    def test_hints_behaviour(self):
        repo = self.repository
        repo.put(H(0), b'data')
        # nothing is shadowed and nothing is scheduled for compaction yet
        self.assert_equal(repo.shadow_index, {})
        self.assert_true(len(repo.compact) == 0)
        repo.delete(H(0))
        repo.commit()
        # this is to make the previous delete "stable", see delete_is_not_stable:
        repo.put(H(1), b'data')
        repo.commit()
        # in 1.1-maint, commit() always implicitly compacts.
        # nothing to compact any more! no info left about stuff that does not exist any more:
        self.assert_not_in(H(0), repo.shadow_index)
        # segment 0 was compacted away, no info about it left:
        for hints in (repo.compact, repo.segments):
            self.assert_not_in(0, hints)
830
831
class RemoteRepositoryTestCase(RepositoryTestCase):
    # Runs the inherited repository tests against a RemoteRepository and adds
    # tests for the RPC layer and for the ssh / borg serve command lines.
    repository = None  # type: RemoteRepository

    def open(self, create=False, exclusive=UNSPECIFIED):
        # `exclusive` is accepted (and ignored) only to stay call-compatible with
        # RepositoryTestCaseBase.open(): reopen() there calls self.open(exclusive=...).
        # The remote repository is always opened with exclusive=True.
        return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')),
                                exclusive=True, create=create)

    def _get_mock_args(self):
        class MockArgs:
            remote_path = 'borg'
            umask = 0o077
            debug_topics = []
            rsh = None

            def __contains__(self, item):
                # To behave like argparse.Namespace
                return hasattr(self, item)

        return MockArgs()

    def _call_inject_exception(self, kind, expected_exception):
        """Call the 'inject_exception' RPC and return the exception it raised.

        The test fails if the call raises nothing; an exception of another type
        propagates unchanged.

        :param kind: value sent as the 'kind' argument of the RPC
        :param expected_exception: exception class the call has to raise
        """
        with pytest.raises(expected_exception) as excinfo:
            self.repository.call('inject_exception', {'kind': kind})
        return excinfo.value

    def test_invalid_rpc(self):
        self.assert_raises(InvalidRPCMethod, lambda: self.repository.call('__init__', {}))

    def test_rpc_exception_transport(self):
        # Every injected exception must actually arrive on the client side; a call
        # that returns without raising is a failure, not a silent pass.
        s1 = 'test string'

        e = self._call_inject_exception('DoesNotExist', Repository.DoesNotExist)
        assert len(e.args) == 1
        assert e.args[0] == self.repository.location.orig

        e = self._call_inject_exception('AlreadyExists', Repository.AlreadyExists)
        assert len(e.args) == 1
        assert e.args[0] == self.repository.location.orig

        e = self._call_inject_exception('CheckNeeded', Repository.CheckNeeded)
        assert len(e.args) == 1
        assert e.args[0] == self.repository.location.orig

        e = self._call_inject_exception('IntegrityError', IntegrityError)
        assert len(e.args) == 1
        assert e.args[0] == s1

        e = self._call_inject_exception('PathNotAllowed', PathNotAllowed)
        assert len(e.args) == 1
        assert e.args[0] == 'foo'

        e = self._call_inject_exception('ObjectNotFound', Repository.ObjectNotFound)
        assert len(e.args) == 2
        assert e.args[0] == s1
        assert e.args[1] == self.repository.location.orig

        e = self._call_inject_exception('InvalidRPCMethod', InvalidRPCMethod)
        assert len(e.args) == 1
        assert e.args[0] == s1

        e = self._call_inject_exception('divide', RemoteRepository.RPCError)
        assert e.unpacked
        assert e.get_message() == 'ZeroDivisionError: integer division or modulo by zero\n'
        assert e.exception_class == 'ZeroDivisionError'
        assert len(e.exception_full) > 0

    def test_ssh_cmd(self):
        args = self._get_mock_args()
        self.repository._args = args
        assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', 'example.com']
        assert self.repository.ssh_cmd(Location('ssh://example.com/foo')) == ['ssh', 'example.com']
        assert self.repository.ssh_cmd(Location('ssh://user@example.com/foo')) == ['ssh', 'user@example.com']
        assert self.repository.ssh_cmd(Location('ssh://user@example.com:1234/foo')) == ['ssh', '-p', '1234', 'user@example.com']
        # patch.dict restores os.environ on exit, so BORG_RSH does not leak into other tests
        with patch.dict(os.environ, {'BORG_RSH': 'ssh --foo'}):
            assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', '--foo', 'example.com']

    def test_borg_cmd(self):
        assert self.repository.borg_cmd(None, testing=True) == [sys.executable, '-m', 'borg.archiver', 'serve']
        args = self._get_mock_args()
        # XXX without next line we get spurious test fails when using pytest-xdist, root cause unknown:
        logging.getLogger().setLevel(logging.INFO)
        # note: test logger is on info log level, so --info gets added automagically
        assert self.repository.borg_cmd(args, testing=False) == ['borg', 'serve', '--umask=077', '--info']
        args.remote_path = 'borg-0.28.2'
        assert self.repository.borg_cmd(args, testing=False) == ['borg-0.28.2', 'serve', '--umask=077', '--info']
        args.debug_topics = ['something_client_side', 'repository_compaction']
        assert self.repository.borg_cmd(args, testing=False) == ['borg-0.28.2', 'serve', '--umask=077', '--info',
                                                                 '--debug-topic=borg.debug.repository_compaction']
        args = self._get_mock_args()
        args.storage_quota = 0
        assert self.repository.borg_cmd(args, testing=False) == ['borg', 'serve', '--umask=077', '--info']
        args.storage_quota = 314159265
        assert self.repository.borg_cmd(args, testing=False) == ['borg', 'serve', '--umask=077', '--info',
                                                                 '--storage-quota=314159265']
        args.rsh = 'ssh -i foo'
        self.repository._args = args
        assert self.repository.ssh_cmd(Location('example.com:foo')) == ['ssh', '-i', 'foo', 'example.com']
940
941
class RemoteLegacyFree(RepositoryTestCaseBase):
    # Keep testing this so we can someday safely remove the legacy tuple format.

    def open(self, create=False, exclusive=UNSPECIFIED):
        # `exclusive` is accepted (and ignored) only to stay call-compatible with
        # RepositoryTestCaseBase.open(): reopen() there calls self.open(exclusive=...).
        # The remote repository is always opened with exclusive=True.
        # RemoteRepository.dictFormat is patched to True only while the repository
        # object is being constructed.
        with patch.object(RemoteRepository, 'dictFormat', True):
            return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')),
                                    exclusive=True, create=create)

    def test_legacy_free(self):
        # Exercise put, replace and delete, each committed on a freshly opened repository.
        # put
        self.repository.put(H(0), b'foo')
        self.repository.commit()
        self.repository.close()
        # replace
        self.repository = self.open()
        with self.repository:
            self.repository.put(H(0), b'bar')
            self.repository.commit()
        # delete
        self.repository = self.open()
        with self.repository:
            self.repository.delete(H(0))
            self.repository.commit()
965
966
class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase):
    # Runs the inherited check/repair tests against a RemoteRepository.

    def open(self, create=False, exclusive=UNSPECIFIED):
        # `exclusive` is accepted (and ignored) only to stay call-compatible with
        # RepositoryTestCaseBase.open(): reopen() there calls self.open(exclusive=...).
        # The remote repository is always opened with exclusive=True.
        return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')),
                                exclusive=True, create=create)

    def test_crash_before_compact(self):
        # skip this test, we can't mock-patch a Repository class in another process!
        pass
976
977
class RemoteLoggerTestCase(BaseTestCase):
    # Tests where handle_remote_line() sends remote output: to the local loggers
    # (captured in StringIO streams) or to sys.stderr (also captured).

    def setUp(self):
        # Remember handlers and levels of every logger that setUp or the tests
        # modify, so tearDown can put them back and nothing leaks into other tests.
        touched_loggers = [logging.getLogger(),
                           logging.getLogger('borg.repository'),
                           logging.getLogger('borg.repository.foo')]
        self._saved_logging = [(logger, logger.level, logger.handlers[:]) for logger in touched_loggers]
        self.stream = io.StringIO()
        self.handler = logging.StreamHandler(self.stream)
        logging.getLogger().handlers[:] = [self.handler]
        logging.getLogger('borg.repository').handlers[:] = []
        logging.getLogger('borg.repository.foo').handlers[:] = []
        # capture stderr
        sys.stderr.flush()
        self.old_stderr = sys.stderr
        self.stderr = sys.stderr = io.StringIO()

    def tearDown(self):
        sys.stderr = self.old_stderr
        # undo the handler and level changes made by setUp and by the tests
        for logger, level, handlers in self._saved_logging:
            logger.setLevel(level)
            logger.handlers[:] = handlers

    def test_stderr_messages(self):
        handle_remote_line("unstructured stderr message\n")
        self.assert_equal(self.stream.getvalue(), '')
        # stderr messages don't get an implicit newline
        self.assert_equal(self.stderr.getvalue(), 'Remote: unstructured stderr message\n')

    def test_stderr_progress_messages(self):
        handle_remote_line("unstructured stderr progress message\r")
        self.assert_equal(self.stream.getvalue(), '')
        # stderr messages don't get an implicit newline
        self.assert_equal(self.stderr.getvalue(), 'Remote: unstructured stderr progress message\r')

    def test_pre11_format_messages(self):
        self.handler.setLevel(logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)

        handle_remote_line("$LOG INFO Remote: borg < 1.1 format message\n")
        self.assert_equal(self.stream.getvalue(), 'Remote: borg < 1.1 format message\n')
        self.assert_equal(self.stderr.getvalue(), '')

    def test_post11_format_messages(self):
        self.handler.setLevel(logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)

        handle_remote_line("$LOG INFO borg.repository Remote: borg >= 1.1 format message\n")
        self.assert_equal(self.stream.getvalue(), 'Remote: borg >= 1.1 format message\n')
        self.assert_equal(self.stderr.getvalue(), '')

    def test_remote_messages_screened(self):
        # default borg config for root logger
        self.handler.setLevel(logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)

        # an INFO message is below the WARNING threshold and must show up nowhere
        handle_remote_line("$LOG INFO borg.repository Remote: new format info message\n")
        self.assert_equal(self.stream.getvalue(), '')
        self.assert_equal(self.stderr.getvalue(), '')

    def test_info_to_correct_local_child(self):
        logging.getLogger('borg.repository').setLevel(logging.INFO)
        logging.getLogger('borg.repository.foo').setLevel(logging.INFO)
        # default borg config for root logger
        self.handler.setLevel(logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)

        child_stream = io.StringIO()
        child_handler = logging.StreamHandler(child_stream)
        child_handler.setLevel(logging.INFO)
        logging.getLogger('borg.repository').handlers[:] = [child_handler]
        foo_stream = io.StringIO()
        foo_handler = logging.StreamHandler(foo_stream)
        foo_handler.setLevel(logging.INFO)
        logging.getLogger('borg.repository.foo').handlers[:] = [foo_handler]

        # the message names 'borg.repository', so only that logger's handler may receive it
        handle_remote_line("$LOG INFO borg.repository Remote: new format child message\n")
        self.assert_equal(foo_stream.getvalue(), '')
        self.assert_equal(child_stream.getvalue(), 'Remote: new format child message\n')
        self.assert_equal(self.stream.getvalue(), '')
        self.assert_equal(self.stderr.getvalue(), '')
1051