1import os 2import pytest 3import sys 4import time 5from multiprocessing import Process 6from pathlib import Path 7 8from liquidctl.keyval import RuntimeStorage, _FilesystemBackend 9 10 11@pytest.fixture 12def tmpstore(tmpdir): 13 run_dir = tmpdir.mkdir('run_dir') 14 prefixes = ['prefix'] 15 16 backend = _FilesystemBackend(key_prefixes=prefixes, runtime_dirs=[run_dir]) 17 return RuntimeStorage(prefixes, backend=backend) 18 19 20def test_loads_and_stores(tmpstore): 21 assert tmpstore.load('key') is None 22 assert tmpstore.load('key', default=42) == 42 23 24 tmpstore.store('key', '42') 25 26 assert tmpstore.load('key') == '42' 27 assert tmpstore.load('key', of_type=int) is None 28 29 30def test_updates_with_load_store(tmpstore): 31 assert tmpstore.load_store('key', lambda x: x) == (None, None) 32 assert tmpstore.load_store('key', lambda x: x, default=42) == (None, 42) 33 assert tmpstore.load_store('key', lambda x: str(x)) == (42, '42') 34 assert tmpstore.load_store('key', lambda x: x, of_type=int) == ('42', None) 35 36 37def test_fs_backend_stores_truncate_appropriately(tmpdir): 38 run_dir = tmpdir.mkdir('run_dir') 39 40 # use a separate reader to prevent caching from masking issues 41 writer = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir]) 42 reader = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir]) 43 44 writer.store('key', 42) 45 assert reader.load('key') == 42 46 47 writer.store('key', 1) 48 assert reader.load('key') == 1 49 50 writer.load_store('key', lambda _: 42) 51 assert reader.load('key') == 42 52 53 writer.load_store('key', lambda _: 1) 54 assert reader.load('key') == 1 55 56 57def test_fs_backend_loads_from_fallback_dir(tmpdir): 58 run_dir = tmpdir.mkdir('run_dir') 59 fb_dir = tmpdir.mkdir('fb_dir') 60 61 fallback = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[fb_dir]) 62 fallback.store('key', 42) 63 64 store = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir, fb_dir]) 65 assert store.load('key') == 42 66 67 store.store('key', -1) 68 assert store.load('key') == -1 69 assert fallback.load('key') == 42, 'fallback location was changed' 70 71 72def test_fs_backend_handles_values_corupted_with_nulls(tmpdir, caplog): 73 run_dir = tmpdir.mkdir('run_dir') 74 store = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir]) 75 76 store.store('key', 42) 77 key_file = Path(run_dir).joinpath('prefix', 'key') 78 assert key_file.read_bytes() == b'42', 'unit test is unsound' 79 80 key_file.write_bytes(b'\x00') 81 val = store.load('key') 82 83 assert val is None 84 assert 'was corrupted' in caplog.text 85 86 87def test_fs_backend_load_store_returns_old_and_new_values(tmpdir): 88 run_dir = tmpdir.mkdir('run_dir') 89 90 store = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir]) 91 92 assert store.load_store('key', lambda _: 42) == (None, 42) 93 assert store.load_store('key', lambda x: x + 1) == (42, 43) 94 95 96def test_fs_backend_load_store_loads_from_fallback_dir(tmpdir): 97 run_dir = tmpdir.mkdir('run_dir') 98 fb_dir = tmpdir.mkdir('fb_dir') 99 100 fallback = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[fb_dir]) 101 fallback.store('key', 42) 102 103 store = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir, fb_dir]) 104 assert store.load_store('key', lambda x: x + 1) == (42, 43) 105 106 assert fallback.load('key') == 42, 'content in fallback location changed' 107 108 109def test_fs_backend_load_store_loads_from_fallback_dir_that_is_symlink(tmpdir): 110 # should deadlock if there is a problem with the lock type or with the 111 # handling of fallback paths that point to the same principal/write 112 # directory 113 114 run_dir = tmpdir.mkdir('run_dir') 115 fb_dir = os.path.join(run_dir, 'symlink') 116 os.symlink(run_dir, fb_dir, target_is_directory=True) 117 118 # don't store any initial value so that the fallback location is checked 119 120 store = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir, fb_dir]) 121 assert store.load_store('key', lambda x: 42) == (None, 42) 122 123 fallback = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[fb_dir]) 124 assert fallback.load('key') == 42, 'content in fallback symlink did not change' 125 126 127def test_fs_backend_load_store_is_atomic(tmpdir): 128 run_dir = tmpdir.mkdir('run_dir') 129 130 store = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir]) 131 store.store('key', 42) 132 133 ps = [ 134 Process(target=_fs_mp_increment_key, args=(run_dir, 'prefix', 'key', .5)), 135 Process(target=_fs_mp_increment_key, args=(run_dir, 'prefix', 'key', .5)), 136 Process(target=_fs_mp_increment_key, args=(run_dir, 'prefix', 'key', .5)), 137 ] 138 139 start_time = time.monotonic() 140 141 for p in ps: 142 p.start() 143 144 for p in ps: 145 p.join() 146 147 elapsed = (time.monotonic() - start_time) 148 149 assert store.load('key') == 45 150 assert elapsed >= .5 * len(ps) 151 152 153def test_fs_backend_loads_honor_load_store_locking(tmpdir): 154 run_dir = tmpdir.mkdir('run_dir') 155 156 store = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir]) 157 store.store('key', 42) 158 159 ps = [ 160 Process(target=_fs_mp_increment_key, args=(run_dir, 'prefix', 'key', .5)), 161 Process(target=_fs_mp_check_key, args=(run_dir, 'prefix', 'key', 43)), 162 ] 163 164 ps[0].start() 165 time.sleep(.1) 166 ps[1].start() 167 168 for p in ps: 169 p.join() 170 171 172def test_fs_backend_stores_honor_load_store_locking(tmpdir): 173 run_dir = tmpdir.mkdir('run_dir') 174 175 store = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir]) 176 store.store('key', 42) 177 178 ps = [ 179 Process(target=_fs_mp_increment_key, args=(run_dir, 'prefix', 'key', .5)), 180 Process(target=_fs_mp_store_key, args=(run_dir, 'prefix', 'key', -1)), 181 ] 182 183 start_time = time.monotonic() 184 185 ps[0].start() 186 time.sleep(.1) 187 ps[1].start() 188 189 # join second process first 190 ps[1].join() 191 192 elapsed = (time.monotonic() - start_time) 193 assert elapsed >= .5 194 195 ps[0].join() 196 assert store.load('key') == -1 197 198 199def test_fs_backend_releases_locks(tmpdir): 200 # should deadlock if any method does not properly release its lock 201 202 run_dir = tmpdir.mkdir('run_dir') 203 store = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir]) 204 205 def incr_from_other_process(): 206 other = Process(target=_fs_mp_increment_key, args=(run_dir, 'prefix', 'key', 0.)) 207 other.start() 208 other.join() 209 210 store.store('key', 42) 211 incr_from_other_process() 212 assert store.load('key') == 43 213 214 store.load_store('key', lambda _: -1) 215 incr_from_other_process() 216 assert store.load('key') == 0 217 218 incr_from_other_process() 219 assert store.load('key') == 1 220 221 222def _fs_mp_increment_key(run_dir, prefix, key, sleep): 223 """Open a _FilesystemBackend and increment `key`. 224 225 For the `multiprocessing` tests. 226 227 Opens the storage on `run_dir` and with `prefix`. Sleeps for `sleep` 228 seconds within the increment closure. 229 """ 230 231 def l(x): 232 time.sleep(sleep) 233 return x + 1 234 235 store = _FilesystemBackend(key_prefixes=[prefix], runtime_dirs=[run_dir]) 236 store.load_store(key, l) 237 238 239def _fs_mp_check_key(run_dir, prefix, key, expected): 240 """Open a _FilesystemBackend and check `key` value against `expected`. 241 242 For the `multiprocessing` tests. 243 244 Opens the storage on `run_dir` and with `prefix`. 245 """ 246 247 store = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir]) 248 assert store.load(key) == expected 249 250 251def _fs_mp_store_key(run_dir, prefix, key, new_value): 252 """Open a _FilesystemBackend and store `new_value` for `key`. 253 254 For the `multiprocessing` tests. 255 256 Opens the storage on `run_dir` and with `prefix`. 257 """ 258 259 store = _FilesystemBackend(key_prefixes=['prefix'], runtime_dirs=[run_dir]) 260 store.store(key, new_value) 261