import os.path

import pytest

from ..crypto import nonces
from ..crypto.nonces import NonceManager
from ..crypto.key import bin_to_hex
from ..helpers import get_security_dir
from ..remote import InvalidRPCMethod


class TestNonceManager:
    """Tests for NonceManager.ensure_reservation() against mock collaborators.

    Each test patches ``nonces.NONCE_SPACE_RESERVATION`` to 0x20, builds a
    mock cipher and a mock repository, calls ``ensure_reservation()`` and then
    checks three things: the IV the manager handed to the cipher, the contents
    of the local ``nonce`` file in the security dir, and (for new-style
    repositories) the repository's ``next_free`` value.
    """

    class MockRepository:
        """Repository stand-in that keeps the free nonce in ``next_free``.

        ``next_free`` is not defined on the class; tests that use this mock
        assign it on the instance before creating the NonceManager.
        """

        class _Location:
            orig = '/some/place'

        _location = _Location()
        id = bytes(32)
        id_str = bin_to_hex(id)

        def get_free_nonce(self):
            """Return the currently stored free nonce (may be None)."""
            return self.next_free

        def commit_nonce_reservation(self, next_unreserved, start_nonce):
            """Store *next_unreserved*, checking *start_nonce* matches the stored value."""
            assert start_nonce == self.next_free
            self.next_free = next_unreserved

    class MockOldRepository(MockRepository):
        """Repository stand-in whose nonce RPC methods are unavailable."""

        def get_free_nonce(self):
            """Always raise InvalidRPCMethod."""
            raise InvalidRPCMethod("")

        def commit_nonce_reservation(self, next_unreserved, start_nonce):
            """Fail the running test; this must not be reached for an old repository."""
            pytest.fail("commit_nonce_reservation should never be called on an old repository")

    class MockEncCipher:
        """Cipher stand-in that records the IV passed to reset().

        ``iv_set`` holds the IV from the most recent reset() that has not been
        checked yet, or ``False`` when no reset happened since the last check.
        """

        def __init__(self, iv):
            self.iv_set = False  # placeholder, this is never a valid iv
            self.iv = iv

        def reset(self, key, iv):
            """Record *iv*; the key is expected to be None."""
            assert key is None
            assert iv is not False
            self.iv_set = iv
            self.iv = iv

        def expect_iv_and_advance(self, expected_iv, advance):
            """Assert reset() was called with *expected_iv*, then move the IV to *advance*.

            Both arguments are ints; they are compared/stored as 16-byte
            big-endian values.
            """
            expected_iv = expected_iv.to_bytes(16, byteorder='big')
            iv_set = self.iv_set
            assert iv_set == expected_iv
            self.iv_set = False
            self.iv = advance.to_bytes(16, byteorder='big')

        def expect_no_reset_and_advance(self, advance):
            """Assert reset() was not called since the last check, then move the IV to *advance*."""
            iv_set = self.iv_set
            assert iv_set is False
            self.iv = advance.to_bytes(16, byteorder='big')

    def setUp(self):
        """Reset the repository reference (kept for backward compatibility)."""
        self.repository = None

    def setup_method(self, method=None):
        """pytest per-test hook; delegates to setUp().

        pytest only calls ``setUp`` on unittest.TestCase subclasses, so on
        this plain class the initialisation would otherwise never run.
        """
        self.setUp()

    def cache_nonce(self):
        """Return the text stored in the local ``nonce`` file for the current repository."""
        with open(os.path.join(get_security_dir(self.repository.id_str), 'nonce'), "r") as fd:
            return fd.read()

    def set_cache_nonce(self, nonce):
        """Write *nonce* (a string) to the local ``nonce`` file for the current repository."""
        with open(os.path.join(get_security_dir(self.repository.id_str), 'nonce'), "w") as fd:
            # Keep the write outside of the assert statement: asserts are
            # removed under ``python -O`` and the file would stay empty.
            written = fd.write(nonce)
        assert written

    def test_empty_cache_and_old_server(self, monkeypatch):
        """No nonce file is written by the test; repository raises InvalidRPCMethod."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockOldRepository()
        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2013)

        assert self.cache_nonce() == "0000000000002033"

    def test_empty_cache(self, monkeypatch):
        """No nonce file is written by the test; repository reports 0x2000 as free nonce."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockRepository()
        self.repository.next_free = 0x2000
        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2013)

        assert self.cache_nonce() == "0000000000002033"

    def test_empty_nonce(self, monkeypatch):
        """Repository reports no free nonce (None); then consume space step by step."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockRepository()
        self.repository.next_free = None
        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

        # enough space in reservation
        manager.ensure_reservation(13)
        enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13)
        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

        # just barely enough space in reservation
        manager.ensure_reservation(19)
        enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13 + 19)
        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

        # no space in reservation
        manager.ensure_reservation(16)
        enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13 + 19 + 16)
        assert self.cache_nonce() == "0000000000002063"
        assert self.repository.next_free == 0x2063

        # spans reservation boundary
        manager.ensure_reservation(64)
        enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 13 + 19 + 16 + 64)
        assert self.cache_nonce() == "00000000000020c3"
        assert self.repository.next_free == 0x20c3

    def test_sync_nonce(self, monkeypatch):
        """Local nonce file and repository both hold 0x2000."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockRepository()
        self.repository.next_free = 0x2000
        self.set_cache_nonce("0000000000002000")

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

    def test_server_just_upgraded(self, monkeypatch):
        """Local nonce file holds 0x2000 while the repository reports None."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockRepository()
        self.repository.next_free = None
        self.set_cache_nonce("0000000000002000")

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

    def test_transaction_abort_no_cache(self, monkeypatch):
        """Cipher starts at 0x1000, repository reports 0x2000, no nonce file written by the test."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x1000)
        self.repository = self.MockRepository()
        self.repository.next_free = 0x2000

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

    def test_transaction_abort_old_server(self, monkeypatch):
        """Cipher starts at 0x1000, nonce file holds 0x2000, repository raises InvalidRPCMethod."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x1000)
        self.repository = self.MockOldRepository()
        self.set_cache_nonce("0000000000002000")

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"

    def test_transaction_abort_on_other_client(self, monkeypatch):
        """Cipher and nonce file hold 0x1000 while the repository reports 0x2000."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x1000)
        self.repository = self.MockRepository()
        self.repository.next_free = 0x2000
        self.set_cache_nonce("0000000000001000")

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

    def test_interleaved(self, monkeypatch):
        """Repository's free nonce is moved to 0x4000 between reservations."""
        monkeypatch.setattr(nonces, 'NONCE_SPACE_RESERVATION', 0x20)

        enc_cipher = self.MockEncCipher(0x2000)
        self.repository = self.MockRepository()
        self.repository.next_free = 0x2000
        self.set_cache_nonce("0000000000002000")

        manager = NonceManager(self.repository, enc_cipher, 0x2000)
        manager.ensure_reservation(19)
        enc_cipher.expect_iv_and_advance(0x2000, 0x2000 + 19)

        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x2033

        # somehow the client unlocks, another client reserves and this client relocks
        self.repository.next_free = 0x4000

        # enough space in reservation
        manager.ensure_reservation(12)
        enc_cipher.expect_no_reset_and_advance(0x2000 + 19 + 12)
        assert self.cache_nonce() == "0000000000002033"
        assert self.repository.next_free == 0x4000

        # spans reservation boundary
        manager.ensure_reservation(21)
        enc_cipher.expect_iv_and_advance(0x4000, 0x4000 + 21)
        assert self.cache_nonce() == "0000000000004035"
        assert self.repository.next_free == 0x4035