# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import gc
import itertools
import logging
import os
import platform
import threading
import time
from typing import (
    Any,
    Callable,
    Dict,
    Generic,
    Iterable,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
    cast,
)

import attr
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric
from prometheus_client.core import (
    REGISTRY,
    CounterMetricFamily,
    GaugeHistogramMetricFamily,
    GaugeMetricFamily,
)

from twisted.internet import reactor
from twisted.internet.base import ReactorBase
from twisted.python.threadpool import ThreadPool

import synapse
from synapse.metrics._exposition import (
    MetricsResource,
    generate_latest,
    start_http_server,
)
from synapse.util.versionstring import get_version_string

logger = logging.getLogger(__name__)

METRICS_PREFIX = "/_synapse/metrics"

running_on_pypy = platform.python_implementation() == "PyPy"
all_gauges: "Dict[str, Union[LaterGauge, InFlightGauge]]" = {}

HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")


class RegistryProxy:
    @staticmethod
    def collect() -> Iterable[Metric]:
        for metric in REGISTRY.collect():
            if not metric.name.startswith("__"):
                yield metric

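
# A minimal usage sketch, defined purely for illustration and never called at
# runtime: the proxy can stand in for the global REGISTRY when building the
# metrics resource, so that double-underscore-prefixed internal metrics are
# filtered out of the exposition.
def _example_registry_proxy_usage() -> MetricsResource:
    # Assumes MetricsResource (imported above) takes a registry-like object
    # (anything with a `collect` method) as its argument, which is what
    # RegistryProxy provides.
    return MetricsResource(RegistryProxy)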


@attr.s(slots=True, hash=True)
class LaterGauge:

    name = attr.ib(type=str)
    desc = attr.ib(type=str)
    labels = attr.ib(hash=False, type=Optional[Iterable[str]])
    # callback: should either return a value (if there are no labels for this metric),
    # or a dict mapping from a label tuple to a value
    caller = attr.ib(
        type=Callable[
            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
        ]
    )

    def collect(self) -> Iterable[Metric]:
        g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)

        try:
            calls = self.caller()
        except Exception:
            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
            yield g
            return

        if isinstance(calls, (int, float)):
            g.add_metric([], calls)
        else:
            for k, v in calls.items():
                g.add_metric(k, v)

        yield g

    def __attrs_post_init__(self) -> None:
        self._register()

    def _register(self) -> None:
        if self.name in all_gauges:
            logger.warning("%s already registered, reregistering", self.name)
            REGISTRY.unregister(all_gauges.pop(self.name))

        REGISTRY.register(self)
        all_gauges[self.name] = self
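

# A minimal usage sketch (hypothetical names, defined but never called): a
# LaterGauge is not set eagerly; instead the supplied callback is invoked at
# scrape time to produce the value.
def _example_later_gauge(pending_items: Sequence[Any]) -> LaterGauge:
    # With `labels=None` the callback returns a single number; with labels it
    # would instead return a dict mapping label tuples to numbers.
    return LaterGauge(
        "synapse_example_pending_items",  # hypothetical metric name
        "Number of pending items in an example queue",
        None,
        lambda: len(pending_items),
    )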


# `MetricsEntry` only makes sense when it is a `Protocol`,
# but `Protocol` can't be used as a `TypeVar` bound.
MetricsEntry = TypeVar("MetricsEntry")


class InFlightGauge(Generic[MetricsEntry]):
    """Tracks number of things (e.g. requests, Measure blocks, etc) in flight
    at any given time.

    Each InFlightGauge will create a metric called `<name>_total` that counts
    the number of in flight blocks, as well as a metric for each item in the
    given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
    callbacks.

    Args:
        name: base name of the metrics to be exported
        desc: help text for the metrics
        labels: names of the labels applied to the metrics
        sub_metrics: A list of sub metrics that the callbacks will update.
    """

    def __init__(
        self,
        name: str,
        desc: str,
        labels: Sequence[str],
        sub_metrics: Sequence[str],
    ):
        self.name = name
        self.desc = desc
        self.labels = labels
        self.sub_metrics = sub_metrics

        # Create a class which has the sub_metrics values as attributes, which
        # default to 0 on initialization. Used to pass to registered callbacks.
        self._metrics_class: Type[MetricsEntry] = attr.make_class(
            "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
        )

        # Counts number of in flight blocks for a given set of label values
        self._registrations: Dict[
            Tuple[str, ...], Set[Callable[[MetricsEntry], None]]
        ] = {}

        # Protects access to _registrations
        self._lock = threading.Lock()

        self._register_with_collector()

    def register(
        self,
        key: Tuple[str, ...],
        callback: Callable[[MetricsEntry], None],
    ) -> None:
        """Registers that we've entered a new block with labels `key`.

        `callback` gets called each time the metrics are collected. The same
        value must also be given to `unregister`.

        `callback` gets called with an object that has an attribute per
        sub_metric, which should be updated with the necessary values. Note that
        the metrics object is shared between all callbacks registered with the
        same key.

        Note that `callback` may be called on a separate thread.
        """
        with self._lock:
            self._registrations.setdefault(key, set()).add(callback)

    def unregister(
        self,
        key: Tuple[str, ...],
        callback: Callable[[MetricsEntry], None],
    ) -> None:
        """Registers that we've exited a block with labels `key`."""
        with self._lock:
            self._registrations.setdefault(key, set()).discard(callback)

    def collect(self) -> Iterable[Metric]:
        """Called by prometheus client when it reads metrics.

        Note: may be called by a separate thread.
        """
        in_flight = GaugeMetricFamily(
            self.name + "_total", self.desc, labels=self.labels
        )

        metrics_by_key = {}

        # We copy so that we don't mutate the list while iterating
        with self._lock:
            keys = list(self._registrations)

        for key in keys:
            with self._lock:
                callbacks = set(self._registrations[key])

            in_flight.add_metric(key, len(callbacks))

            metrics = self._metrics_class()
            metrics_by_key[key] = metrics
            for callback in callbacks:
                callback(metrics)

        yield in_flight

        for name in self.sub_metrics:
            gauge = GaugeMetricFamily(
                "_".join([self.name, name]), "", labels=self.labels
            )
            for key, metrics in metrics_by_key.items():
                gauge.add_metric(key, getattr(metrics, name))
            yield gauge

    def _register_with_collector(self) -> None:
        if self.name in all_gauges:
            logger.warning("%s already registered, reregistering", self.name)
            REGISTRY.unregister(all_gauges.pop(self.name))

        REGISTRY.register(self)
        all_gauges[self.name] = self
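

# A minimal usage sketch (hypothetical names, defined but never called):
# entering a block registers a callback, leaving it unregisters the same
# callback, and each scrape passes a shared per-key metrics object to every
# callback so they can fill in the sub metrics.
def _example_in_flight_usage() -> None:
    gauge: "InFlightGauge[Any]" = InFlightGauge(
        "synapse_example_blocks",  # hypothetical metric name
        "Example blocks currently in flight",
        labels=["method"],
        sub_metrics=["processed_count"],
    )

    def _on_collect(metrics: Any) -> None:
        # Runs at scrape time, possibly on another thread.
        metrics.processed_count += 1

    key = ("GET",)  # one value per label
    gauge.register(key, _on_collect)  # entering the block
    # ... the block runs here ...
    gauge.unregister(key, _on_collect)  # exiting the block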


class GaugeBucketCollector:
    """Like a Histogram, but the buckets are Gauges which are updated atomically.

    The data is updated by calling `update_data` with an iterable of measurements.

    We assume that the data is updated less frequently than it is reported to
    Prometheus, and optimise for that case.
    """

    __slots__ = (
        "_name",
        "_documentation",
        "_bucket_bounds",
        "_metric",
    )

    def __init__(
        self,
        name: str,
        documentation: str,
        buckets: Iterable[float],
        registry: CollectorRegistry = REGISTRY,
    ):
        """
        Args:
            name: base name of metric to be exported to Prometheus. (a _bucket suffix
               will be added.)
            documentation: help text for the metric
            buckets: The top bounds of the buckets to report
            registry: metric registry to register with
        """
        self._name = name
        self._documentation = documentation

        # the tops of the buckets
        self._bucket_bounds = [float(b) for b in buckets]
        if self._bucket_bounds != sorted(self._bucket_bounds):
            raise ValueError("Buckets not in sorted order")

        if self._bucket_bounds[-1] != float("inf"):
            self._bucket_bounds.append(float("inf"))

        # We initially set this to None. We won't report metrics until
        # this has been initialised after a successful data update
        self._metric: Optional[GaugeHistogramMetricFamily] = None

        registry.register(self)

    def collect(self) -> Iterable[Metric]:
        # Don't report metrics unless we've already collected some data
        if self._metric is not None:
            yield self._metric

    def update_data(self, values: Iterable[float]) -> None:
        """Update the data to be reported by the metric

        The existing data is cleared, and each measurement in the input is assigned
        to the relevant bucket.
        """
        self._metric = self._values_to_metric(values)

    def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
        total = 0.0
        bucket_values = [0 for _ in self._bucket_bounds]

        for v in values:
            # assign each value to a bucket
            for i, bound in enumerate(self._bucket_bounds):
                if v <= bound:
                    bucket_values[i] += 1
                    break

            # ... and increment the sum
            total += v

        # now, aggregate the bucket values so that they count the number of entries in
        # that bucket or below.
        accumulated_values = itertools.accumulate(bucket_values)

        return GaugeHistogramMetricFamily(
            self._name,
            self._documentation,
            buckets=list(
                zip((str(b) for b in self._bucket_bounds), accumulated_values)
            ),
            gsum_value=total,
        )
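

# A minimal usage sketch (hypothetical names, defined but never called):
# unlike a Histogram, which is observed one measurement at a time, the whole
# distribution is recomputed and swapped in atomically by `update_data`.
def _example_bucket_collector_usage() -> None:
    collector = GaugeBucketCollector(
        "synapse_example_durations",  # hypothetical metric name
        "Distribution of example durations",
        buckets=[0.5, 1.0, 5.0],  # a final +Inf bucket is appended automatically
    )
    # Replace any previously reported data with these measurements.
    collector.update_data([0.1, 0.7, 3.2, 12.0])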


#
# Detailed CPU metrics
#


class CPUMetrics:
    def __init__(self) -> None:
        ticks_per_sec = 100
        try:
            # Try to get the system config
            ticks_per_sec = os.sysconf("SC_CLK_TCK")
        except (ValueError, TypeError, AttributeError):
            pass

        self.ticks_per_sec = ticks_per_sec

    def collect(self) -> Iterable[Metric]:
        if not HAVE_PROC_SELF_STAT:
            return

        with open("/proc/self/stat") as s:
            line = s.read()
            raw_stats = line.split(") ", 1)[1].split(" ")

            user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
            user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
            yield user

            sys = GaugeMetricFamily("process_cpu_system_seconds_total", "")
            sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
            yield sys


REGISTRY.register(CPUMetrics())

#
# Python GC metrics
#

gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
gc_time = Histogram(
    "python_gc_time",
    "Time taken to GC (sec)",
    ["gen"],
    buckets=[
        0.0025,
        0.005,
        0.01,
        0.025,
        0.05,
        0.10,
        0.25,
        0.50,
        1.00,
        2.50,
        5.00,
        7.50,
        15.00,
        30.00,
        45.00,
        60.00,
    ],
)


class GCCounts:
    def collect(self) -> Iterable[Metric]:
        cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
        for n, m in enumerate(gc.get_count()):
            cm.add_metric([str(n)], m)

        yield cm


if not running_on_pypy:
    REGISTRY.register(GCCounts())


#
# PyPy GC / memory metrics
#


class PyPyGCStats:
    def collect(self) -> Iterable[Metric]:
        # @stats is a pretty-printer object with __str__() returning a nice table,
        # plus some fields that contain data from that table.
        # unfortunately, fields are pretty-printed themselves (i.e. '4.5MB').
        stats = gc.get_stats(memory_pressure=False)  # type: ignore
        # @s contains same fields as @stats, but as actual integers.
        s = stats._s  # type: ignore

        # also note that field naming is completely braindead
        # and only vaguely correlates with the pretty-printed table.
        # >>>> gc.get_stats(False)
        # Total memory consumed:
        #     GC used:            8.7MB (peak: 39.0MB)        # s.total_gc_memory, s.peak_memory
        #        in arenas:            3.0MB                  # s.total_arena_memory
        #        rawmalloced:          1.7MB                  # s.total_rawmalloced_memory
        #        nursery:              4.0MB                  # s.nursery_size
        #     raw assembler used: 31.0kB                      # s.jit_backend_used
        #     -----------------------------
        #     Total:              8.8MB                       # stats.memory_used_sum
        #
        #     Total memory allocated:
        #     GC allocated:            38.7MB (peak: 41.1MB)  # s.total_allocated_memory, s.peak_allocated_memory
        #        in arenas:            30.9MB                 # s.peak_arena_memory
        #        rawmalloced:          4.1MB                  # s.peak_rawmalloced_memory
        #        nursery:              4.0MB                  # s.nursery_size
        #     raw assembler allocated: 1.0MB                  # s.jit_backend_allocated
        #     -----------------------------
        #     Total:                   39.7MB                 # stats.memory_allocated_sum
        #
        #     Total time spent in GC:  0.073                  # s.total_gc_time

        pypy_gc_time = CounterMetricFamily(
            "pypy_gc_time_seconds_total",
            "Total time spent in PyPy GC",
            labels=[],
        )
        # s.total_gc_time is in milliseconds; convert to seconds.
        pypy_gc_time.add_metric([], s.total_gc_time / 1000)
        yield pypy_gc_time

        pypy_mem = GaugeMetricFamily(
            "pypy_memory_bytes",
            "Memory tracked by PyPy allocator",
            labels=["state", "class", "kind"],
        )
        # memory used by JIT assembler
        pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used)
        pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated)
        # memory used by GCed objects
        pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory)
        pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory)
        pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory)
        pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory)
        pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size)
        pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size)
        # totals
        pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory)
        pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory)
        pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory)
        pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory)
        yield pypy_mem


if running_on_pypy:
    REGISTRY.register(PyPyGCStats())


#
# Twisted reactor metrics
#

tick_time = Histogram(
    "python_twisted_reactor_tick_time",
    "Tick time of the Twisted reactor (sec)",
    buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
)
pending_calls_metric = Histogram(
    "python_twisted_reactor_pending_calls",
    "Pending calls",
    buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
)

#
# Federation Metrics
#

sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")

events_processed_counter = Counter("synapse_federation_client_events_processed", "")

event_processing_loop_counter = Counter(
    "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
)

event_processing_loop_room_count = Counter(
    "synapse_event_processing_loop_room_count",
    "Rooms seen per event processing loop iteration",
    ["name"],
)


# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])

# Used to track the current max events stream position
event_persisted_position = Gauge("synapse_event_persisted_position", "")

# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])

# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])

event_processing_lag_by_event = Histogram(
    "synapse_event_processing_lag_by_event",
    "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
    ["name"],
)

# Build info of the running server.
build_info = Gauge(
    "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
)
build_info.labels(
    " ".join([platform.python_implementation(), platform.python_version()]),
    get_version_string(synapse),
    " ".join([platform.system(), platform.release()]),
).set(1)

last_ticked = time.time()

# 3PID send info
threepid_send_requests = Histogram(
    "synapse_threepid_send_requests_with_tries",
    documentation="Number of requests for a 3pid token by try count. Note that"
    " if there is a request with a try count of 4, then there would have been"
    " one request each for try counts 1, 2 and 3",
    buckets=(1, 2, 3, 4, 5, 10),
    labelnames=("type", "reason"),
)
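

# A minimal sketch (hypothetical label values, defined but never called) of
# how the histogram above is fed: each token request observes its try count,
# one observation per request.
def _example_threepid_observation(try_count: int) -> None:
    threepid_send_requests.labels(type="email", reason="password_reset").observe(
        try_count
    )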


threadpool_total_threads = Gauge(
    "synapse_threadpool_total_threads",
    "Total number of threads currently in the threadpool",
    ["name"],
)

threadpool_total_working_threads = Gauge(
    "synapse_threadpool_working_threads",
    "Number of threads currently working in the threadpool",
    ["name"],
)

threadpool_total_min_threads = Gauge(
    "synapse_threadpool_min_threads",
    "Minimum number of threads configured in the threadpool",
    ["name"],
)

threadpool_total_max_threads = Gauge(
    "synapse_threadpool_max_threads",
    "Maximum number of threads configured in the threadpool",
    ["name"],
)


def register_threadpool(name: str, threadpool: ThreadPool) -> None:
    """Add metrics for the threadpool."""

    threadpool_total_min_threads.labels(name).set(threadpool.min)
    threadpool_total_max_threads.labels(name).set(threadpool.max)

    threadpool_total_threads.labels(name).set_function(lambda: len(threadpool.threads))
    threadpool_total_working_threads.labels(name).set_function(
        lambda: len(threadpool.working)
    )
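

# A minimal sketch (defined but never called) of hooking up a threadpool,
# using the reactor's default pool as an example; `getThreadPool` is part of
# Twisted's IReactorThreads interface.
def _example_register_reactor_threadpool() -> None:
    register_threadpool("default", reactor.getThreadPool())  # type: ignore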


class ReactorLastSeenMetric:
    def collect(self) -> Iterable[Metric]:
        cm = GaugeMetricFamily(
            "python_twisted_reactor_last_seen",
            "Seconds since the Twisted reactor was last seen",
        )
        cm.add_metric([], time.time() - last_ticked)
        yield cm


REGISTRY.register(ReactorLastSeenMetric())

# The minimum time in seconds between GCs for each generation, regardless of the current GC
# thresholds and counts.
MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)

# The time (in seconds since the epoch) of the last time we did a GC for each generation.
_last_gc = [0.0, 0.0, 0.0]


F = TypeVar("F", bound=Callable[..., Any])

def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
    @functools.wraps(func)
    def f(*args: Any, **kwargs: Any) -> Any:
        now = reactor.seconds()
        num_pending = 0

        # _newTimedCalls is one long list of *all* pending calls. The loop
        # below is based on the implementation of reactor.runUntilCurrent.
        for delayed_call in reactor._newTimedCalls:
            if delayed_call.time > now:
                break

            if delayed_call.delayed_time > 0:
                continue

            num_pending += 1

        num_pending += len(reactor.threadCallQueue)
        start = time.time()
        ret = func(*args, **kwargs)
        end = time.time()

        # record the amount of wallclock time spent running pending calls.
        # This is a proxy for the actual amount of time between reactor polls,
        # since about 25% of time is actually spent running things triggered by
        # I/O events, but that is harder to capture without rewriting half the
        # reactor.
        tick_time.observe(end - start)
        pending_calls_metric.observe(num_pending)

        # Update the time we last ticked, for the metric to test whether
        # Synapse's reactor has frozen
        global last_ticked
        last_ticked = end

        if running_on_pypy:
            return ret

        # Check if we need to do a manual GC (since it's been disabled), and do
        # one if necessary. Note we go in reverse order as e.g. a gen 1 GC may
        # promote an object into gen 2, and we don't want to handle the same
        # object multiple times.
        threshold = gc.get_threshold()
        counts = gc.get_count()
        for i in (2, 1, 0):
            # We check if we need to do one based on a straightforward
            # comparison between the threshold and count. We also do an extra
            # check to make sure that we don't do a GC too often.
            if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]:
                if i == 0:
                    logger.debug("Collecting gc %d", i)
                else:
                    logger.info("Collecting gc %d", i)

                start = time.time()
                unreachable = gc.collect(i)
                end = time.time()

                _last_gc[i] = end

                gc_time.labels(i).observe(end - start)
                gc_unreachable.labels(i).set(unreachable)

        return ret

    return cast(F, f)


try:
    # Ensure the reactor has all the attributes we expect
    reactor.seconds  # type: ignore
    reactor.runUntilCurrent  # type: ignore
    reactor._newTimedCalls  # type: ignore
    reactor.threadCallQueue  # type: ignore

    # runUntilCurrent is called when we have pending calls. It is called once
    # per iteration after fd polling.
    reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent)  # type: ignore

    # We manually run the GC each reactor tick so that we can get some metrics
    # about time spent doing GC.
    if not running_on_pypy:
        gc.disable()
except AttributeError:
    pass


__all__ = [
    "MetricsResource",
    "generate_latest",
    "start_http_server",
    "LaterGauge",
    "InFlightGauge",
    "GaugeBucketCollector",
]