# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2019, 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
import typing
from enum import Enum, auto
from sys import intern
from typing import Any, Callable, Dict, List, Optional, Sized

import attr
from prometheus_client.core import Gauge

from synapse.config.cache import add_resizable_cache

logger = logging.getLogger(__name__)


# Whether to track estimated memory usage of the LruCaches.
TRACK_MEMORY_USAGE = False


# Registries of all caches known to this process, keyed by cache name /
# metric name respectively. Populated by `register_cache`.
caches_by_name: Dict[str, Sized] = {}
collectors_by_name: Dict[str, "CacheMetric"] = {}

cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name", "reason"])
cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
cache_memory_usage = Gauge(
    "synapse_util_caches_cache_size_bytes",
    "Estimated memory usage of the caches",
    ["name"],
)

response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
response_cache_evicted = Gauge(
    "synapse_util_caches_response_cache:evicted_size", "", ["name", "reason"]
)
response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"])


class EvictionReason(Enum):
    """Why an entry was evicted from a cache; used as a metric label."""

    size = auto()
    time = auto()


@attr.s(slots=True, auto_attribs=True)
class CacheMetric:
    """Tracks hit/miss/eviction/memory counters for a single cache.

    Counters are updated via the ``inc_*`` methods and pushed to the
    prometheus Gauges when :meth:`collect` is called.
    """

    _cache: Sized
    _cache_type: str
    _cache_name: str
    _collect_callback: Optional[Callable]

    hits: int = 0
    misses: int = 0
    eviction_size_by_reason: typing.Counter[EvictionReason] = attr.ib(
        factory=collections.Counter
    )
    # Estimated memory footprint of the cached entries, in bytes. Remains
    # None until `inc_memory_usage` is first called (i.e. until something
    # has been inserted into the cache).
    memory_usage: Optional[int] = None

    def inc_hits(self) -> None:
        self.hits += 1

    def inc_misses(self) -> None:
        self.misses += 1

    def inc_evictions(self, reason: EvictionReason, size: int = 1) -> None:
        """Record that `size` entries were evicted for the given reason."""
        self.eviction_size_by_reason[reason] += size

    def inc_memory_usage(self, memory: int) -> None:
        """Add `memory` bytes to the estimated memory usage."""
        if self.memory_usage is None:
            self.memory_usage = 0

        self.memory_usage += memory

    def dec_memory_usage(self, memory: int) -> None:
        """Subtract `memory` bytes from the estimated memory usage.

        Must only be called after `inc_memory_usage` has initialised the
        counter.
        """
        assert self.memory_usage is not None
        self.memory_usage -= memory

    def clear_memory_usage(self) -> None:
        """Reset the memory estimate to zero (no-op if never initialised)."""
        if self.memory_usage is not None:
            self.memory_usage = 0

    def describe(self) -> List[str]:
        return []

    def collect(self) -> None:
        """Push the current counters into the prometheus Gauges."""
        try:
            if self._cache_type == "response_cache":
                response_cache_size.labels(self._cache_name).set(len(self._cache))
                response_cache_hits.labels(self._cache_name).set(self.hits)
                for reason in EvictionReason:
                    response_cache_evicted.labels(self._cache_name, reason.name).set(
                        self.eviction_size_by_reason[reason]
                    )
                response_cache_total.labels(self._cache_name).set(
                    self.hits + self.misses
                )
            else:
                cache_size.labels(self._cache_name).set(len(self._cache))
                cache_hits.labels(self._cache_name).set(self.hits)
                for reason in EvictionReason:
                    cache_evicted.labels(self._cache_name, reason.name).set(
                        self.eviction_size_by_reason[reason]
                    )
                cache_total.labels(self._cache_name).set(self.hits + self.misses)
                max_size = getattr(self._cache, "max_size", None)
                if max_size:
                    cache_max_size.labels(self._cache_name).set(max_size)

                if TRACK_MEMORY_USAGE:
                    # self.memory_usage can be None if nothing has been inserted
                    # into the cache yet.
                    cache_memory_usage.labels(self._cache_name).set(
                        self.memory_usage or 0
                    )
            if self._collect_callback:
                self._collect_callback()
        except Exception as e:
            logger.warning("Error calculating metrics for %s: %s", self._cache_name, e)
            raise


def register_cache(
    cache_type: str,
    cache_name: str,
    cache: Sized,
    collect_callback: Optional[Callable] = None,
    resizable: bool = True,
    resize_callback: Optional[Callable] = None,
) -> CacheMetric:
    """Register a cache object for metric collection and resizing.

    Args:
        cache_type: a string indicating the "type" of the cache. This is used
            only for deduplication so isn't too important provided it's constant.
        cache_name: name of the cache
        cache: cache itself, which must implement __len__(), and may optionally implement
             a max_size property
        collect_callback: If given, a function which is called during metric
            collection to update additional metrics.
        resizable: Whether this cache supports being resized, in which case either
            resize_callback must be provided, or the cache must support set_max_size().
        resize_callback: A function which can be called to resize the cache.

    Returns:
        CacheMetric: an object which provides inc_{hits,misses,evictions} methods
    """
    if resizable:
        if not resize_callback:
            resize_callback = cache.set_cache_factor  # type: ignore
        add_resizable_cache(cache_name, resize_callback)

    metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
    metric_name = "cache_%s_%s" % (cache_type, cache_name)
    caches_by_name[cache_name] = cache
    collectors_by_name[metric_name] = metric
    return metric


KNOWN_KEYS = {
    key: key
    for key in (
        "auth_events",
        "content",
        "depth",
        "event_id",
        "hashes",
        "origin",
        "origin_server_ts",
        "prev_events",
        "room_id",
        "sender",
        "signatures",
        "state_key",
        "type",
        "unsigned",
        "user_id",
    )
}

# Keys whose values are worth interning: they are identifiers which recur
# across many events. Hoisted to module level (as a frozenset, for O(1)
# membership tests) rather than rebuilding a tuple on every call to
# `_intern_known_values`.
_INTERN_KNOWN_VALUE_KEYS = frozenset(
    ("event_id", "room_id", "sender", "user_id", "type", "state_key")
)


def intern_string(string: Optional[str]) -> Optional[str]:
    """Takes a (potentially) unicode string and interns it if it's ascii"""
    if string is None:
        return None

    try:
        return intern(string)
    except UnicodeEncodeError:
        return string


def intern_dict(dictionary: Dict[str, Any]) -> Dict[str, Any]:
    """Takes a dictionary and interns well known keys and their values"""
    return {
        KNOWN_KEYS.get(key, key): _intern_known_values(key, value)
        for key, value in dictionary.items()
    }


def _intern_known_values(key: str, value: Any) -> Any:
    """Interns `value` if `key` is a well-known identifier-bearing key."""
    if key in _INTERN_KNOWN_VALUE_KEYS:
        return intern_string(value)

    return value