1import os.path
2
3import pytest
4
5from ..crypto import nonces
6from ..crypto.nonces import NonceManager
7from ..crypto.key import bin_to_hex
8from ..helpers import get_security_dir
9from ..remote import InvalidRPCMethod
10
11
class TestNonceManager:
    """Tests for ``NonceManager.ensure_reservation`` built on in-memory mocks.

    Every test lowers ``nonces.NONCE_SPACE_RESERVATION`` to ``0x20`` via
    ``monkeypatch``, builds a mock repository and a mock encryption cipher,
    calls ``ensure_reservation`` and then checks some or all of:

    * the IV the cipher was reset to (or that no reset happened),
    * the hex string stored in the ``nonce`` file inside the security dir,
    * the mock repository's ``next_free`` value (new-style repositories only).
    """

    class MockRepository:
        """Repository stand-in that supports the nonce reservation calls.

        Tests must assign ``next_free`` on the instance before the manager
        queries it; ``get_free_nonce`` simply returns that attribute.
        """

        class _Location:
            orig = '/some/place'

        _location = _Location()
        id = bytes(32)
        id_str = bin_to_hex(id)

        def get_free_nonce(self):
            """Return the value currently stored in ``next_free``."""
            return self.next_free

        def commit_nonce_reservation(self, next_unreserved, start_nonce):
            """Store *next_unreserved* after checking *start_nonce* matches ``next_free``."""
            assert start_nonce == self.next_free
            self.next_free = next_unreserved

    class MockOldRepository(MockRepository):
        """Repository stand-in for an old server without nonce reservation calls."""

        def get_free_nonce(self):
            """Always raise ``InvalidRPCMethod``, as the tests expect of an old server."""
            raise InvalidRPCMethod("")

        def commit_nonce_reservation(self, next_unreserved, start_nonce):
            """Fail the running test: this must not be reached for an old repository."""
            pytest.fail("commit_nonce_reservation should never be called on an old repository")

    class MockEncCipher:
        """Cipher stand-in that records the IV passed to ``reset``.

        ``iv_set`` holds the IV from the most recent ``reset`` call, or
        ``False`` when no reset happened since the last expectation check.
        ``iv`` is the cipher's current IV as read by the code under test.
        """

        def __init__(self, iv):
            self.iv_set = False  # placeholder, this is never a valid iv
            self.iv = iv

        def reset(self, key, iv):
            """Record *iv* as both the pending reset value and the current IV."""
            assert key is None
            assert iv is not False
            self.iv_set = iv
            self.iv = iv

        def expect_iv_and_advance(self, expected_iv, advance):
            """Assert the cipher was reset to *expected_iv*, then move the IV to *advance*.

            Both arguments are integers; they are compared/stored as 16-byte
            big-endian values.
            """
            expected_iv = expected_iv.to_bytes(16, byteorder='big')
            iv_set = self.iv_set
            assert iv_set == expected_iv
            # clear the marker so a later check can detect a missing/extra reset
            self.iv_set = False
            self.iv = advance.to_bytes(16, byteorder='big')

        def expect_no_reset_and_advance(self, advance):
            """Assert no ``reset`` happened since the last check, then move the IV to *advance*."""
            iv_set = self.iv_set
            assert iv_set is False
            self.iv = advance.to_bytes(16, byteorder='big')

    def setup_method(self):
        """Start every test without a repository (pytest xunit-style per-test hook)."""
        self.repository = None

    # Previous hook name, kept as an alias. pytest does not invoke ``setUp`` on
    # plain (non-unittest.TestCase) classes, so it never ran automatically.
    setUp = setup_method

    def _nonce_path(self):
        """Return the path of the ``nonce`` file in the current repository's security dir."""
        return os.path.join(get_security_dir(self.repository.id_str), 'nonce')

    def cache_nonce(self):
        """Return the full text content of the local ``nonce`` file."""
        with open(self._nonce_path(), "r") as fd:
            return fd.read()

    def set_cache_nonce(self, nonce):
        """Write *nonce* (a string) to the local ``nonce`` file.

        The write is performed outside of the ``assert`` so that it still
        happens when assertions are stripped (``python -O``).
        """
        with open(self._nonce_path(), "w") as fd:
            written = fd.write(nonce)
        assert written == len(nonce)

    def test_empty_cache_and_old_server(self, monkeypatch):
        """No local nonce file is written beforehand and the repository is an old one."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockOldRepository()
        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        # 0x2033 == 0x2000 + 19 + 0x20
        assert self.cache_nonce() == "0000000000002033"

    def test_empty_cache(self, monkeypatch):
        """No local nonce file is written beforehand; the repository reports 0x2000 as free."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockRepository()
        self.repository.next_free = 0x2000
        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"

    def test_empty_nonce(self, monkeypatch):
        """The repository reports ``None`` as free nonce; then several reservations follow.

        After the first reservation, further calls must not reset the cipher.
        The stored values only move once a request no longer fits.
        """
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockRepository()
        self.repository.next_free = None
        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

        # enough space in reservation
        manager.ensure_reservation(13)
        enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13)
        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

        # just barely enough space in reservation
        # (0x2000 + 19 + 13 + 19 == 0x2033, exactly the stored value)
        manager.ensure_reservation(19)
        enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13 + 19)
        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

        # no space in reservation
        # (0x2063 == 0x2033 + 16 + 0x20)
        manager.ensure_reservation(16)
        enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13 + 19 + 16)
        assert self.cache_nonce() == "0000000000002063"
        assert self.repository.next_free == 0x2063

        # spans reservation boundary
        # (0x20c3 == 0x2063 + 64 + 0x20)
        manager.ensure_reservation(64)
        enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13 + 19 + 16 + 64)
        assert self.cache_nonce() == "00000000000020c3"
        assert self.repository.next_free == 0x20c3

    def test_sync_nonce(self, monkeypatch):
        """Local nonce file and repository both start at 0x2000."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockRepository()
        self.repository.next_free = 0x2000
        self.set_cache_nonce("0000000000002000")

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

    def test_server_just_upgraded(self, monkeypatch):
        """Local nonce file holds 0x2000 while the repository reports ``None``."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockRepository()
        self.repository.next_free = None
        self.set_cache_nonce("0000000000002000")

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

    def test_transaction_abort_no_cache(self, monkeypatch):
        """Cipher starts at 0x1000, repository reports 0x2000, no local nonce file is written.

        The cipher must be reset to 0x2000 before use.
        """
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x1000)
        self.repository = self.MockRepository()
        self.repository.next_free = 0x2000

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

    def test_transaction_abort_old_server(self, monkeypatch):
        """Cipher starts at 0x1000, old repository, local nonce file holds 0x2000.

        The cipher must be reset to 0x2000 before use.
        """
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x1000)
        self.repository = self.MockOldRepository()
        self.set_cache_nonce("0000000000002000")

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"

    def test_transaction_abort_on_other_client(self, monkeypatch):
        """Cipher and local nonce file are at 0x1000 while the repository reports 0x2000.

        The cipher must be reset to the repository's higher value, 0x2000.
        """
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x1000)
        self.repository = self.MockRepository()
        self.repository.next_free = 0x2000
        self.set_cache_nonce("0000000000001000")

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

    def test_interleaved(self, monkeypatch):
        """The repository's free nonce jumps to 0x4000 between two reservations.

        A request that still fits must leave everything untouched. The next
        request that does not fit must reset the cipher to 0x4000.
        """
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockRepository()
        self.repository.next_free = 0x2000
        self.set_cache_nonce("0000000000002000")

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

        # somehow the client unlocks, another client reserves and this client relocks
        self.repository.next_free = 0x4000

        # enough space in reservation
        manager.ensure_reservation(12)
        enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 12)
        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x4000

        # spans reservation boundary
        # (0x4035 == 0x4000 + 21 + 0x20)
        manager.ensure_reservation(21)
        enc_cipher.expect_iv_and_advance(0x4000, 0x4000 + 21)
        assert self.cache_nonce() == "0000000000004035"
        assert self.repository.next_free == 0x4035
231