1from threading import Lock 2 3from . import Backend 4from .. import signals, config 5 6 7class MemoryBackend(Backend): 8 """ 9 Simple in-memory backend that should be mostly used for testing purposes 10 """ 11 _storage = {} 12 _lock = Lock() 13 14 def __init__(self): 15 super().__init__() 16 17 def get(self, key): 18 with self._lock: 19 return self._storage.get(key) 20 21 def mget(self, keys): 22 if not keys: 23 return 24 result = [] 25 with self._lock: 26 for key in keys: 27 value = self._storage.get(key) 28 if value is not None: 29 result.append((key, value)) 30 return result 31 32 def set(self, key, value): 33 with self._lock: 34 old_value = self._storage.get(key) 35 self._storage[key] = value 36 signals.config_updated.send( 37 sender=config, key=key, old_value=old_value, new_value=value 38 ) 39