1import getpass
2import os.path
3import re
4import tempfile
5from binascii import hexlify, unhexlify
6
7import pytest
8
9from ..crypto.key import Passphrase, PasswordRetriesExceeded, bin_to_hex
10from ..crypto.key import PlaintextKey, PassphraseKey, AuthenticatedKey, RepoKey, KeyfileKey, \
11    Blake2KeyfileKey, Blake2RepoKey, Blake2AuthenticatedKey
12from ..crypto.key import ID_HMAC_SHA_256, ID_BLAKE2b_256
13from ..crypto.key import TAMRequiredError, TAMInvalid, TAMUnsupportedSuiteError, UnsupportedManifestError
14from ..crypto.key import identify_key
15from ..crypto.low_level import bytes_to_long, num_aes_blocks
16from ..helpers import IntegrityError
17from ..helpers import Location
18from ..helpers import StableDict
19from ..helpers import get_security_dir
20from ..helpers import msgpack
21
22
23class TestKey:
24    class MockArgs:
25        location = Location(tempfile.mkstemp()[1])
26
27    keyfile2_key_file = """
28        BORG_KEY 0000000000000000000000000000000000000000000000000000000000000000
29        hqppdGVyYXRpb25zzgABhqCkaGFzaNoAIMyonNI+7Cjv0qHi0AOBM6bLGxACJhfgzVD2oq
30        bIS9SFqWFsZ29yaXRobaZzaGEyNTakc2FsdNoAINNK5qqJc1JWSUjACwFEWGTdM7Nd0a5l
31        1uBGPEb+9XM9p3ZlcnNpb24BpGRhdGHaANAYDT5yfPpU099oBJwMomsxouKyx/OG4QIXK2
32        hQCG2L2L/9PUu4WIuKvGrsXoP7syemujNfcZws5jLp2UPva4PkQhQsrF1RYDEMLh2eF9Ol
33        rwtkThq1tnh7KjWMG9Ijt7/aoQtq0zDYP/xaFF8XXSJxiyP5zjH5+spB6RL0oQHvbsliSh
34        /cXJq7jrqmrJ1phd6dg4SHAM/i+hubadZoS6m25OQzYAW09wZD/phG8OVa698Z5ed3HTaT
35        SmrtgJL3EoOKgUI9d6BLE4dJdBqntifo""".strip()
36
37    keyfile2_cdata = unhexlify(re.sub(r'\W', '', """
38        0055f161493fcfc16276e8c31493c4641e1eb19a79d0326fad0291e5a9c98e5933
39        00000000000003e8d21eaf9b86c297a8cd56432e1915bb
40        """))
41    keyfile2_id = unhexlify('c3fbf14bc001ebcc3cd86e696c13482ed071740927cd7cbe1b01b4bfcee49314')
42
43    keyfile_blake2_key_file = """
44        BORG_KEY 0000000000000000000000000000000000000000000000000000000000000000
45        hqlhbGdvcml0aG2mc2hhMjU2pGRhdGHaAZBu680Do3CmfWzeMCwe48KJi3Vps9mEDy7MKF
46        TastsEhiAd1RQMuxfZpklkLeddMMWk+aPtFiURRFb02JLXV5cKRC1o2ZDdiNa0nao+o6+i
47        gUjjsea9TAu25t3vxh8uQWs5BuKRLBRr0nUgrSd0IYMUgn+iVbLJRzCCssvxsklkwQxN3F
48        Y+MvBnn8kUXSeoSoQ2l0fBHzq94Y7LMOm/owMam5URnE8/UEc6ZXBrbyX4EXxDtUqJcs+D
49        i451thtlGdigDLpvf9nyK66mjiCpPCTCgtlzq0Pe1jcdhnsUYLg+qWzXZ7e2opEZoC6XxS
50        3DIuBOxG3Odqj9IKB+6/kl94vz98awPWFSpYcLZVWu7sIP38ZkUK+ad5MHTo/LvTuZdFnd
51        iqKzZIDUJl3Zl1WGmP/0xVOmfIlznkCZy4d3SMuujwIcqQ5kDvwDRPpdhBBk+UWQY5vFXk
52        kR1NBNLSTyhAzu3fiUmFl0qZ+UWPRkGAEBy/NuoEibrWwab8BX97cATyvnmOqYkU9PT0C6
53        l2l9E4bPpGhhc2jaACDnIa8KgKv84/b5sjaMgSZeIVkuKSLJy2NN8zoH8lnd36ppdGVyYX
54        Rpb25zzgABhqCkc2FsdNoAIEJLlLh7q74j3q53856H5GgzA1HH+aW5bA/as544+PGkp3Zl
55        cnNpb24B""".strip()
56
57    keyfile_blake2_cdata = bytes.fromhex('04fdf9475cf2323c0ba7a99ddc011064f2e7d039f539f2e448'
58                                         '0e6f5fc6ff9993d604040404040404098c8cee1c6db8c28947')
59    # Verified against b2sum. Entire string passed to BLAKE2, including the padded 64 byte key contained in
60    # keyfile_blake2_key_file above is
61    # 19280471de95185ec27ecb6fc9edbb4f4db26974c315ede1cd505fab4250ce7cd0d081ea66946c
62    # 95f0db934d5f616921efbd869257e8ded2bd9bd93d7f07b1a30000000000000000000000000000
63    # 000000000000000000000000000000000000000000000000000000000000000000000000000000
64    # 00000000000000000000007061796c6f6164
65    #                       p a y l o a d
66    keyfile_blake2_id = bytes.fromhex('d8bc68e961c79f99be39061589e5179b2113cd9226e07b08ddd4a1fef7ce93fb')
67
68    @pytest.fixture
69    def keys_dir(self, request, monkeypatch, tmpdir):
70        monkeypatch.setenv('BORG_KEYS_DIR', str(tmpdir))
71        return tmpdir
72
73    @pytest.fixture(params=(
74        PlaintextKey,
75        AuthenticatedKey,
76        KeyfileKey,
77        RepoKey,
78        Blake2KeyfileKey,
79        Blake2RepoKey,
80        Blake2AuthenticatedKey,
81    ))
82    def key(self, request, monkeypatch):
83        monkeypatch.setenv('BORG_PASSPHRASE', 'test')
84        return request.param.create(self.MockRepository(), self.MockArgs())
85
86    class MockRepository:
87        class _Location:
88            orig = '/some/place'
89
90            def canonical_path(self):
91                return self.orig
92
93        _location = _Location()
94        id = bytes(32)
95        id_str = bin_to_hex(id)
96
97        def get_free_nonce(self):
98            return None
99
100        def commit_nonce_reservation(self, next_unreserved, start_nonce):
101            pass
102
103        def save_key(self, data):
104            self.key_data = data
105
106        def load_key(self):
107            return self.key_data
108
109    def test_plaintext(self):
110        key = PlaintextKey.create(None, None)
111        chunk = b'foo'
112        assert hexlify(key.id_hash(chunk)) == b'2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae'
113        assert chunk == key.decrypt(key.id_hash(chunk), key.encrypt(chunk))
114
115    def test_keyfile(self, monkeypatch, keys_dir):
116        monkeypatch.setenv('BORG_PASSPHRASE', 'test')
117        key = KeyfileKey.create(self.MockRepository(), self.MockArgs())
118        assert bytes_to_long(key.enc_cipher.iv, 8) == 0
119        manifest = key.encrypt(b'ABC')
120        assert key.extract_nonce(manifest) == 0
121        manifest2 = key.encrypt(b'ABC')
122        assert manifest != manifest2
123        assert key.decrypt(None, manifest) == key.decrypt(None, manifest2)
124        assert key.extract_nonce(manifest2) == 1
125        iv = key.extract_nonce(manifest)
126        key2 = KeyfileKey.detect(self.MockRepository(), manifest)
127        assert bytes_to_long(key2.enc_cipher.iv, 8) >= iv + num_aes_blocks(len(manifest) - KeyfileKey.PAYLOAD_OVERHEAD)
128        # Key data sanity check
129        assert len({key2.id_key, key2.enc_key, key2.enc_hmac_key}) == 3
130        assert key2.chunk_seed != 0
131        chunk = b'foo'
132        assert chunk == key2.decrypt(key.id_hash(chunk), key.encrypt(chunk))
133
134    def test_keyfile_nonce_rollback_protection(self, monkeypatch, keys_dir):
135        monkeypatch.setenv('BORG_PASSPHRASE', 'test')
136        repository = self.MockRepository()
137        with open(os.path.join(get_security_dir(repository.id_str), 'nonce'), "w") as fd:
138            fd.write("0000000000002000")
139        key = KeyfileKey.create(repository, self.MockArgs())
140        data = key.encrypt(b'ABC')
141        assert key.extract_nonce(data) == 0x2000
142        assert key.decrypt(None, data) == b'ABC'
143
144    def test_keyfile_kfenv(self, tmpdir, monkeypatch):
145        keyfile = tmpdir.join('keyfile')
146        monkeypatch.setenv('BORG_KEY_FILE', str(keyfile))
147        monkeypatch.setenv('BORG_PASSPHRASE', 'testkf')
148        assert not keyfile.exists()
149        key = KeyfileKey.create(self.MockRepository(), self.MockArgs())
150        assert keyfile.exists()
151        chunk = b'ABC'
152        chunk_id = key.id_hash(chunk)
153        chunk_cdata = key.encrypt(chunk)
154        key = KeyfileKey.detect(self.MockRepository(), chunk_cdata)
155        assert chunk == key.decrypt(chunk_id, chunk_cdata)
156        keyfile.remove()
157        with pytest.raises(FileNotFoundError):
158            KeyfileKey.detect(self.MockRepository(), chunk_cdata)
159
160    def test_keyfile2(self, monkeypatch, keys_dir):
161        with keys_dir.join('keyfile').open('w') as fd:
162            fd.write(self.keyfile2_key_file)
163        monkeypatch.setenv('BORG_PASSPHRASE', 'passphrase')
164        key = KeyfileKey.detect(self.MockRepository(), self.keyfile2_cdata)
165        assert key.decrypt(self.keyfile2_id, self.keyfile2_cdata) == b'payload'
166
167    def test_keyfile2_kfenv(self, tmpdir, monkeypatch):
168        keyfile = tmpdir.join('keyfile')
169        with keyfile.open('w') as fd:
170            fd.write(self.keyfile2_key_file)
171        monkeypatch.setenv('BORG_KEY_FILE', str(keyfile))
172        monkeypatch.setenv('BORG_PASSPHRASE', 'passphrase')
173        key = KeyfileKey.detect(self.MockRepository(), self.keyfile2_cdata)
174        assert key.decrypt(self.keyfile2_id, self.keyfile2_cdata) == b'payload'
175
176    def test_keyfile_blake2(self, monkeypatch, keys_dir):
177        with keys_dir.join('keyfile').open('w') as fd:
178            fd.write(self.keyfile_blake2_key_file)
179        monkeypatch.setenv('BORG_PASSPHRASE', 'passphrase')
180        key = Blake2KeyfileKey.detect(self.MockRepository(), self.keyfile_blake2_cdata)
181        assert key.decrypt(self.keyfile_blake2_id, self.keyfile_blake2_cdata) == b'payload'
182
183    def test_passphrase(self, keys_dir, monkeypatch):
184        monkeypatch.setenv('BORG_PASSPHRASE', 'test')
185        key = PassphraseKey.create(self.MockRepository(), None)
186        assert bytes_to_long(key.enc_cipher.iv, 8) == 0
187        assert hexlify(key.id_key) == b'793b0717f9d8fb01c751a487e9b827897ceea62409870600013fbc6b4d8d7ca6'
188        assert hexlify(key.enc_hmac_key) == b'b885a05d329a086627412a6142aaeb9f6c54ab7950f996dd65587251f6bc0901'
189        assert hexlify(key.enc_key) == b'2ff3654c6daf7381dbbe718d2b20b4f1ea1e34caa6cc65f6bb3ac376b93fed2a'
190        assert key.chunk_seed == -775740477
191        manifest = key.encrypt(b'ABC')
192        assert key.extract_nonce(manifest) == 0
193        manifest2 = key.encrypt(b'ABC')
194        assert manifest != manifest2
195        assert key.decrypt(None, manifest) == key.decrypt(None, manifest2)
196        assert key.extract_nonce(manifest2) == 1
197        iv = key.extract_nonce(manifest)
198        key2 = PassphraseKey.detect(self.MockRepository(), manifest)
199        assert bytes_to_long(key2.enc_cipher.iv, 8) == iv + num_aes_blocks(len(manifest) - PassphraseKey.PAYLOAD_OVERHEAD)
200        assert key.id_key == key2.id_key
201        assert key.enc_hmac_key == key2.enc_hmac_key
202        assert key.enc_key == key2.enc_key
203        assert key.chunk_seed == key2.chunk_seed
204        chunk = b'foo'
205        assert hexlify(key.id_hash(chunk)) == b'818217cf07d37efad3860766dcdf1d21e401650fed2d76ed1d797d3aae925990'
206        assert chunk == key2.decrypt(key2.id_hash(chunk), key.encrypt(chunk))
207
208    def _corrupt_byte(self, key, data, offset):
209        data = bytearray(data)
210        data[offset] ^= 1
211        with pytest.raises(IntegrityError):
212            key.decrypt(b'', data)
213
214    def test_decrypt_integrity(self, monkeypatch, keys_dir):
215        with keys_dir.join('keyfile').open('w') as fd:
216            fd.write(self.keyfile2_key_file)
217        monkeypatch.setenv('BORG_PASSPHRASE', 'passphrase')
218        key = KeyfileKey.detect(self.MockRepository(), self.keyfile2_cdata)
219
220        data = self.keyfile2_cdata
221        for i in range(len(data)):
222            self._corrupt_byte(key, data, i)
223
224        with pytest.raises(IntegrityError):
225            data = bytearray(self.keyfile2_cdata)
226            id = bytearray(key.id_hash(data))  # corrupt chunk id
227            id[12] = 0
228            key.decrypt(id, data)
229
230    def test_roundtrip(self, key):
231        repository = key.repository
232        plaintext = b'foo'
233        encrypted = key.encrypt(plaintext)
234        identified_key_class = identify_key(encrypted)
235        assert identified_key_class == key.__class__
236        loaded_key = identified_key_class.detect(repository, encrypted)
237        decrypted = loaded_key.decrypt(None, encrypted)
238        assert decrypted == plaintext
239
240    def test_decrypt_decompress(self, key):
241        plaintext = b'123456789'
242        encrypted = key.encrypt(plaintext)
243        assert key.decrypt(None, encrypted, decompress=False) != plaintext
244        assert key.decrypt(None, encrypted) == plaintext
245
246    def test_assert_id(self, key):
247        plaintext = b'123456789'
248        id = key.id_hash(plaintext)
249        key.assert_id(id, plaintext)
250        id_changed = bytearray(id)
251        id_changed[0] ^= 1
252        with pytest.raises(IntegrityError):
253            key.assert_id(id_changed, plaintext)
254        plaintext_changed = plaintext + b'1'
255        with pytest.raises(IntegrityError):
256            key.assert_id(id, plaintext_changed)
257
258    def test_authenticated_encrypt(self, monkeypatch):
259        monkeypatch.setenv('BORG_PASSPHRASE', 'test')
260        key = AuthenticatedKey.create(self.MockRepository(), self.MockArgs())
261        assert AuthenticatedKey.id_hash is ID_HMAC_SHA_256.id_hash
262        assert len(key.id_key) == 32
263        plaintext = b'123456789'
264        authenticated = key.encrypt(plaintext)
265        # 0x07 is the key TYPE, 0x0100 identifies LZ4 compression, 0x90 is part of LZ4 and means that an uncompressed
266        # block of length nine follows (the plaintext).
267        assert authenticated == b'\x07\x01\x00\x90' + plaintext
268
269    def test_blake2_authenticated_encrypt(self, monkeypatch):
270        monkeypatch.setenv('BORG_PASSPHRASE', 'test')
271        key = Blake2AuthenticatedKey.create(self.MockRepository(), self.MockArgs())
272        assert Blake2AuthenticatedKey.id_hash is ID_BLAKE2b_256.id_hash
273        assert len(key.id_key) == 128
274        plaintext = b'123456789'
275        authenticated = key.encrypt(plaintext)
276        # 0x06 is the key TYPE, 0x0100 identifies LZ4 compression, 0x90 is part of LZ4 and means that an uncompressed
277        # block of length nine follows (the plaintext).
278        assert authenticated == b'\x06\x01\x00\x90' + plaintext
279
280
281class TestPassphrase:
282    def test_passphrase_new_verification(self, capsys, monkeypatch):
283        monkeypatch.setattr(getpass, 'getpass', lambda prompt: "12aöäü")
284        monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'no')
285        Passphrase.new()
286        out, err = capsys.readouterr()
287        assert "12" not in out
288        assert "12" not in err
289
290        monkeypatch.setenv('BORG_DISPLAY_PASSPHRASE', 'yes')
291        passphrase = Passphrase.new()
292        out, err = capsys.readouterr()
293        assert "313261c3b6c3a4c3bc" not in out
294        assert "313261c3b6c3a4c3bc" in err
295        assert passphrase == "12aöäü"
296
297        monkeypatch.setattr(getpass, 'getpass', lambda prompt: "1234/@=")
298        Passphrase.new()
299        out, err = capsys.readouterr()
300        assert "1234/@=" not in out
301        assert "1234/@=" in err
302
303    def test_passphrase_new_empty(self, capsys, monkeypatch):
304        monkeypatch.delenv('BORG_PASSPHRASE', False)
305        monkeypatch.setattr(getpass, 'getpass', lambda prompt: "")
306        with pytest.raises(PasswordRetriesExceeded):
307            Passphrase.new(allow_empty=False)
308        out, err = capsys.readouterr()
309        assert "must not be blank" in err
310
311    def test_passphrase_new_retries(self, monkeypatch):
312        monkeypatch.delenv('BORG_PASSPHRASE', False)
313        ascending_numbers = iter(range(20))
314        monkeypatch.setattr(getpass, 'getpass', lambda prompt: str(next(ascending_numbers)))
315        with pytest.raises(PasswordRetriesExceeded):
316            Passphrase.new()
317
318    def test_passphrase_repr(self):
319        assert "secret" not in repr(Passphrase("secret"))
320
321
322class TestTAM:
323    @pytest.fixture
324    def key(self, monkeypatch):
325        monkeypatch.setenv('BORG_PASSPHRASE', 'test')
326        return KeyfileKey.create(TestKey.MockRepository(), TestKey.MockArgs())
327
328    def test_unpack_future(self, key):
329        blob = b'\xc1\xc1\xc1\xc1foobar'
330        with pytest.raises(UnsupportedManifestError):
331            key.unpack_and_verify_manifest(blob)
332
333        blob = b'\xc1\xc1\xc1'
334        with pytest.raises((ValueError, msgpack.UnpackException)):
335            key.unpack_and_verify_manifest(blob)
336
337    def test_missing_when_required(self, key):
338        blob = msgpack.packb({})
339        with pytest.raises(TAMRequiredError):
340            key.unpack_and_verify_manifest(blob)
341
342    def test_missing(self, key):
343        blob = msgpack.packb({})
344        key.tam_required = False
345        unpacked, verified = key.unpack_and_verify_manifest(blob)
346        assert unpacked == {}
347        assert not verified
348
349    def test_unknown_type_when_required(self, key):
350        blob = msgpack.packb({
351            'tam': {
352                'type': 'HMAC_VOLLBIT',
353            },
354        })
355        with pytest.raises(TAMUnsupportedSuiteError):
356            key.unpack_and_verify_manifest(blob)
357
358    def test_unknown_type(self, key):
359        blob = msgpack.packb({
360            'tam': {
361                'type': 'HMAC_VOLLBIT',
362            },
363        })
364        key.tam_required = False
365        unpacked, verified = key.unpack_and_verify_manifest(blob)
366        assert unpacked == {}
367        assert not verified
368
369    @pytest.mark.parametrize('tam, exc', (
370        ({}, TAMUnsupportedSuiteError),
371        ({'type': b'\xff'}, TAMUnsupportedSuiteError),
372        (None, TAMInvalid),
373        (1234, TAMInvalid),
374    ))
375    def test_invalid(self, key, tam, exc):
376        blob = msgpack.packb({
377            'tam': tam,
378        })
379        with pytest.raises(exc):
380            key.unpack_and_verify_manifest(blob)
381
382    @pytest.mark.parametrize('hmac, salt', (
383        ({}, bytes(64)),
384        (bytes(64), {}),
385        (None, bytes(64)),
386        (bytes(64), None),
387    ))
388    def test_wrong_types(self, key, hmac, salt):
389        data = {
390            'tam': {
391                'type': 'HKDF_HMAC_SHA512',
392                'hmac': hmac,
393                'salt': salt
394            },
395        }
396        tam = data['tam']
397        if hmac is None:
398            del tam['hmac']
399        if salt is None:
400            del tam['salt']
401        blob = msgpack.packb(data)
402        with pytest.raises(TAMInvalid):
403            key.unpack_and_verify_manifest(blob)
404
405    def test_round_trip(self, key):
406        data = {'foo': 'bar'}
407        blob = key.pack_and_authenticate_metadata(data)
408        assert blob.startswith(b'\x82')
409
410        unpacked = msgpack.unpackb(blob)
411        assert unpacked[b'tam'][b'type'] == b'HKDF_HMAC_SHA512'
412
413        unpacked, verified = key.unpack_and_verify_manifest(blob)
414        assert verified
415        assert unpacked[b'foo'] == b'bar'
416        assert b'tam' not in unpacked
417
418    @pytest.mark.parametrize('which', (b'hmac', b'salt'))
419    def test_tampered(self, key, which):
420        data = {'foo': 'bar'}
421        blob = key.pack_and_authenticate_metadata(data)
422        assert blob.startswith(b'\x82')
423
424        unpacked = msgpack.unpackb(blob, object_hook=StableDict)
425        assert len(unpacked[b'tam'][which]) == 64
426        unpacked[b'tam'][which] = unpacked[b'tam'][which][0:32] + bytes(32)
427        assert len(unpacked[b'tam'][which]) == 64
428        blob = msgpack.packb(unpacked)
429
430        with pytest.raises(TAMInvalid):
431            key.unpack_and_verify_manifest(blob)
432