# Copyright 2017 New Vector Ltd
# Copyright 2019-2021 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
import gc
import logging
import os
import platform
import signal
import socket
import sys
import traceback
import warnings
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Collection,
    Dict,
    Iterable,
    List,
    NoReturn,
    Optional,
    Tuple,
    cast,
)

from cryptography.utils import CryptographyDeprecationWarning

import twisted
from twisted.internet import defer, error, reactor as _reactor
from twisted.internet.interfaces import IOpenSSLContextFactory, IReactorSSL, IReactorTCP
from twisted.internet.protocol import ServerFactory
from twisted.internet.tcp import Port
from twisted.logger import LoggingFile, LogLevel
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.threadpool import ThreadPool

import synapse
from synapse.api.constants import MAX_PDU_SIZE
from synapse.app import check_bind_error
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.config.homeserver import HomeServerConfig
from synapse.config.server import ManholeConfig
from synapse.crypto import context_factory
from synapse.events.presence_router import load_legacy_presence_router
from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
from synapse.handlers.auth import load_legacy_password_auth_providers
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics import register_threadpool
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.types import ISynapseReactor
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
from synapse.util.daemonize import daemonize_process
from synapse.util.gai_resolver import GAIResolver
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string

if TYPE_CHECKING:
    from synapse.server import HomeServer

# Twisted injects the global reactor to make it easier to import; this confuses
# mypy, which thinks it is a module. Tell it that it is a more proper type.
reactor = cast(ISynapseReactor, _reactor)


logger = logging.getLogger(__name__)

# list of tuples of function, args list, kwargs dict
_sighup_callbacks: List[
    Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]
] = []


def register_sighup(func: Callable[..., None], *args: Any, **kwargs: Any) -> None:
    """
    Register a function to be called when a SIGHUP occurs.

    Args:
        func: Function to be called when sent a SIGHUP signal.
        *args, **kwargs: args and kwargs to be passed to the target function.
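
    Example (as used elsewhere in this module, with a homeserver instance `hs`):
        register_sighup(refresh_certificate, hs)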
    """
    _sighup_callbacks.append((func, args, kwargs))


def start_worker_reactor(
    appname: str,
    config: HomeServerConfig,
    run_command: Callable[[], None] = reactor.run,
) -> None:
    """Run the reactor in the main process

    Daemonizes if necessary, and then configures some resources, before starting
    the reactor. Pulls configuration from the 'worker' settings in 'config'.

    Args:
        appname: application name which will be sent to syslog
        config: config object
        run_command: callable that actually runs the reactor
    """

    logger = logging.getLogger(config.worker.worker_app)

    start_reactor(
        appname,
        soft_file_limit=config.server.soft_file_limit,
        gc_thresholds=config.server.gc_thresholds,
        pid_file=config.worker.worker_pid_file,
        daemonize=config.worker.worker_daemonize,
        print_pidfile=config.server.print_pidfile,
        logger=logger,
        run_command=run_command,
    )


def start_reactor(
    appname: str,
    soft_file_limit: int,
    gc_thresholds: Optional[Tuple[int, int, int]],
    pid_file: str,
    daemonize: bool,
    print_pidfile: bool,
    logger: logging.Logger,
    run_command: Callable[[], None] = reactor.run,
) -> None:
    """Run the reactor in the main process

    Daemonizes if necessary, and then configures some resources, before starting
    the reactor

    Args:
        appname: application name which will be sent to syslog
        soft_file_limit: value to set the soft limit on the number of open file
            descriptors (RLIMIT_NOFILE) to
        gc_thresholds: generational GC thresholds, passed to gc.set_threshold()
        pid_file: name of pid file to write to if daemonize is True
        daemonize: true to run the reactor in a background process
        print_pidfile: whether to print the pid file, if daemonize is True
        logger: logger instance to pass to daemonize_process
        run_command: callable that actually runs the reactor
    """

    def run() -> None:
        logger.info("Running")
        setup_jemalloc_stats()
        change_resource_limit(soft_file_limit)
        if gc_thresholds:
            gc.set_threshold(*gc_thresholds)
        run_command()

    # make sure that we run the reactor with the sentinel log context,
    # otherwise other PreserveLoggingContext instances will get confused
    # and complain when they see the logcontext arbitrarily swapping
    # between the sentinel and `run` logcontexts.
    #
    # We also need to drop the logcontext before forking if we're daemonizing,
    # otherwise the cputime metrics get confused about the per-thread resource usage
    # appearing to go backwards.
    with PreserveLoggingContext():
        if daemonize:
            if print_pidfile:
                print(pid_file)

            daemonize_process(pid_file, logger)
        run()


def quit_with_error(error_string: str) -> NoReturn:
    message_lines = error_string.split("\n")
    line_length = min(max(len(line) for line in message_lines), 80) + 2
    sys.stderr.write("*" * line_length + "\n")
    for line in message_lines:
        sys.stderr.write(" %s\n" % (line.rstrip(),))
    sys.stderr.write("*" * line_length + "\n")
    sys.exit(1)


def handle_startup_exception(e: Exception) -> NoReturn:
    # Exceptions that occur between setting up the logging and forking or starting
    # the reactor are written to the logs, followed by a summary to stderr.
    logger.exception("Exception during startup")
    quit_with_error(
        f"Error during initialisation:\n   {e}\nThere may be more information in the logs."
    )


def redirect_stdio_to_logs() -> None:
    streams = [("stdout", LogLevel.info), ("stderr", LogLevel.error)]

    for (stream, level) in streams:
        oldStream = getattr(sys, stream)
        loggingFile = LoggingFile(
            logger=twisted.logger.Logger(namespace=stream),
            level=level,
            encoding=getattr(oldStream, "encoding", None),
        )
        setattr(sys, stream, loggingFile)

    print("Redirected stdout/stderr to logs")


def register_start(cb: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> None:
    """Register a callback with the reactor, to be called once it is running

    This can be used to initialise parts of the system which require an asynchronous
    setup.

    Any exception raised by the callback will be printed and logged, and the process
    will exit.
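
    Example (illustrative; `setup` is a hypothetical coroutine):
        async def setup() -> None:
            ...  # asynchronous initialisation goes here

        register_start(setup)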
    """

    async def wrapper() -> None:
        try:
            await cb(*args, **kwargs)
        except Exception:
            # previously, we used Failure().printTraceback() here, in the hope that
            # it would give better tracebacks than traceback.print_exc(). However,
            # that doesn't handle chained exceptions (with a __cause__ or
            # __context__) well, and I *think* the need for Failure() is reduced
            # now that we mostly use async/await.

            # Write the exception to both the logs *and* the unredirected stderr,
            # because people tend to get confused if it only goes to one or the other.
            #
            # One problem with this is that if people are using a logging config that
            # logs to the console (as is common eg under docker), they will get two
            # copies of the exception. We could maybe try to detect that, but it's
            # probably a cost we can bear.
            logger.fatal("Error during startup", exc_info=True)
            print("Error during startup:", file=sys.__stderr__)
            traceback.print_exc(file=sys.__stderr__)

            # it's no use calling sys.exit here, since that just raises a SystemExit
            # exception which is then caught by the reactor, and everything carries
            # on as normal.
            os._exit(1)

    reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))


def listen_metrics(bind_addresses: Iterable[str], port: int) -> None:
    """
    Start Prometheus metrics server.
    """
    from synapse.metrics import RegistryProxy, start_http_server

    for host in bind_addresses:
        logger.info("Starting metrics listener on %s:%d", host, port)
        start_http_server(port, addr=host, registry=RegistryProxy)


def listen_manhole(
    bind_addresses: Collection[str],
    port: int,
    manhole_settings: ManholeConfig,
    manhole_globals: dict,
) -> None:
    # twisted.conch.manhole 21.1.0 uses "int_from_bytes", which produces a confusing
    # warning. It's fixed by https://github.com/twisted/twisted/pull/1522, so
    # suppress the warning for now.
    warnings.filterwarnings(
        action="ignore",
        category=CryptographyDeprecationWarning,
        message="int_from_bytes is deprecated",
    )

    from synapse.util.manhole import manhole

    listen_tcp(
        bind_addresses,
        port,
        manhole(settings=manhole_settings, globals=manhole_globals),
    )


def listen_tcp(
    bind_addresses: Collection[str],
    port: int,
    factory: ServerFactory,
    reactor: IReactorTCP = reactor,
    backlog: int = 50,
) -> List[Port]:
    """
    Create a TCP socket for a port and several addresses

    Returns:
        list of twisted.internet.tcp.Port listening for TCP connections
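
    Example (illustrative; assumes `factory` is an already-built ServerFactory):
        ports = listen_tcp(["127.0.0.1", "::1"], 8008, factory)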
    """
    r = []
    for address in bind_addresses:
        try:
            r.append(reactor.listenTCP(port, factory, backlog, address))
        except error.CannotListenError as e:
            check_bind_error(e, address, bind_addresses)

    # IReactorTCP returns an object implementing IListeningPort from listenTCP,
    # but we know it will be a Port instance.
    return r  # type: ignore[return-value]


def listen_ssl(
    bind_addresses: Collection[str],
    port: int,
    factory: ServerFactory,
    context_factory: IOpenSSLContextFactory,
    reactor: IReactorSSL = reactor,
    backlog: int = 50,
) -> List[Port]:
    """
    Create a TLS-over-TCP socket for a port and several addresses

    Returns:
        list of twisted.internet.tcp.Port listening for TLS connections
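
    Example (illustrative; assumes `factory` and `context_factory` are already
    configured):
        ports = listen_ssl(["0.0.0.0"], 8448, factory, context_factory)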
    """
    r = []
    for address in bind_addresses:
        try:
            r.append(
                reactor.listenSSL(port, factory, context_factory, backlog, address)
            )
        except error.CannotListenError as e:
            check_bind_error(e, address, bind_addresses)

    # IReactorSSL incorrectly declares that an int is returned from listenSSL,
    # it actually returns an object implementing IListeningPort, but we know it
    # will be a Port instance.
    return r  # type: ignore[return-value]


def refresh_certificate(hs: "HomeServer") -> None:
    """
    Refresh the TLS certificates that Synapse is using by re-reading them from
    disk and updating the TLS context factories to use them.
    """
    if not hs.config.server.has_tls_listener():
        return

    hs.config.tls.read_certificate_from_disk()
    hs.tls_server_context_factory = context_factory.ServerContextFactory(hs.config)

    if hs._listening_services:
        logger.info("Updating context factories...")
        for i in hs._listening_services:
            # When you listenSSL, it doesn't make an SSL port but a TCP one with
            # a TLS wrapping factory around the factory that should actually
            # receive the requests. This factory attribute is public but missing
            # from Twisted's documentation.
            if isinstance(i.factory, TLSMemoryBIOFactory):
                addr = i.getHost()
                logger.info(
                    "Replacing TLS context factory on [%s]:%i", addr.host, addr.port
                )
                # We want to replace TLS factories with a new one, with the new
                # TLS configuration. We do this by reaching in and pulling out
                # the wrappedFactory, and then re-wrapping it.
                i.factory = TLSMemoryBIOFactory(
                    hs.tls_server_context_factory, False, i.factory.wrappedFactory
                )
        logger.info("Context factories updated.")


async def start(hs: "HomeServer") -> None:
    """
    Start a Synapse server or worker.

    Should be called once the reactor is running.

    Will start the main HTTP listeners and do some other startup tasks, and then
    notify systemd.

    Args:
        hs: homeserver instance
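
    Example (illustrative; typically wired up via `register_start` above, so
    that this runs once the reactor is going):
        register_start(start, hs)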
    """
    reactor = hs.get_reactor()

    # We want to use a separate thread pool for the resolver so that large
    # numbers of DNS requests don't starve out other users of the threadpool.
    resolver_threadpool = ThreadPool(name="gai_resolver")
    resolver_threadpool.start()
    reactor.addSystemEventTrigger("during", "shutdown", resolver_threadpool.stop)
    reactor.installNameResolver(
        GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool)
    )

    # Register the threadpools with our metrics.
    register_threadpool("default", reactor.getThreadPool())
    register_threadpool("gai_resolver", resolver_threadpool)

    # Set up the SIGHUP machinery.
    if hasattr(signal, "SIGHUP"):

        @wrap_as_background_process("sighup")
        async def handle_sighup(*args: Any, **kwargs: Any) -> None:
            # Tell systemd our state, if we're using it. This will silently fail if
            # we're not using systemd.
            sdnotify(b"RELOADING=1")

            for i, args, kwargs in _sighup_callbacks:
                i(*args, **kwargs)

            sdnotify(b"READY=1")

        # We defer running the sighup handlers until the next reactor tick. This
        # is so that we're in a sane state, e.g. flushing the logs may fail
        # if the sighup happens in the middle of writing a log entry.
        def run_sighup(*args: Any, **kwargs: Any) -> None:
            # `callFromThread` should be "signal safe" as well as thread
            # safe.
            reactor.callFromThread(handle_sighup, *args, **kwargs)

        signal.signal(signal.SIGHUP, run_sighup)

        register_sighup(refresh_certificate, hs)

    # Load the certificate from disk.
    refresh_certificate(hs)

    # Start the tracer
    synapse.logging.opentracing.init_tracer(hs)  # type: ignore[attr-defined] # noqa

    # Instantiate the modules so they can register their web resources with the
    # module API before we start the listeners.
    module_api = hs.get_module_api()
    for module, config in hs.config.modules.loaded_modules:
        module(config=config, api=module_api)

    load_legacy_spam_checkers(hs)
    load_legacy_third_party_event_rules(hs)
    load_legacy_presence_router(hs)
    load_legacy_password_auth_providers(hs)

    # If we've configured an expiry time for caches, start the background job now.
    setup_expire_lru_cache_entries(hs)

    # It is now safe to start your Synapse.
    hs.start_listening()
    hs.get_datastore().db_pool.start_profiling()
    hs.get_pusherpool().start()

    # Log when we start the shutdown process.
    hs.get_reactor().addSystemEventTrigger(
        "before", "shutdown", logger.info, "Shutting down..."
    )

    setup_sentry(hs)
    setup_sdnotify(hs)

    # If background tasks are running on the main process, start collecting the
    # phone home stats.
    if hs.config.worker.run_background_tasks:
        start_phone_stats_home(hs)

    # We now freeze all allocated objects in the hope that (almost) everything
    # currently allocated is something that will be used for the rest of the
    # process's lifetime. Doing so means less work for each GC run (hopefully).
    #
    # This only works on Python 3.7 or later.
    if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7):
        gc.collect()
        gc.freeze()

    # Speed up shutdowns by freezing all allocated objects again at exit. This
    # moves everything into the permanent generation and excludes it from the
    # final GC.
    #
    # Unfortunately this also only works on Python 3.7 or later.
    if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7):
        atexit.register(gc.freeze)


def setup_sentry(hs: "HomeServer") -> None:
    """Enable sentry integration, if enabled in configuration"""

    if not hs.config.metrics.sentry_enabled:
        return

    import sentry_sdk

    sentry_sdk.init(
        dsn=hs.config.metrics.sentry_dsn, release=get_version_string(synapse)
    )

    # We set some default tags that give some context to this instance
    with sentry_sdk.configure_scope() as scope:
        scope.set_tag("matrix_server_name", hs.config.server.server_name)

        app = (
            hs.config.worker.worker_app
            if hs.config.worker.worker_app
            else "synapse.app.homeserver"
        )
        name = hs.get_instance_name()
        scope.set_tag("worker_app", app)
        scope.set_tag("worker_name", name)


def setup_sdnotify(hs: "HomeServer") -> None:
    """Adds process state hooks to tell systemd what we are up to."""

    # Tell systemd our state, if we're using it. This will silently fail if
    # we're not using systemd.
    sdnotify(b"READY=1\nMAINPID=%i" % (os.getpid(),))

    hs.get_reactor().addSystemEventTrigger(
        "before", "shutdown", sdnotify, b"STOPPING=1"
    )


sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")


def sdnotify(state: bytes) -> None:
    """
    Send a notification to systemd, if the NOTIFY_SOCKET env var is set.

    This function is based on the sdnotify python package, but since it's only a few
    lines of code, it's easier to duplicate it here than to add a dependency on a
    package which many OSes don't include as a matter of principle.

    Args:
        state: notification to send
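
    Example (illustrative):
        sdnotify(b"READY=1")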
    """
    if not isinstance(state, bytes):
        raise TypeError("sdnotify should be called with a bytes")
    if not sdnotify_sockaddr:
        return
    addr = sdnotify_sockaddr
    if addr[0] == "@":
        addr = "\0" + addr[1:]

    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
            sock.connect(addr)
            sock.sendall(state)
    except Exception as e:
        # this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET
        # unless systemd is expecting us to notify it.
        logger.warning("Unable to send notification to systemd: %s", e)


def max_request_body_size(config: HomeServerConfig) -> int:
    """Get a suitable maximum size for incoming HTTP requests"""

    # Other than media uploads, the biggest request we expect to see is a fully-loaded
    # /federation/v1/send request.
    #
    # The main thing in such a request is up to 50 PDUs, and up to 100 EDUs. PDUs are
    # limited to 65536 bytes (possibly slightly more if the sender didn't use canonical
    # json encoding); there is no specced limit to EDUs (see
    # https://github.com/matrix-org/matrix-doc/issues/3121).
    #
    # in short, we somewhat arbitrarily limit requests to 200 * 64K (about 12.5M)
    #
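    # (Worked out: 200 * MAX_PDU_SIZE = 200 * 65536 bytes = 13,107,200 bytes,
    # which is exactly 12.5 MiB.)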
    max_request_size = 200 * MAX_PDU_SIZE

    # if we have a media repo enabled, we may need to allow larger uploads than that
    if config.media.can_load_media_repo:
        max_request_size = max(max_request_size, config.media.max_upload_size)

    return max_request_size