# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import gc
import itertools
import logging
import os
import platform
import threading
import time
from typing import (
    Any,
    Callable,
    Dict,
    Generic,
    Iterable,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
    cast,
)

import attr
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric
from prometheus_client.core import (
    REGISTRY,
    CounterMetricFamily,
    GaugeHistogramMetricFamily,
    GaugeMetricFamily,
)

from twisted.internet import reactor
from twisted.internet.base import ReactorBase
from twisted.python.threadpool import ThreadPool

import synapse
from synapse.metrics._exposition import (
    MetricsResource,
    generate_latest,
    start_http_server,
)
from synapse.util.versionstring import get_version_string

logger = logging.getLogger(__name__)

METRICS_PREFIX = "/_synapse/metrics"

running_on_pypy = platform.python_implementation() == "PyPy"

# All LaterGauge/InFlightGauge instances ever registered, keyed by metric name.
# Used to unregister a previous instance if a metric name is reused.
all_gauges: "Dict[str, Union[LaterGauge, InFlightGauge]]" = {}

HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")


class RegistryProxy:
    """A collector facade over the global registry that hides metrics whose
    names start with "__" (used for internal-only metrics).
    """

    @staticmethod
    def collect() -> Iterable[Metric]:
        for metric in REGISTRY.collect():
            if not metric.name.startswith("__"):
                yield metric


@attr.s(slots=True, hash=True)
class LaterGauge:
    """A Gauge whose value(s) are computed by a callback at collection time."""

    name = attr.ib(type=str)
    desc = attr.ib(type=str)
    labels = attr.ib(hash=False, type=Optional[Iterable[str]])
    # callback: should either return a value (if there are no labels for this metric),
    # or dict mapping from a label tuple to a value
    caller = attr.ib(
        type=Callable[
            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
        ]
    )

    def collect(self) -> Iterable[Metric]:
        """Called by prometheus client when it reads metrics; invokes the callback.

        A failing callback is logged and yields an empty gauge rather than
        breaking the whole scrape.
        """

        g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)

        try:
            calls = self.caller()
        except Exception:
            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
            yield g
            return

        if isinstance(calls, (int, float)):
            # Unlabelled metric: a single value.
            g.add_metric([], calls)
        else:
            # Labelled metric: a mapping from label tuple to value.
            for k, v in calls.items():
                g.add_metric(k, v)

        yield g

    def __attrs_post_init__(self) -> None:
        self._register()

    def _register(self) -> None:
        if self.name in all_gauges.keys():
            # Use lazy %-args so formatting only happens if the record is emitted.
            logger.warning("%s already registered, reregistering", self.name)
            REGISTRY.unregister(all_gauges.pop(self.name))

        REGISTRY.register(self)
        all_gauges[self.name] = self


# `MetricsEntry` only makes sense when it is a `Protocol`,
# but `Protocol` can't be used as a `TypeVar` bound.
MetricsEntry = TypeVar("MetricsEntry")


class InFlightGauge(Generic[MetricsEntry]):
    """Tracks number of things (e.g. requests, Measure blocks, etc) in flight
    at any given time.

    Each InFlightGauge will create a metric called `<name>_total` that counts
    the number of in flight blocks, as well as a metrics for each item in the
    given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
    callbacks.

    Args:
        name
        desc
        labels
        sub_metrics: A list of sub metrics that the callbacks will update.
    """

    def __init__(
        self,
        name: str,
        desc: str,
        labels: Sequence[str],
        sub_metrics: Sequence[str],
    ):
        self.name = name
        self.desc = desc
        self.labels = labels
        self.sub_metrics = sub_metrics

        # Create a class which has the sub_metrics values as attributes, which
        # default to 0 on initialization. Used to pass to registered callbacks.
        self._metrics_class: Type[MetricsEntry] = attr.make_class(
            "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
        )

        # Counts number of in flight blocks for a given set of label values
        self._registrations: Dict[
            Tuple[str, ...], Set[Callable[[MetricsEntry], None]]
        ] = {}

        # Protects access to _registrations
        self._lock = threading.Lock()

        self._register_with_collector()

    def register(
        self,
        key: Tuple[str, ...],
        callback: Callable[[MetricsEntry], None],
    ) -> None:
        """Registers that we've entered a new block with labels `key`.

        `callback` gets called each time the metrics are collected. The same
        value must also be given to `unregister`.

        `callback` gets called with an object that has an attribute per
        sub_metric, which should be updated with the necessary values. Note that
        the metrics object is shared between all callbacks registered with the
        same key.

        Note that `callback` may be called on a separate thread.
        """
        with self._lock:
            self._registrations.setdefault(key, set()).add(callback)

    def unregister(
        self,
        key: Tuple[str, ...],
        callback: Callable[[MetricsEntry], None],
    ) -> None:
        """Registers that we've exited a block with labels `key`."""

        with self._lock:
            self._registrations.setdefault(key, set()).discard(callback)

    def collect(self) -> Iterable[Metric]:
        """Called by prometheus client when it reads metrics.

        Note: may be called by a separate thread.
        """
        in_flight = GaugeMetricFamily(
            self.name + "_total", self.desc, labels=self.labels
        )

        metrics_by_key = {}

        # We copy so that we don't mutate the list while iterating
        with self._lock:
            keys = list(self._registrations)

        for key in keys:
            with self._lock:
                callbacks = set(self._registrations[key])

            in_flight.add_metric(key, len(callbacks))

            metrics = self._metrics_class()
            metrics_by_key[key] = metrics
            for callback in callbacks:
                callback(metrics)

        yield in_flight

        for name in self.sub_metrics:
            gauge = GaugeMetricFamily(
                "_".join([self.name, name]), "", labels=self.labels
            )
            for key, metrics in metrics_by_key.items():
                gauge.add_metric(key, getattr(metrics, name))
            yield gauge

    def _register_with_collector(self) -> None:
        if self.name in all_gauges.keys():
            # Use lazy %-args so formatting only happens if the record is emitted.
            logger.warning("%s already registered, reregistering", self.name)
            REGISTRY.unregister(all_gauges.pop(self.name))

        REGISTRY.register(self)
        all_gauges[self.name] = self


class GaugeBucketCollector:
    """Like a Histogram, but the buckets are Gauges which are updated atomically.

    The data is updated by calling `update_data` with an iterable of measurements.

    We assume that the data is updated less frequently than it is reported to
    Prometheus, and optimise for that case.
    """

    __slots__ = (
        "_name",
        "_documentation",
        "_bucket_bounds",
        "_metric",
    )

    def __init__(
        self,
        name: str,
        documentation: str,
        buckets: Iterable[float],
        registry: CollectorRegistry = REGISTRY,
    ):
        """
        Args:
            name: base name of metric to be exported to Prometheus. (a _bucket suffix
               will be added.)
            documentation: help text for the metric
            buckets: The top bounds of the buckets to report
            registry: metric registry to register with

        Raises:
            ValueError: if `buckets` is not sorted in ascending order.
        """
        self._name = name
        self._documentation = documentation

        # the tops of the buckets
        self._bucket_bounds = [float(b) for b in buckets]
        if self._bucket_bounds != sorted(self._bucket_bounds):
            raise ValueError("Buckets not in sorted order")

        # The exposition format requires a final +Inf bucket. The guard against
        # an empty `buckets` iterable avoids an IndexError on `[-1]`.
        if not self._bucket_bounds or self._bucket_bounds[-1] != float("inf"):
            self._bucket_bounds.append(float("inf"))

        # We initially set this to None. We won't report metrics until
        # this has been initialised after a successful data update
        self._metric: Optional[GaugeHistogramMetricFamily] = None

        registry.register(self)

    def collect(self) -> Iterable[Metric]:
        # Don't report metrics unless we've already collected some data
        if self._metric is not None:
            yield self._metric

    def update_data(self, values: Iterable[float]) -> None:
        """Update the data to be reported by the metric

        The existing data is cleared, and each measurement in the input is assigned
        to the relevant bucket.
        """
        self._metric = self._values_to_metric(values)

    def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
        total = 0.0
        bucket_values = [0 for _ in self._bucket_bounds]

        for v in values:
            # assign each value to a bucket
            for i, bound in enumerate(self._bucket_bounds):
                if v <= bound:
                    bucket_values[i] += 1
                    break

            # ... and increment the sum
            total += v

        # now, aggregate the bucket values so that they count the number of entries in
        # that bucket or below.
        accumulated_values = itertools.accumulate(bucket_values)

        return GaugeHistogramMetricFamily(
            self._name,
            self._documentation,
            buckets=list(
                zip((str(b) for b in self._bucket_bounds), accumulated_values)
            ),
            gsum_value=total,
        )


#
# Detailed CPU metrics
#


class CPUMetrics:
    """Reports user/system CPU time for this process, read from /proc/self/stat."""

    def __init__(self) -> None:
        ticks_per_sec = 100
        try:
            # Try and get the system config
            ticks_per_sec = os.sysconf("SC_CLK_TCK")
        except (ValueError, TypeError, AttributeError):
            # Fall back to the conventional default of 100 Hz.
            pass

        self.ticks_per_sec = ticks_per_sec

    def collect(self) -> Iterable[Metric]:
        if not HAVE_PROC_SELF_STAT:
            return

        with open("/proc/self/stat") as s:
            line = s.read()
            # Split after the ") " that terminates the (possibly space-containing)
            # process name, then take the space-separated stat fields.
            raw_stats = line.split(") ", 1)[1].split(" ")

            user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
            user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
            yield user

            sys = GaugeMetricFamily("process_cpu_system_seconds_total", "")
            sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
            yield sys


REGISTRY.register(CPUMetrics())

#
# Python GC metrics
#

gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
gc_time = Histogram(
    "python_gc_time",
    "Time taken to GC (sec)",
    ["gen"],
    buckets=[
        0.0025,
        0.005,
        0.01,
        0.025,
        0.05,
        0.10,
        0.25,
        0.50,
        1.00,
        2.50,
        5.00,
        7.50,
        15.00,
        30.00,
        45.00,
        60.00,
    ],
)


class GCCounts:
    """Reports the CPython allocation counts per GC generation."""

    def collect(self) -> Iterable[Metric]:
        cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
        for n, m in enumerate(gc.get_count()):
            cm.add_metric([str(n)], m)

        yield cm


if not running_on_pypy:
    REGISTRY.register(GCCounts())


#
# PyPy GC / memory metrics
#


class PyPyGCStats:
    """Reports PyPy GC timing and memory statistics (PyPy only)."""

    def collect(self) -> Iterable[Metric]:

        # @stats is a pretty-printer object with __str__() returning a nice table,
        # plus some fields that contain data from that table.
        # unfortunately, fields are pretty-printed themselves (i. e. '4.5MB').
        stats = gc.get_stats(memory_pressure=False)  # type: ignore
        # @s contains same fields as @stats, but as actual integers.
        s = stats._s  # type: ignore

        # also note that field naming is completely braindead
        # and only vaguely correlates with the pretty-printed table.
        # >>>> gc.get_stats(False)
        # Total memory consumed:
        #     GC used:            8.7MB (peak: 39.0MB)        # s.total_gc_memory, s.peak_memory
        #        in arenas:            3.0MB                  # s.total_arena_memory
        #        rawmalloced:          1.7MB                  # s.total_rawmalloced_memory
        #        nursery:              4.0MB                  # s.nursery_size
        #     raw assembler used: 31.0kB                      # s.jit_backend_used
        #     -----------------------------
        #     Total:              8.8MB                       # stats.memory_used_sum
        #
        # Total memory allocated:
        #     GC allocated:            38.7MB (peak: 41.1MB)  # s.total_allocated_memory, s.peak_allocated_memory
        #        in arenas:            30.9MB                 # s.peak_arena_memory
        #        rawmalloced:          4.1MB                  # s.peak_rawmalloced_memory
        #        nursery:              4.0MB                  # s.nursery_size
        #     raw assembler allocated: 1.0MB                  # s.jit_backend_allocated
        #     -----------------------------
        #     Total:                   39.7MB                 # stats.memory_allocated_sum
        #
        # Total time spent in GC:  0.073                      # s.total_gc_time

        pypy_gc_time = CounterMetricFamily(
            "pypy_gc_time_seconds_total",
            "Total time spent in PyPy GC",
            labels=[],
        )
        # total_gc_time is in milliseconds; convert to seconds.
        pypy_gc_time.add_metric([], s.total_gc_time / 1000)
        yield pypy_gc_time

        pypy_mem = GaugeMetricFamily(
            "pypy_memory_bytes",
            "Memory tracked by PyPy allocator",
            labels=["state", "class", "kind"],
        )
        # memory used by JIT assembler
        pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used)
        pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated)
        # memory used by GCed objects
        pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory)
        pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory)
        pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory)
        pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory)
        pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size)
        pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size)
        # totals
        pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory)
        pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory)
        pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory)
        pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory)
        yield pypy_mem


if running_on_pypy:
    REGISTRY.register(PyPyGCStats())


#
# Twisted reactor metrics
#

tick_time = Histogram(
    "python_twisted_reactor_tick_time",
    "Tick time of the Twisted reactor (sec)",
    buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
)
pending_calls_metric = Histogram(
    "python_twisted_reactor_pending_calls",
    "Pending calls",
    buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
)

#
# Federation Metrics
#

sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")

events_processed_counter = Counter("synapse_federation_client_events_processed", "")

event_processing_loop_counter = Counter(
    "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
)

event_processing_loop_room_count = Counter(
    "synapse_event_processing_loop_room_count",
    "Rooms seen per event processing loop iteration",
    ["name"],
)


# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])

# Used to track the current max events stream position
event_persisted_position = Gauge("synapse_event_persisted_position", "")

# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])

# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])

event_processing_lag_by_event = Histogram(
    "synapse_event_processing_lag_by_event",
    "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
    ["name"],
)

# Build info of the running server.
# Reported as a constant-1 gauge with the version info carried in the labels.
build_info = Gauge(
    "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
)
build_info.labels(
    " ".join([platform.python_implementation(), platform.python_version()]),
    get_version_string(synapse),
    " ".join([platform.system(), platform.release()]),
).set(1)

# Updated on every reactor tick by the runUntilCurrent wrapper below; consumed
# by ReactorLastSeenMetric to detect a frozen reactor.
last_ticked = time.time()

# 3PID send info
threepid_send_requests = Histogram(
    "synapse_threepid_send_requests_with_tries",
    documentation="Number of requests for a 3pid token by try count. Note if"
    " there is a request with try count of 4, then there would have been one"
    " each for 1, 2 and 3",
    buckets=(1, 2, 3, 4, 5, 10),
    labelnames=("type", "reason"),
)

# Gauges describing the state of each named threadpool; populated by
# register_threadpool below.
threadpool_total_threads = Gauge(
    "synapse_threadpool_total_threads",
    "Total number of threads currently in the threadpool",
    ["name"],
)

threadpool_total_working_threads = Gauge(
    "synapse_threadpool_working_threads",
    "Number of threads currently working in the threadpool",
    ["name"],
)

threadpool_total_min_threads = Gauge(
    "synapse_threadpool_min_threads",
    "Minimum number of threads configured in the threadpool",
    ["name"],
)

threadpool_total_max_threads = Gauge(
    "synapse_threadpool_max_threads",
    "Maximum number of threads configured in the threadpool",
    ["name"],
)


def register_threadpool(name: str, threadpool: ThreadPool) -> None:
    """Add metrics for the threadpool.

    Args:
        name: label value identifying this threadpool in the exported metrics
        threadpool: the Twisted ThreadPool to report on
    """

    # min/max are static configuration, so set them once here.
    threadpool_total_min_threads.labels(name).set(threadpool.min)
    threadpool_total_max_threads.labels(name).set(threadpool.max)

    # Current/working thread counts change over time, so read them lazily at
    # collection time via set_function.
    threadpool_total_threads.labels(name).set_function(lambda: len(threadpool.threads))
    threadpool_total_working_threads.labels(name).set_function(
        lambda: len(threadpool.working)
    )


class ReactorLastSeenMetric:
    """Reports how long ago the Twisted reactor last ticked.

    A large value suggests the reactor is blocked or frozen.
    """

    def collect(self) -> Iterable[Metric]:
        cm = GaugeMetricFamily(
            "python_twisted_reactor_last_seen",
            "Seconds since the Twisted reactor was last seen",
        )
        cm.add_metric([], time.time() - last_ticked)
        yield cm


REGISTRY.register(ReactorLastSeenMetric())

# The minimum time in seconds between GCs for each generation, regardless of the current GC
# thresholds and counts.
MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)

# The time (in seconds since the epoch) of the last time we did a GC for each generation.
_last_gc = [0.0, 0.0, 0.0]


F = TypeVar("F", bound=Callable[..., Any])


def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
    """Wrap the reactor's `runUntilCurrent` to record tick metrics and run GC.

    The returned wrapper observes the number of pending calls and the
    wallclock time spent per tick, refreshes `last_ticked`, and (on CPython,
    where automatic GC is disabled below) performs manual generational GC.

    Args:
        reactor: the Twisted reactor whose internals are inspected
        func: the original `reactor.runUntilCurrent` to delegate to

    Returns:
        The wrapped function, preserving `func`'s signature.
    """

    @functools.wraps(func)
    def f(*args: Any, **kwargs: Any) -> Any:
        now = reactor.seconds()
        num_pending = 0

        # _newTimedCalls is one long list of *all* pending calls. Below loop
        # is based off of impl of reactor.runUntilCurrent
        for delayed_call in reactor._newTimedCalls:
            if delayed_call.time > now:
                # The list is time-ordered, so nothing later is due either.
                break

            if delayed_call.delayed_time > 0:
                continue

            num_pending += 1

        num_pending += len(reactor.threadCallQueue)
        start = time.time()
        ret = func(*args, **kwargs)
        end = time.time()

        # record the amount of wallclock time spent running pending calls.
        # This is a proxy for the actual amount of time between reactor polls,
        # since about 25% of time is actually spent running things triggered by
        # I/O events, but that is harder to capture without rewriting half the
        # reactor.
        tick_time.observe(end - start)
        pending_calls_metric.observe(num_pending)

        # Update the time we last ticked, for the metric to test whether
        # Synapse's reactor has frozen
        global last_ticked
        last_ticked = end

        if running_on_pypy:
            # Manual GC is only done on CPython (see the gc.disable() below).
            return ret

        # Check if we need to do a manual GC (since it's been disabled), and do
        # one if necessary. Note we go in reverse order as e.g. a gen 1 GC may
        # promote an object into gen 2, and we don't want to handle the same
        # object multiple times.
        threshold = gc.get_threshold()
        counts = gc.get_count()
        for i in (2, 1, 0):
            # We check if we need to do one based on a straightforward
            # comparison between the threshold and count. We also do an extra
            # check to make sure that we don't do a GC too often.
            if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]:
                if i == 0:
                    # Gen-0 collections are frequent; log them at debug only.
                    logger.debug("Collecting gc %d", i)
                else:
                    logger.info("Collecting gc %d", i)

                start = time.time()
                unreachable = gc.collect(i)
                end = time.time()

                _last_gc[i] = end

                gc_time.labels(i).observe(end - start)
                gc_unreachable.labels(i).set(unreachable)

        return ret

    return cast(F, f)


try:
    # Ensure the reactor has all the attributes we expect
    reactor.seconds  # type: ignore
    reactor.runUntilCurrent  # type: ignore
    reactor._newTimedCalls  # type: ignore
    reactor.threadCallQueue  # type: ignore

    # runUntilCurrent is called when we have pending calls. It is called once
    # per iteration after fd polling.
    reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent)  # type: ignore

    # We manually run the GC each reactor tick so that we can get some metrics
    # about time spent doing GC.
    if not running_on_pypy:
        gc.disable()
except AttributeError:
    pass


__all__ = [
    "MetricsResource",
    "generate_latest",
    "start_http_server",
    "LaterGauge",
    "InFlightGauge",
    "GaugeBucketCollector",
]