# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading
from functools import wraps
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    Optional,
    Set,
    Type,
    TypeVar,
    Union,
    cast,
)

from prometheus_client import Metric
from prometheus_client.core import REGISTRY, Counter, Gauge

from twisted.internet import defer

from synapse.logging.context import (
    ContextResourceUsage,
    LoggingContext,
    PreserveLoggingContext,
)
from synapse.logging.opentracing import (
    SynapseTags,
    noop_context_manager,
    start_active_span,
)

if TYPE_CHECKING:
    import resource


logger = logging.getLogger(__name__)


_background_process_start_count = Counter(
    "synapse_background_process_start_count",
    "Number of background processes started",
    ["name"],
)

_background_process_in_flight_count = Gauge(
    "synapse_background_process_in_flight_count",
    "Number of background processes in flight",
    labelnames=["name"],
)

# we set registry=None in all of these to stop them getting registered with
# the default registry. Instead we collect them all via the _Collector below,
# which ensures that we can update them before they are collected.
#
_background_process_ru_utime = Counter(
    "synapse_background_process_ru_utime_seconds",
    "User CPU time used by background processes, in seconds",
    ["name"],
    registry=None,
)

_background_process_ru_stime = Counter(
    "synapse_background_process_ru_stime_seconds",
    "System CPU time used by background processes, in seconds",
    ["name"],
    registry=None,
)

_background_process_db_txn_count = Counter(
    "synapse_background_process_db_txn_count",
    "Number of database transactions done by background processes",
    ["name"],
    registry=None,
)

_background_process_db_txn_duration = Counter(
    "synapse_background_process_db_txn_duration_seconds",
    (
        "Seconds spent by background processes waiting for database "
        "transactions, excluding scheduling time"
    ),
    ["name"],
    registry=None,
)

_background_process_db_sched_duration = Counter(
    "synapse_background_process_db_sched_duration_seconds",
    "Seconds spent by background processes waiting for database connections",
    ["name"],
    registry=None,
)

# map from description to a counter, so that we can name our logcontexts
# incrementally. (It actually duplicates _background_process_start_count, but
# it's much simpler to do so than to try to combine them.)
_background_process_counts: Dict[str, int] = {}

# Set of all running background processes that became active since the
# last time metrics were scraped (i.e. background processes that performed some
# work since the last scrape.)
#
# We do it like this to handle the case where we have a large number of
# background processes stacking up behind a lock or linearizer, where we then
# only need to iterate over and update metrics for the processes that have
# actually been active and can ignore the idle ones.
_background_processes_active_since_last_scrape: "Set[_BackgroundProcess]" = set()

# A lock that covers the above set and dict
_bg_metrics_lock = threading.Lock()


class _Collector:
    """A custom metrics collector for the background process metrics.

    Ensures that all of the metrics are up-to-date with any in-flight processes
    before they are returned.
    """

    def collect(self) -> Iterable[Metric]:
        global _background_processes_active_since_last_scrape

        # We swap out the _background_processes set with an empty one so that
        # we can safely iterate over the set without holding the lock.
        with _bg_metrics_lock:
            _background_processes_copy = _background_processes_active_since_last_scrape
            _background_processes_active_since_last_scrape = set()

        for process in _background_processes_copy:
            process.update_metrics()

        # now we need to run collect() over each of the static Counters, and
        # yield each metric they return.
        for m in (
            _background_process_ru_utime,
            _background_process_ru_stime,
            _background_process_db_txn_count,
            _background_process_db_txn_duration,
            _background_process_db_sched_duration,
        ):
            yield from m.collect()


REGISTRY.register(_Collector())


class _BackgroundProcess:
    def __init__(self, desc: str, ctx: LoggingContext):
        self.desc = desc
        self._context = ctx
        self._reported_stats: Optional[ContextResourceUsage] = None

    def update_metrics(self) -> None:
        """Updates the metrics with values from this process."""
        new_stats = self._context.get_resource_usage()
        if self._reported_stats is None:
            diff = new_stats
        else:
            diff = new_stats - self._reported_stats
        self._reported_stats = new_stats

        _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
        _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
        _background_process_db_txn_count.labels(self.desc).inc(diff.db_txn_count)
        _background_process_db_txn_duration.labels(self.desc).inc(
            diff.db_txn_duration_sec
        )
        _background_process_db_sched_duration.labels(self.desc).inc(
            diff.db_sched_duration_sec
        )


R = TypeVar("R")


def run_as_background_process(
    desc: str,
    func: Callable[..., Awaitable[Optional[R]]],
    *args: Any,
    bg_start_span: bool = True,
    **kwargs: Any,
) -> "defer.Deferred[Optional[R]]":
    """Run the given function in its own logcontext, with resource metrics.

    This should be used to wrap processes which are fired off to run in the
    background, instead of being associated with a particular request.

    It returns a Deferred which completes when the function completes, but it doesn't
    follow the synapse logcontext rules, which makes it appropriate for passing to
    clock.looping_call and friends (or for firing-and-forgetting in the middle of a
    normal synapse async function).

    Args:
        desc: a description for this background process type
        func: a function, which may return a Deferred or a coroutine
        bg_start_span: Whether to start an opentracing span. Defaults to True.
            Should only be disabled for processes that will not log to or tag
            a span.
        args: positional args for func
        kwargs: keyword args for func

    Returns:
        Deferred which returns the result of func, or `None` if func raises.
        Note that the returned Deferred does not follow the synapse logcontext
        rules.
    """

    async def run() -> Optional[R]:
        with _bg_metrics_lock:
            count = _background_process_counts.get(desc, 0)
            _background_process_counts[desc] = count + 1

        _background_process_start_count.labels(desc).inc()
        _background_process_in_flight_count.labels(desc).inc()

        with BackgroundProcessLoggingContext(desc, count) as context:
            try:
                if bg_start_span:
                    ctx = start_active_span(
                        f"bgproc.{desc}", tags={SynapseTags.REQUEST_ID: str(context)}
                    )
                else:
                    ctx = noop_context_manager()
                with ctx:
                    return await func(*args, **kwargs)
            except Exception:
                logger.exception(
                    "Background process '%s' threw an exception",
                    desc,
                )
                return None
            finally:
                _background_process_in_flight_count.labels(desc).dec()

    with PreserveLoggingContext():
        # Note that we return a Deferred here so that it can be used in a
        # looping_call and other places that expect a Deferred.
        return defer.ensureDeferred(run())


F = TypeVar("F", bound=Callable[..., Awaitable[Optional[Any]]])


def wrap_as_background_process(desc: str) -> Callable[[F], F]:
    """Decorator that wraps a function that gets called as a background
    process.

    Equivalent to calling the function with `run_as_background_process`.
    """

    def wrap_as_background_process_inner(func: F) -> F:
        @wraps(func)
        def wrap_as_background_process_inner_2(
            *args: Any, **kwargs: Any
        ) -> "defer.Deferred[Optional[R]]":
            return run_as_background_process(desc, func, *args, **kwargs)

        return cast(F, wrap_as_background_process_inner_2)

    return wrap_as_background_process_inner


class BackgroundProcessLoggingContext(LoggingContext):
    """A logging context that tracks in flight metrics for background
    processes.
    """

    __slots__ = ["_proc"]

    def __init__(self, name: str, instance_id: Optional[Union[int, str]] = None):
        """
        Args:
            name: The name of the background process. Each distinct `name` gets a
                separate prometheus time series.

            instance_id: an identifier to add to `name` to distinguish this instance of
                the named background process in the logs. If this is `None`, one is
                made up based on id(self).
        """
        if instance_id is None:
            instance_id = id(self)
        super().__init__("%s-%s" % (name, instance_id))
        self._proc = _BackgroundProcess(name, self)

    def start(self, rusage: "Optional[resource.struct_rusage]") -> None:
        """Log context has started running (again)."""

        super().start(rusage)

        # We've become active again so we make sure we're in the list of active
        # procs. (Note that "start" here means we've become active, as opposed
        # to starting for the first time.)
        with _bg_metrics_lock:
            _background_processes_active_since_last_scrape.add(self._proc)

    def __exit__(
        self,
        type: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Log context has finished."""

        super().__exit__(type, value, traceback)

        # The background process has finished. We explicitly remove and manually
        # update the metrics here so that if nothing is scraping metrics the set
        # doesn't grow indefinitely.
        with _bg_metrics_lock:
            _background_processes_active_since_last_scrape.discard(self._proc)

        self._proc.update_metrics()
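

# ---------------------------------------------------------------------------
# Illustrative usage: a minimal sketch of how the helpers above are typically
# invoked. The function names below are hypothetical and exist only to show
# the two common calling conventions; they are not part of this module's API.


@wrap_as_background_process("example_cache_refresh")
async def _example_cache_refresh() -> None:
    """Decorated coroutine: calling it returns a Deferred (which does not
    follow the synapse logcontext rules) and reports CPU/DB usage under the
    "example_cache_refresh" metric label.
    """


def _example_fire_and_forget() -> None:
    """Fire off a one-shot task in the background without awaiting it."""

    async def _work() -> None:
        logger.info("doing some background work")

    # The returned Deferred is safe to drop, or to hand to things like
    # clock.looping_call that expect a Deferred-returning callable.
    run_as_background_process("example_one_shot", _work)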