# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading
from functools import wraps
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    Optional,
    Set,
    Type,
    TypeVar,
    Union,
    cast,
)

from prometheus_client import Metric
from prometheus_client.core import REGISTRY, Counter, Gauge

from twisted.internet import defer

from synapse.logging.context import (
    ContextResourceUsage,
    LoggingContext,
    PreserveLoggingContext,
)
from synapse.logging.opentracing import (
    SynapseTags,
    noop_context_manager,
    start_active_span,
)

if TYPE_CHECKING:
    import resource


logger = logging.getLogger(__name__)


_background_process_start_count = Counter(
    "synapse_background_process_start_count",
    "Number of background processes started",
    ["name"],
)

_background_process_in_flight_count = Gauge(
    "synapse_background_process_in_flight_count",
    "Number of background processes in flight",
    labelnames=["name"],
)

# we set registry=None in all of these to stop them getting registered with
# the default registry. Instead we collect them all via the _Collector below,
# which ensures that we can update them before they are collected.
#
_background_process_ru_utime = Counter(
    "synapse_background_process_ru_utime_seconds",
    "User CPU time used by background processes, in seconds",
    ["name"],
    registry=None,
)

_background_process_ru_stime = Counter(
    "synapse_background_process_ru_stime_seconds",
    "System CPU time used by background processes, in seconds",
    ["name"],
    registry=None,
)

_background_process_db_txn_count = Counter(
    "synapse_background_process_db_txn_count",
    "Number of database transactions done by background processes",
    ["name"],
    registry=None,
)

_background_process_db_txn_duration = Counter(
    "synapse_background_process_db_txn_duration_seconds",
    (
        "Seconds spent by background processes waiting for database "
        "transactions, excluding scheduling time"
    ),
    ["name"],
    registry=None,
)

_background_process_db_sched_duration = Counter(
    "synapse_background_process_db_sched_duration_seconds",
    "Seconds spent by background processes waiting for database connections",
    ["name"],
    registry=None,
)
# map from description to a counter, so that we can name our logcontexts
# incrementally. (It actually duplicates _background_process_start_count, but
# it's much simpler to do so than to try to combine them.)
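# For example (using a hypothetical "pusher" description): after three runs,
# the map holds {"pusher": 3}, and the corresponding logcontexts were named
# "pusher-0", "pusher-1" and "pusher-2".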
_background_process_counts: Dict[str, int] = {}

# Set of all running background processes that became active since the last
# time metrics were scraped (i.e. background processes that performed some
# work since the last scrape.)
#
# We do it like this to handle the case where we have a large number of
# background processes stacking up behind a lock or linearizer, where we then
# only need to iterate over and update metrics for the processes that have
# actually been active, and can ignore the idle ones.
_background_processes_active_since_last_scrape: "Set[_BackgroundProcess]" = set()

# A lock that covers the above set and dict
_bg_metrics_lock = threading.Lock()


class _Collector:
    """A custom metrics collector for the background process metrics.

    Ensures that all of the metrics are up-to-date with any in-flight processes
    before they are returned.
    """

    def collect(self) -> Iterable[Metric]:
        global _background_processes_active_since_last_scrape

        # We swap out _background_processes_active_since_last_scrape for an
        # empty set so that we can safely iterate over it without holding the
        # lock.
        with _bg_metrics_lock:
            _background_processes_copy = _background_processes_active_since_last_scrape
            _background_processes_active_since_last_scrape = set()

        for process in _background_processes_copy:
            process.update_metrics()

        # now we need to run collect() over each of the static Counters, and
        # yield each metric they return.
        for m in (
            _background_process_ru_utime,
            _background_process_ru_stime,
            _background_process_db_txn_count,
            _background_process_db_txn_duration,
            _background_process_db_sched_duration,
        ):
            yield from m.collect()


REGISTRY.register(_Collector())
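
# (On each Prometheus scrape, the registry calls _Collector.collect(), which
# flushes resource usage from the recently-active processes into the static
# Counters above before yielding them.)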


class _BackgroundProcess:
    def __init__(self, desc: str, ctx: LoggingContext):
        self.desc = desc
        self._context = ctx
        self._reported_stats: Optional[ContextResourceUsage] = None

    def update_metrics(self) -> None:
        """Updates the metrics with values from this process."""
        new_stats = self._context.get_resource_usage()
        if self._reported_stats is None:
            diff = new_stats
        else:
            diff = new_stats - self._reported_stats
        self._reported_stats = new_stats

        _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
        _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
        _background_process_db_txn_count.labels(self.desc).inc(diff.db_txn_count)
        _background_process_db_txn_duration.labels(self.desc).inc(
            diff.db_txn_duration_sec
        )
        _background_process_db_sched_duration.labels(self.desc).inc(
            diff.db_sched_duration_sec
        )


R = TypeVar("R")


def run_as_background_process(
    desc: str,
    func: Callable[..., Awaitable[Optional[R]]],
    *args: Any,
    bg_start_span: bool = True,
    **kwargs: Any,
) -> "defer.Deferred[Optional[R]]":
    """Run the given function in its own logcontext, with resource metrics

    This should be used to wrap processes which are fired off to run in the
    background, instead of being associated with a particular request.

    It returns a Deferred which completes when the function completes, but it doesn't
    follow the synapse logcontext rules, which makes it appropriate for passing to
    clock.looping_call and friends (or for firing-and-forgetting in the middle of a
    normal synapse async function).

    Args:
        desc: a description for this background process type
        func: a function, which may return a Deferred or a coroutine
        bg_start_span: Whether to start an opentracing span. Defaults to True.
            Should only be disabled for processes that will not log to or tag
            a span.
        args: positional args for func
        kwargs: keyword args for func

    Returns:
        Deferred which returns the result of func, or `None` if func raises.
        Note that the returned Deferred does not follow the synapse logcontext
        rules.
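
    Example (a minimal usage sketch; `fetch_remote_keys` is a hypothetical
    coroutine, not part of this module):

        async def fetch_remote_keys() -> None:
            ...

        # fire off the work without awaiting the result
        run_as_background_process("fetch_remote_keys", fetch_remote_keys)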
    """

    async def run() -> Optional[R]:
        with _bg_metrics_lock:
            count = _background_process_counts.get(desc, 0)
            _background_process_counts[desc] = count + 1

        _background_process_start_count.labels(desc).inc()
        _background_process_in_flight_count.labels(desc).inc()

        with BackgroundProcessLoggingContext(desc, count) as context:
            try:
                if bg_start_span:
                    ctx = start_active_span(
                        f"bgproc.{desc}", tags={SynapseTags.REQUEST_ID: str(context)}
                    )
                else:
                    ctx = noop_context_manager()
                with ctx:
                    return await func(*args, **kwargs)
            except Exception:
                logger.exception(
                    "Background process '%s' threw an exception",
                    desc,
                )
                return None
            finally:
                _background_process_in_flight_count.labels(desc).dec()

    with PreserveLoggingContext():
        # Note that we return a Deferred here so that it can be used in a
        # looping_call and other places that expect a Deferred.
        return defer.ensureDeferred(run())


F = TypeVar("F", bound=Callable[..., Awaitable[Optional[Any]]])


def wrap_as_background_process(desc: str) -> Callable[[F], F]:
    """Decorator that wraps a function that gets called as a background
    process.

    Equivalent to calling the function with `run_as_background_process`
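
    Example (a minimal sketch; `clean_up_old_entries` is a hypothetical
    coroutine, not part of this module):

        @wrap_as_background_process("clean_up_old_entries")
        async def clean_up_old_entries() -> None:
            ...

    Note that calling the decorated function returns a Deferred, as
    `run_as_background_process` does, rather than a coroutine.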
    """

    def wrap_as_background_process_inner(func: F) -> F:
        @wraps(func)
        def wrap_as_background_process_inner_2(
            *args: Any, **kwargs: Any
        ) -> "defer.Deferred[Optional[R]]":
            return run_as_background_process(desc, func, *args, **kwargs)

        return cast(F, wrap_as_background_process_inner_2)

    return wrap_as_background_process_inner


class BackgroundProcessLoggingContext(LoggingContext):
    """A logging context that tracks in-flight metrics for background
    processes.
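
    Example (a minimal sketch mirroring how `run_as_background_process` uses
    this class; "purge_history" is just an illustrative name):

        with BackgroundProcessLoggingContext("purge_history", instance_id=0):
            ...  # work done here is attributed to the "purge_history" process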
    """

    __slots__ = ["_proc"]

    def __init__(self, name: str, instance_id: Optional[Union[int, str]] = None):
        """

        Args:
            name: The name of the background process. Each distinct `name` gets a
                separate prometheus time series.

            instance_id: an identifier to add to `name` to distinguish this instance of
                the named background process in the logs. If this is `None`, one is
                made up based on id(self).
        """
        if instance_id is None:
            instance_id = id(self)
        super().__init__("%s-%s" % (name, instance_id))
        self._proc = _BackgroundProcess(name, self)

    def start(self, rusage: "Optional[resource.struct_rusage]") -> None:
        """Log context has started running (again)."""

        super().start(rusage)

        # We've become active again so we make sure we're in the list of active
        # procs. (Note that "start" here means we've become active, as opposed
        # to starting for the first time.)
        with _bg_metrics_lock:
            _background_processes_active_since_last_scrape.add(self._proc)

    def __exit__(
        self,
        type: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Log context has finished."""

        super().__exit__(type, value, traceback)

        # The background process has finished. We explicitly remove and manually
        # update the metrics here so that if nothing is scraping metrics the set
        # doesn't grow indefinitely.
        with _bg_metrics_lock:
            _background_processes_active_since_last_scrape.discard(self._proc)

        self._proc.update_metrics()