"""
In-memory caching used by Salt
"""

import functools
import logging
import os
import re
import time

import salt.config
import salt.payload
import salt.utils.atomicfile
import salt.utils.data
import salt.utils.dictupdate
import salt.utils.files
import salt.utils.msgpack
from salt.utils.zeromq import zmq

log = logging.getLogger(__name__)


class CacheFactory:
    """
    Cache which can use a number of backends
    """

    @classmethod
    def factory(cls, backend, ttl, *args, **kwargs):
        """
        Build a cache object for the requested backend.

        :param backend: ``"memory"`` for a :class:`CacheDict` or ``"disk"``
            for a :class:`CacheDisk`.
        :param ttl: Time-to-live in seconds, handed to the cache constructor.
        :param kwargs: For the ``"disk"`` backend ``minion_cache_path`` is
            required (``KeyError`` otherwise) and is used as the cache file.
        :return: The cache instance, or ``None`` for an unknown backend.
        """
        log.debug("Factory backend: %s", backend)
        if backend == "memory":
            return CacheDict(ttl, *args, **kwargs)
        if backend == "disk":
            # NOTE: kwargs (including "minion_cache_path") are forwarded
            # unchanged, so they also end up as entries of the underlying
            # dict via dict.__init__.
            return CacheDisk(ttl, kwargs["minion_cache_path"], *args, **kwargs)
        log.error("CacheFactory received unrecognized cache type")
        return None


class CacheDict(dict):
    """
    Subclass of dict that will lazily delete items past ttl
    """

    def __init__(self, ttl, *args, **kwargs):
        """
        :param ttl: Number of seconds an entry stays valid after being set.

        Remaining arguments are passed to ``dict``. Entries supplied that way
        get no timestamp and are therefore never expired.
        """
        dict.__init__(self, *args, **kwargs)
        self._ttl = ttl
        # key -> time.time() of the last __setitem__ for that key
        self._key_cache_time = {}

    def _enforce_ttl_key(self, key):
        """
        Enforce the TTL to a specific key, delete if its past TTL
        """
        if key not in self._key_cache_time:
            return
        if time.time() - self._key_cache_time[key] > self._ttl:
            del self._key_cache_time[key]
            # The value may already be gone (e.g. removed through dict.pop),
            # so a missing key must not turn a lookup into a KeyError here.
            dict.pop(self, key, None)

    def __getitem__(self, key):
        """
        Check if the key is ttld out, then do the get
        """
        self._enforce_ttl_key(key)
        return dict.__getitem__(self, key)

    def __setitem__(self, key, val):
        """
        Make sure to update the key cache time
        """
        self._key_cache_time[key] = time.time()
        dict.__setitem__(self, key, val)

    def __delitem__(self, key):
        """
        Make sure to remove the key cache time
        """
        dict.__delitem__(self, key)
        # Drop the timestamp as well so a later TTL check does not try to
        # expire a key that no longer exists.
        self._key_cache_time.pop(key, None)

    def __contains__(self, key):
        """
        Check if the key is ttld out, then do the membership test
        """
        self._enforce_ttl_key(key)
        return dict.__contains__(self, key)


class CacheDisk(CacheDict):
    """
    Class that represents itself as a dictionary to a consumer
    but uses a disk-based backend. Serialization and de-serialization
    is done with msgpack
    """

    def __init__(self, ttl, path, *args, **kwargs):
        """
        :param ttl: Number of seconds an entry stays valid after being set.
        :param path: File the cache is read from and written to.
        """
        super().__init__(ttl, *args, **kwargs)
        self._path = path
        # Item access is served from this dict, not from the inherited
        # dict storage.
        self._dict = {}
        self._read()

    def _enforce_ttl_key(self, key):
        """
        Enforce the TTL to a specific key, delete if its past TTL
        """
        if key not in self._key_cache_time:
            return
        if time.time() - self._key_cache_time[key] > self._ttl:
            del self._key_cache_time[key]
            # Timestamps loaded from disk may reference keys that are not in
            # the data section; tolerate that instead of raising KeyError.
            self._dict.pop(key, None)

    def __contains__(self, key):
        """
        Check if the key is ttld out, then do the membership test
        """
        self._enforce_ttl_key(key)
        return self._dict.__contains__(key)

    def __getitem__(self, key):
        """
        Check if the key is ttld out, then do the get
        """
        self._enforce_ttl_key(key)
        return self._dict.__getitem__(key)

    def __setitem__(self, key, val):
        """
        Make sure to update the key cache time
        """
        self._key_cache_time[key] = time.time()
        self._dict.__setitem__(key, val)
        # Do the same as the parent but also persist
        self._write()

    def __delitem__(self, key):
        """
        Make sure to remove the key cache time
        """
        del self._key_cache_time[key]
        self._dict.__delitem__(key)
        # Do the same as the parent but also persist
        self._write()

    def clear(self):
        """
        Clear the cache
        """
        self._key_cache_time.clear()
        self._dict.clear()
        # Do the same as the parent but also persist
        self._write()

    def _read(self):
        """
        Read in from disk

        Does nothing when msgpack is unavailable, or when the cache file is
        missing or empty.
        """
        if not salt.utils.msgpack.HAS_MSGPACK or not os.path.exists(self._path):
            return
        if os.path.getsize(self._path) == 0:
            # A zero-length file holds no msgpack document; treat it as an
            # empty cache rather than failing in the constructor.
            return
        with salt.utils.files.fopen(self._path, "rb") as fp_:
            cache = salt.utils.data.decode(
                salt.utils.msgpack.load(fp_, encoding=__salt_system_encoding__)
            )
        if "CacheDisk_cachetime" in cache:  # new format
            self._dict = cache["CacheDisk_data"]
            self._key_cache_time = cache["CacheDisk_cachetime"]
        else:  # old format
            # The old format stored no per-key timestamps, so the file's
            # modification time is used for every key.
            self._dict = cache
            timestamp = os.path.getmtime(self._path)
            for key in self._dict:
                self._key_cache_time[key] = timestamp
        if log.isEnabledFor(logging.DEBUG):
            log.debug("Disk cache retrieved: %s", cache)

    def _write(self):
        """
        Write out to disk
        """
        if not salt.utils.msgpack.HAS_MSGPACK:
            return
        # TODO Add check into preflight to ensure dir exists
        # TODO Dir hashing?
        with salt.utils.atomicfile.atomic_open(self._path, "wb+") as fp_:
            cache = {
                "CacheDisk_data": self._dict,
                "CacheDisk_cachetime": self._key_cache_time,
            }
            salt.utils.msgpack.dump(cache, fp_, use_bin_type=True)


class CacheCli:
    """
    Connection client for the ConCache. Should be used by all
    components that need the list of currently connected minions
    """

    def __init__(self, opts):
        """
        Sets up the zmq-connection to the ConCache

        :param opts: Options mapping; ``opts["sock_dir"]`` is the directory
            holding the ``con_cache.ipc`` and ``con_upd.ipc`` sockets.
        """
        self.opts = opts
        self.cache_sock = os.path.join(self.opts["sock_dir"], "con_cache.ipc")
        self.cache_upd_sock = os.path.join(self.opts["sock_dir"], "con_upd.ipc")

        context = zmq.Context()

        # the socket for talking to the cache
        self.creq_out = context.socket(zmq.REQ)
        self.creq_out.setsockopt(zmq.LINGER, 100)
        self.creq_out.connect("ipc://" + self.cache_sock)

        # the socket for sending updates to the cache
        self.cupd_out = context.socket(zmq.PUB)
        self.cupd_out.setsockopt(zmq.LINGER, 1)
        self.cupd_out.connect("ipc://" + self.cache_upd_sock)

    def put_cache(self, minions):
        """
        published the given minions to the ConCache
        """
        self.cupd_out.send(salt.payload.dumps(minions))

    def get_cached(self):
        """
        queries the ConCache for a list of currently connected minions
        """
        msg = salt.payload.dumps("minions")
        self.creq_out.send(msg)
        min_list = salt.payload.loads(self.creq_out.recv())
        return min_list


class CacheRegex:
    """
    Create a regular expression object cache for the most frequently
    used patterns to minimize compilation of the same patterns over
    and over again
    """

    def __init__(
        self, prepend="", append="", size=1000, keep_fraction=0.8, max_age=3600
    ):
        """
        :param prepend: Text placed before every pattern prior to compiling.
        :param append: Text placed after every pattern prior to compiling.
        :param size: Entry count above which ``get`` triggers a sweep.
        :param keep_fraction: Fraction of ``size`` to keep when sweeping.
        :param max_age: Seconds after which a sweep clears the whole cache.
        """
        self.prepend = prepend
        self.append = append
        self.size = size
        # Number of entries dropped per sweep
        self.clear_size = int(size - size * (keep_fraction))
        if self.clear_size >= size:
            self.clear_size = int(size / 2) + 1
            if self.clear_size > size:
                self.clear_size = size
        self.max_age = max_age
        # pattern -> [hit count, compiled regex, pattern, creation time]
        self.cache = {}
        self.timestamp = time.time()

    def clear(self):
        """
        Clear the cache
        """
        self.cache.clear()

    def sweep(self):
        """
        Sweep the cache and remove the outdated or least frequently
        used entries
        """
        if self.max_age < time.time() - self.timestamp:
            self.clear()
            self.timestamp = time.time()
        else:
            # Order by hit count only. Sorting the raw entry lists would
            # compare the compiled regex objects whenever two counts are
            # equal, which raises TypeError on Python 3. The sort is stable,
            # so among equal counts the earliest inserted entries go first.
            entries = sorted(self.cache.values(), key=lambda entry: entry[0])
            # Slicing (instead of indexing) keeps this safe when the cache
            # holds fewer than clear_size entries.
            for entry in entries[: max(self.clear_size, 0)]:
                del self.cache[entry[2]]

    def get(self, pattern):
        """
        Get a compiled regular expression object based on pattern and
        cache it when it is not in the cache already
        """
        try:
            self.cache[pattern][0] += 1
            return self.cache[pattern][1]
        except KeyError:
            pass
        if len(self.cache) > self.size:
            self.sweep()
        regex = re.compile("{}{}{}".format(self.prepend, pattern, self.append))
        self.cache[pattern] = [1, regex, pattern, time.time()]
        return regex


class ContextCache:
    """
    Store and retrieve a context dictionary in a file below
    ``<cachedir>/context``
    """

    def __init__(self, opts, name):
        """
        Create a context cache

        :param opts: Options mapping; ``opts["cachedir"]`` is the base
            directory of the cache file.
        :param name: Name used for the cache file (``<name>.p``).
        """
        self.opts = opts
        self.cache_path = os.path.join(opts["cachedir"], "context", "{}.p".format(name))

    def cache_context(self, context):
        """
        Cache the given context to disk
        """
        # makedirs with exist_ok avoids the check-then-create race and also
        # creates missing parent directories.
        os.makedirs(os.path.dirname(self.cache_path), exist_ok=True)
        with salt.utils.files.fopen(self.cache_path, "w+b") as cache:
            salt.payload.dump(context, cache)

    def get_cache_context(self):
        """
        Retrieve a context cache from disk
        """
        with salt.utils.files.fopen(self.cache_path, "rb") as cache:
            return salt.utils.data.decode(salt.payload.load(cache))


def context_cache(func):
    """
    A decorator to be used on module functions which need to cache their
    context.

    On every call: when the module's ``__context__`` is empty and a cache
    file exists, the context is re-hydrated from that file; otherwise the
    current context is written to the cache file. The wrapped function is
    then called.
    """

    @functools.wraps(func)
    def context_cache_wrap(*args, **kwargs):
        # __context__ / __opts__ may be wrapper objects exposing .value();
        # fall back to the plain object when they are not.
        try:
            func_context = func.__globals__["__context__"].value()
        except AttributeError:
            func_context = func.__globals__["__context__"]
        try:
            func_opts = func.__globals__["__opts__"].value()
        except AttributeError:
            func_opts = func.__globals__["__opts__"]
        func_name = func.__globals__["__name__"]

        cache = ContextCache(func_opts, func_name)
        if not func_context and os.path.isfile(cache.cache_path):
            salt.utils.dictupdate.update(func_context, cache.get_cache_context())
        else:
            cache.cache_context(func_context)
        return func(*args, **kwargs)

    return context_cache_wrap