# Copyright 2017 New Vector Ltd
# Copyright 2019-2021 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
import gc
import logging
import os
import platform
import signal
import socket
import sys
import traceback
import warnings
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Collection,
    Dict,
    Iterable,
    List,
    NoReturn,
    Optional,
    Tuple,
    cast,
)

from cryptography.utils import CryptographyDeprecationWarning

import twisted
from twisted.internet import defer, error, reactor as _reactor
from twisted.internet.interfaces import IOpenSSLContextFactory, IReactorSSL, IReactorTCP
from twisted.internet.protocol import ServerFactory
from twisted.internet.tcp import Port
from twisted.logger import LoggingFile, LogLevel
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.threadpool import ThreadPool

import synapse
from synapse.api.constants import MAX_PDU_SIZE
from synapse.app import check_bind_error
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.config.homeserver import HomeServerConfig
from synapse.config.server import ManholeConfig
from synapse.crypto import context_factory
from synapse.events.presence_router import load_legacy_presence_router
from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
from synapse.handlers.auth import load_legacy_password_auth_providers
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics import register_threadpool
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.types import ISynapseReactor
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
from synapse.util.daemonize import daemonize_process
from synapse.util.gai_resolver import GAIResolver
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string

if TYPE_CHECKING:
    from synapse.server import HomeServer

# Twisted injects the global reactor to make it easier to import; this confuses
# mypy, which thinks it is a module. Tell it that it is a more specific type.
reactor = cast(ISynapseReactor, _reactor)


logger = logging.getLogger(__name__)

# list of tuples of function, args list, kwargs dict
_sighup_callbacks: List[
    Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]
] = []


def register_sighup(func: Callable[..., None], *args: Any, **kwargs: Any) -> None:
    """
    Register a function to be called when a SIGHUP occurs.

    Args:
        func: Function to be called when sent a SIGHUP signal.
        *args, **kwargs: args and kwargs to be passed to the target function.
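
    Example:
        Re-read TLS certificates on SIGHUP, mirroring the call made in
        ``start`` later in this module (``hs`` is the running HomeServer)::

            register_sighup(refresh_certificate, hs)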
    """
    _sighup_callbacks.append((func, args, kwargs))


def start_worker_reactor(
    appname: str,
    config: HomeServerConfig,
    run_command: Callable[[], None] = reactor.run,
) -> None:
    """Run the reactor in the main process

    Daemonizes if necessary, and then configures some resources before starting
    the reactor. Pulls configuration from the 'worker' settings in 'config'.

    Args:
        appname: application name which will be sent to syslog
        config: config object
        run_command: callable that actually runs the reactor
    """

    logger = logging.getLogger(config.worker.worker_app)

    start_reactor(
        appname,
        soft_file_limit=config.server.soft_file_limit,
        gc_thresholds=config.server.gc_thresholds,
        pid_file=config.worker.worker_pid_file,
        daemonize=config.worker.worker_daemonize,
        print_pidfile=config.server.print_pidfile,
        logger=logger,
        run_command=run_command,
    )


def start_reactor(
    appname: str,
    soft_file_limit: int,
    gc_thresholds: Optional[Tuple[int, int, int]],
    pid_file: str,
    daemonize: bool,
    print_pidfile: bool,
    logger: logging.Logger,
    run_command: Callable[[], None] = reactor.run,
) -> None:
    """Run the reactor in the main process

    Daemonizes if necessary, and then configures some resources before starting
    the reactor

    Args:
        appname: application name which will be sent to syslog
        soft_file_limit: value to set the soft limit on open file descriptors to
        gc_thresholds: generational GC thresholds, passed to gc.set_threshold if set
        pid_file: name of pid file to write to if daemonize is True
        daemonize: true to run the reactor in a background process
        print_pidfile: whether to print the pid file, if daemonize is True
        logger: logger instance to pass to Daemonize
        run_command: callable that actually runs the reactor
    """

    def run() -> None:
        logger.info("Running")
        setup_jemalloc_stats()
        change_resource_limit(soft_file_limit)
        if gc_thresholds:
            gc.set_threshold(*gc_thresholds)
        run_command()

    # make sure that we run the reactor with the sentinel log context,
    # otherwise other PreserveLoggingContext instances will get confused
    # and complain when they see the logcontext arbitrarily swapping
    # between the sentinel and `run` logcontexts.
    #
    # We also need to drop the logcontext before forking if we're daemonizing,
    # otherwise the cputime metrics get confused about the per-thread resource usage
    # appearing to go backwards.
    with PreserveLoggingContext():
        if daemonize:
            if print_pidfile:
                print(pid_file)

            daemonize_process(pid_file, logger)
        run()


def quit_with_error(error_string: str) -> NoReturn:
    message_lines = error_string.split("\n")
    line_length = min(max(len(line) for line in message_lines), 80) + 2
    sys.stderr.write("*" * line_length + "\n")
    for line in message_lines:
        sys.stderr.write(" %s\n" % (line.rstrip(),))
    sys.stderr.write("*" * line_length + "\n")
    sys.exit(1)


def handle_startup_exception(e: Exception) -> NoReturn:
    # Exceptions that occur between setting up the logging and forking or starting
    # the reactor are written to the logs, followed by a summary to stderr.
    logger.exception("Exception during startup")
    quit_with_error(
        f"Error during initialisation:\n {e}\nThere may be more information in the logs."
    )


def redirect_stdio_to_logs() -> None:
    streams = [("stdout", LogLevel.info), ("stderr", LogLevel.error)]

    for (stream, level) in streams:
        oldStream = getattr(sys, stream)
        loggingFile = LoggingFile(
            logger=twisted.logger.Logger(namespace=stream),
            level=level,
            encoding=getattr(oldStream, "encoding", None),
        )
        setattr(sys, stream, loggingFile)

    print("Redirected stdout/stderr to logs")


def register_start(cb: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> None:
    """Register a callback with the reactor, to be called once it is running

    This can be used to initialise parts of the system which require an asynchronous
    setup.

    Any exception raised by the callback will be printed and logged, and the process
    will exit.
    """

    async def wrapper() -> None:
        try:
            await cb(*args, **kwargs)
        except Exception:
            # previously, we used Failure().printTraceback() here, in the hope that
            # it would give better tracebacks than traceback.print_exc(). However,
            # that doesn't handle chained exceptions (with a __cause__ or __context__)
            # well, and I *think* the need for Failure() is reduced now that we mostly
            # use async/await.

            # Write the exception to both the logs *and* the unredirected stderr,
            # because people tend to get confused if it only goes to one or the other.
            #
            # One problem with this is that if people are using a logging config that
            # logs to the console (as is common eg under docker), they will get two
            # copies of the exception. We could maybe try to detect that, but it's
            # probably a cost we can bear.
            logger.fatal("Error during startup", exc_info=True)
            print("Error during startup:", file=sys.__stderr__)
            traceback.print_exc(file=sys.__stderr__)

            # it's no use calling sys.exit here, since that just raises a SystemExit
            # exception which is then caught by the reactor, and everything carries
            # on as normal.
            os._exit(1)

    reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))


def listen_metrics(bind_addresses: Iterable[str], port: int) -> None:
    """
    Start Prometheus metrics server.
    """
    from synapse.metrics import RegistryProxy, start_http_server

    for host in bind_addresses:
        logger.info("Starting metrics listener on %s:%d", host, port)
        start_http_server(port, addr=host, registry=RegistryProxy)


def listen_manhole(
    bind_addresses: Collection[str],
    port: int,
    manhole_settings: ManholeConfig,
    manhole_globals: dict,
) -> None:
    # twisted.conch.manhole 21.1.0 uses "int_from_bytes", which produces a confusing
    # warning. It's fixed by https://github.com/twisted/twisted/pull/1522, so
    # suppress the warning for now.
    warnings.filterwarnings(
        action="ignore",
        category=CryptographyDeprecationWarning,
        message="int_from_bytes is deprecated",
    )

    from synapse.util.manhole import manhole

    listen_tcp(
        bind_addresses,
        port,
        manhole(settings=manhole_settings, globals=manhole_globals),
    )


def listen_tcp(
    bind_addresses: Collection[str],
    port: int,
    factory: ServerFactory,
    reactor: IReactorTCP = reactor,
    backlog: int = 50,
) -> List[Port]:
    """
    Create a TCP socket for a port and several addresses

    Returns:
        list of twisted.internet.tcp.Port listening for TCP connections
    """
    r = []
    for address in bind_addresses:
        try:
            r.append(reactor.listenTCP(port, factory, backlog, address))
        except error.CannotListenError as e:
            check_bind_error(e, address, bind_addresses)

    # IReactorTCP returns an object implementing IListeningPort from listenTCP,
    # but we know it will be a Port instance.
    return r  # type: ignore[return-value]


def listen_ssl(
    bind_addresses: Collection[str],
    port: int,
    factory: ServerFactory,
    context_factory: IOpenSSLContextFactory,
    reactor: IReactorSSL = reactor,
    backlog: int = 50,
) -> List[Port]:
    """
    Create a TLS-over-TCP socket for a port and several addresses

    Returns:
        list of twisted.internet.tcp.Port listening for TLS connections
    """
    r = []
    for address in bind_addresses:
        try:
            r.append(
                reactor.listenSSL(port, factory, context_factory, backlog, address)
            )
        except error.CannotListenError as e:
            check_bind_error(e, address, bind_addresses)

    # IReactorSSL incorrectly declares that an int is returned from listenSSL;
    # it actually returns an object implementing IListeningPort, and we know it
    # will be a Port instance.
    return r  # type: ignore[return-value]


def refresh_certificate(hs: "HomeServer") -> None:
    """
    Refresh the TLS certificates that Synapse is using by re-reading them from
    disk and updating the TLS context factories to use them.
    """
    if not hs.config.server.has_tls_listener():
        return

    hs.config.tls.read_certificate_from_disk()
    hs.tls_server_context_factory = context_factory.ServerContextFactory(hs.config)

    if hs._listening_services:
        logger.info("Updating context factories...")
        for i in hs._listening_services:
            # When you listenSSL, it doesn't make an SSL port but a TCP one with
            # a TLS wrapping factory around the factory you actually want to
            # receive requests. This factory attribute is public but missing from
            # Twisted's documentation.
            if isinstance(i.factory, TLSMemoryBIOFactory):
                addr = i.getHost()
                logger.info(
                    "Replacing TLS context factory on [%s]:%i", addr.host, addr.port
                )
                # We want to replace TLS factories with a new one, with the new
                # TLS configuration. We do this by reaching in and pulling out
                # the wrappedFactory, and then re-wrapping it.
                i.factory = TLSMemoryBIOFactory(
                    hs.tls_server_context_factory, False, i.factory.wrappedFactory
                )
        logger.info("Context factories updated.")


async def start(hs: "HomeServer") -> None:
    """
    Start a Synapse server or worker.

    Should be called once the reactor is running.

    Will start the main HTTP listeners and do some other startup tasks, and then
    notify systemd.
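
    Typically scheduled from an app entry point via ``register_start`` (for
    example ``register_start(start, hs)``; illustrative only, the exact call
    sites live in the individual apps).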

    Args:
        hs: homeserver instance
    """
    reactor = hs.get_reactor()

    # We want to use a separate thread pool for the resolver so that large
    # numbers of DNS requests don't starve out other users of the threadpool.
    resolver_threadpool = ThreadPool(name="gai_resolver")
    resolver_threadpool.start()
    reactor.addSystemEventTrigger("during", "shutdown", resolver_threadpool.stop)
    reactor.installNameResolver(
        GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool)
    )

    # Register the threadpools with our metrics.
    register_threadpool("default", reactor.getThreadPool())
    register_threadpool("gai_resolver", resolver_threadpool)

    # Set up the SIGHUP machinery.
    if hasattr(signal, "SIGHUP"):

        @wrap_as_background_process("sighup")
        async def handle_sighup(*args: Any, **kwargs: Any) -> None:
            # Tell systemd our state, if we're using it. This will silently fail if
            # we're not using systemd.
            sdnotify(b"RELOADING=1")

            for i, args, kwargs in _sighup_callbacks:
                i(*args, **kwargs)

            sdnotify(b"READY=1")

        # We defer running the sighup handlers until next reactor tick. This
        # is so that we're in a sane state, e.g. flushing the logs may fail
        # if the sighup happens in the middle of writing a log entry.
        def run_sighup(*args: Any, **kwargs: Any) -> None:
            # `callFromThread` should be "signal safe" as well as thread
            # safe.
            reactor.callFromThread(handle_sighup, *args, **kwargs)

        signal.signal(signal.SIGHUP, run_sighup)

        register_sighup(refresh_certificate, hs)

    # Load the certificate from disk.
    refresh_certificate(hs)

    # Start the tracer
    synapse.logging.opentracing.init_tracer(hs)  # type: ignore[attr-defined] # noqa

    # Instantiate the modules so they can register their web resources to the module API
    # before we start the listeners.
    module_api = hs.get_module_api()
    for module, config in hs.config.modules.loaded_modules:
        module(config=config, api=module_api)

    load_legacy_spam_checkers(hs)
    load_legacy_third_party_event_rules(hs)
    load_legacy_presence_router(hs)
    load_legacy_password_auth_providers(hs)

    # If we've configured an expiry time for caches, start the background job now.
    setup_expire_lru_cache_entries(hs)

    # It is now safe to start your Synapse.
    hs.start_listening()
    hs.get_datastore().db_pool.start_profiling()
    hs.get_pusherpool().start()

    # Log when we start the shutdown process.
    hs.get_reactor().addSystemEventTrigger(
        "before", "shutdown", logger.info, "Shutting down..."
    )

    setup_sentry(hs)
    setup_sdnotify(hs)

    # If background tasks are running on the main process, start collecting the
    # phone home stats.
    if hs.config.worker.run_background_tasks:
        start_phone_stats_home(hs)

    # We now freeze all allocated objects in the hope that (almost) everything
    # currently allocated will still be in use for the rest of the process's
    # lifetime. Doing so means less work on each GC (hopefully).
    #
    # This only works on Python 3.7 or later.
    if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7):
        gc.collect()
        gc.freeze()

    # Speed up shutdowns by freezing all allocated objects. This moves everything
    # into the permanent generation and excludes them from the final GC.
    # Unfortunately this also only works on Python 3.7 or later.
    if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7):
        atexit.register(gc.freeze)


def setup_sentry(hs: "HomeServer") -> None:
    """Enable sentry integration, if enabled in configuration"""

    if not hs.config.metrics.sentry_enabled:
        return

    import sentry_sdk

    sentry_sdk.init(
        dsn=hs.config.metrics.sentry_dsn, release=get_version_string(synapse)
    )

    # We set some default tags that give some context to this instance
    with sentry_sdk.configure_scope() as scope:
        scope.set_tag("matrix_server_name", hs.config.server.server_name)

        app = (
            hs.config.worker.worker_app
            if hs.config.worker.worker_app
            else "synapse.app.homeserver"
        )
        name = hs.get_instance_name()
        scope.set_tag("worker_app", app)
        scope.set_tag("worker_name", name)


def setup_sdnotify(hs: "HomeServer") -> None:
    """Adds process state hooks to tell systemd what we are up to."""

    # Tell systemd our state, if we're using it. This will silently fail if
    # we're not using systemd.
    sdnotify(b"READY=1\nMAINPID=%i" % (os.getpid(),))

    hs.get_reactor().addSystemEventTrigger(
        "before", "shutdown", sdnotify, b"STOPPING=1"
    )


sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")


def sdnotify(state: bytes) -> None:
    """
    Send a notification to systemd, if the NOTIFY_SOCKET env var is set.

    This function is based on the sdnotify python package, but since it's only a few
    lines of code, it's easier to duplicate it here than to add a dependency on a
    package which many OSes don't include as a matter of principle.

    Args:
        state: notification to send
    """
    if not isinstance(state, bytes):
        raise TypeError("sdnotify should be called with a bytes")
    if not sdnotify_sockaddr:
        return
    addr = sdnotify_sockaddr
    if addr[0] == "@":
        addr = "\0" + addr[1:]

    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
            sock.connect(addr)
            sock.sendall(state)
    except Exception as e:
        # this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET
        # unless systemd is expecting us to notify it.
        logger.warning("Unable to send notification to systemd: %s", e)


def max_request_body_size(config: HomeServerConfig) -> int:
    """Get a suitable maximum size for incoming HTTP requests"""

    # Other than media uploads, the biggest request we expect to see is a fully-loaded
    # /federation/v1/send request.
    #
    # The main thing in such a request is up to 50 PDUs, and up to 100 EDUs. PDUs are
    # limited to 65536 bytes (possibly slightly more if the sender didn't use canonical
    # json encoding); there is no specced limit to EDUs (see
    # https://github.com/matrix-org/matrix-doc/issues/3121).
    #
    # in short, we somewhat arbitrarily limit requests to 200 * 64K (about 12.5M)
    #
    max_request_size = 200 * MAX_PDU_SIZE

    # if we have a media repo enabled, we may need to allow larger uploads than that
    if config.media.can_load_media_repo:
        max_request_size = max(max_request_size, config.media.max_upload_size)

    return max_request_size
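

# A rough sketch, for orientation only, of how an app entry point is expected
# to wire the helpers in this module together. `hs` and `config` stand in for
# an already-constructed HomeServer and its HomeServerConfig; the real call
# sites live in the individual synapse.app.* entry points and differ in detail:
#
#     # Schedule the async `start` routine for once the reactor is running.
#     register_start(start, hs)
#
#     # Daemonize (if configured), apply resource limits and GC thresholds,
#     # then run the reactor until shutdown.
#     start_worker_reactor("synapse-example-worker", config)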