# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2019, 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
import typing
from enum import Enum, auto
from sys import intern
from typing import Any, Callable, Dict, List, Optional, Sized

import attr
from prometheus_client.core import Gauge

from synapse.config.cache import add_resizable_cache

logger = logging.getLogger(__name__)


# Whether to track estimated memory usage of the LruCaches.
TRACK_MEMORY_USAGE = False


caches_by_name: Dict[str, Sized] = {}
collectors_by_name: Dict[str, "CacheMetric"] = {}

cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name", "reason"])
cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
cache_memory_usage = Gauge(
    "synapse_util_caches_cache_size_bytes",
    "Estimated memory usage of the caches",
    ["name"],
)

response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
response_cache_evicted = Gauge(
    "synapse_util_caches_response_cache:evicted_size", "", ["name", "reason"]
)
response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"])


class EvictionReason(Enum):
    size = auto()
    time = auto()


@attr.s(slots=True, auto_attribs=True)
class CacheMetric:
    """Tracks hits, misses, evictions (broken down by reason) and optional
    estimated memory usage for a single cache; `collect` publishes the current
    values to the prometheus gauges defined above.
    """

    _cache: Sized
    _cache_type: str
    _cache_name: str
    _collect_callback: Optional[Callable]

    hits: int = 0
    misses: int = 0
    eviction_size_by_reason: typing.Counter[EvictionReason] = attr.ib(
        factory=collections.Counter
    )
    memory_usage: Optional[int] = None

    def inc_hits(self) -> None:
        self.hits += 1

    def inc_misses(self) -> None:
        self.misses += 1

    def inc_evictions(self, reason: EvictionReason, size: int = 1) -> None:
        self.eviction_size_by_reason[reason] += size

    def inc_memory_usage(self, memory: int) -> None:
        if self.memory_usage is None:
            self.memory_usage = 0

        self.memory_usage += memory

    def dec_memory_usage(self, memory: int) -> None:
        assert self.memory_usage is not None
        self.memory_usage -= memory

    def clear_memory_usage(self) -> None:
        if self.memory_usage is not None:
            self.memory_usage = 0

    def describe(self) -> List[str]:
        return []

    def collect(self) -> None:
        try:
            if self._cache_type == "response_cache":
                response_cache_size.labels(self._cache_name).set(len(self._cache))
                response_cache_hits.labels(self._cache_name).set(self.hits)
                for reason in EvictionReason:
                    response_cache_evicted.labels(self._cache_name, reason.name).set(
                        self.eviction_size_by_reason[reason]
                    )
                response_cache_total.labels(self._cache_name).set(
                    self.hits + self.misses
                )
            else:
                cache_size.labels(self._cache_name).set(len(self._cache))
                cache_hits.labels(self._cache_name).set(self.hits)
                for reason in EvictionReason:
                    cache_evicted.labels(self._cache_name, reason.name).set(
                        self.eviction_size_by_reason[reason]
                    )
                cache_total.labels(self._cache_name).set(self.hits + self.misses)
                max_size = getattr(self._cache, "max_size", None)
                if max_size:
                    cache_max_size.labels(self._cache_name).set(max_size)

                if TRACK_MEMORY_USAGE:
                    # self.memory_usage can be None if nothing has been inserted
                    # into the cache yet.
                    cache_memory_usage.labels(self._cache_name).set(
                        self.memory_usage or 0
                    )
            if self._collect_callback:
                self._collect_callback()
        except Exception as e:
            logger.warning("Error calculating metrics for %s: %s", self._cache_name, e)
            raise


def register_cache(
    cache_type: str,
    cache_name: str,
    cache: Sized,
    collect_callback: Optional[Callable] = None,
    resizable: bool = True,
    resize_callback: Optional[Callable] = None,
) -> CacheMetric:
    """Register a cache object for metric collection and resizing.

    Args:
        cache_type: A string indicating the "type" of the cache. This is used
            only for deduplication so isn't too important provided it's constant.
        cache_name: The name of the cache.
        cache: The cache itself, which must implement __len__() and may
            optionally implement a max_size property.
        collect_callback: If given, a function which is called during metric
            collection to update additional metrics.
        resizable: Whether this cache supports being resized, in which case either
            resize_callback must be provided, or the cache must provide a
            set_cache_factor() method.
        resize_callback: A function which can be called to resize the cache.

    Returns:
        CacheMetric: an object which provides inc_{hits,misses,evictions} methods
    """
    if resizable:
        if not resize_callback:
            resize_callback = cache.set_cache_factor  # type: ignore
        add_resizable_cache(cache_name, resize_callback)

    metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
    metric_name = "cache_%s_%s" % (cache_type, cache_name)
    caches_by_name[cache_name] = cache
    collectors_by_name[metric_name] = metric
    return metric
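

# A minimal usage sketch (the `MyCache` class below is hypothetical, purely for
# illustration; it is not part of this module):
#
#     class MyCache:
#         """A dict-backed cache exposing the __len__ that register_cache expects."""
#
#         def __init__(self) -> None:
#             self.data: Dict[str, str] = {}
#
#         def __len__(self) -> int:
#             return len(self.data)
#
#     my_cache = MyCache()
#     metrics = register_cache("dict", "my_cache", my_cache, resizable=False)
#
#     # The cache implementation is then responsible for calling
#     # metrics.inc_hits(), metrics.inc_misses() and metrics.inc_evictions()
#     # as lookups and evictions happen.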


KNOWN_KEYS = {
    key: key
    for key in (
        "auth_events",
        "content",
        "depth",
        "event_id",
        "hashes",
        "origin",
        "origin_server_ts",
        "prev_events",
        "room_id",
        "sender",
        "signatures",
        "state_key",
        "type",
        "unsigned",
        "user_id",
    )
}


def intern_string(string: Optional[str]) -> Optional[str]:
    """Takes a (potentially) unicode string and interns it if it's ascii"""
    if string is None:
        return None

    try:
        return intern(string)
    except UnicodeEncodeError:
        return string


def intern_dict(dictionary: Dict[str, Any]) -> Dict[str, Any]:
    """Takes a dictionary and interns well known keys and their values"""
    return {
        KNOWN_KEYS.get(key, key): _intern_known_values(key, value)
        for key, value in dictionary.items()
    }


def _intern_known_values(key: str, value: Any) -> Any:
    intern_keys = ("event_id", "room_id", "sender", "user_id", "type", "state_key")

    if key in intern_keys:
        return intern_string(value)

    return value
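

# A small illustrative sketch (the event dict below is made up, purely for
# illustration): interning an event-like dict means repeated key strings and the
# values of the well-known identifier fields share a single copy in memory.
#
#     event = {"type": "m.room.message", "room_id": "!abc:example.com", "content": {}}
#     interned = intern_dict(event)
#     # interned["type"] is intern("m.room.message"); "content" is returned as-is.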