# Copyright 2016 OpenMarket Ltd
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
from typing import List, Union

import attr

from ._base import (
    Config,
    ConfigError,
    RoutableShardedWorkerHandlingConfig,
    ShardedWorkerHandlingConfig,
)
from .server import ListenerConfig, parse_listener_def

_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
The send_federation config option must be disabled in the main
synapse process before they can be run in a separate worker.

Please add ``send_federation: false`` to the main config
"""

_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
The start_pushers config option must be disabled in the main
synapse process before they can be run in a separate worker.

Please add ``start_pushers: false`` to the main config
"""


def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
    """Helper for allowing parsing a string or list of strings to a config
    option expecting a list of strings.

    Args:
        obj: a single instance name, or a list of instance names.

    Returns:
        `[obj]` if `obj` is a string; otherwise `obj` itself (not copied).
    """
    if isinstance(obj, str):
        return [obj]
    return obj


def _default_writers() -> List[str]:
    """Return a fresh default writer list so instances never share one list."""
    return ["master"]


@attr.s
class InstanceLocationConfig:
    """The host and port to talk to an instance via HTTP replication."""

    host = attr.ib(type=str)
    port = attr.ib(type=int)


@attr.s
class WriterLocations:
    """Specifies the instances that write various streams.

    Each attribute accepts either a single instance name or a list of names
    (a bare string is wrapped into a one-element list). Each attribute defaults
    to `["master"]`, and every `WriterLocations` gets its own default list, so
    mutating one instance's list cannot leak into another.

    Attributes:
        events: The instances that write to the event and backfill streams.
        typing: The instances that write to the typing stream. Currently
            can only be a single instance.
        to_device: The instances that write to the to_device stream. Currently
            can only be a single instance.
        account_data: The instances that write to the account data streams. Currently
            can only be a single instance.
        receipts: The instances that write to the receipts stream. Currently
            can only be a single instance.
        presence: The instances that write to the presence stream. Currently
            can only be a single instance.
    """

    # `factory=` rather than `default=[...]`: a literal list default would be
    # one object shared by every instance, and the converter passes lists
    # through unchanged.
    events = attr.ib(
        factory=_default_writers,
        type=List[str],
        converter=_instance_to_list_converter,
    )
    typing = attr.ib(
        factory=_default_writers,
        type=List[str],
        converter=_instance_to_list_converter,
    )
    to_device = attr.ib(
        factory=_default_writers,
        type=List[str],
        converter=_instance_to_list_converter,
    )
    account_data = attr.ib(
        factory=_default_writers,
        type=List[str],
        converter=_instance_to_list_converter,
    )
    receipts = attr.ib(
        factory=_default_writers,
        type=List[str],
        converter=_instance_to_list_converter,
    )
    presence = attr.ib(
        factory=_default_writers,
        type=List[str],
        converter=_instance_to_list_converter,
    )


class WorkerConfig(Config):
    """The workers are processes run separately to the main synapse process.
    They have their own pid_file and listener configuration. They use the
    `worker_replication_*` options (host, TCP port, HTTP port and shared
    secret) to talk to the main synapse process."""

    section = "worker"

    def read_config(self, config, **kwargs):
        """Parse the worker-related options from the config dict.

        Args:
            config: the parsed homeserver config dict.

        Raises:
            ConfigError: if the federation sender, stream writer or pusher
                configuration is inconsistent.
        """
        self.worker_app = config.get("worker_app")

        # Canonicalise worker_app so that master always has None
        if self.worker_app == "synapse.app.homeserver":
            self.worker_app = None

        self.worker_listeners = [
            parse_listener_def(x) for x in config.get("worker_listeners", [])
        ]
        self.worker_daemonize = config.get("worker_daemonize")
        self.worker_pid_file = config.get("worker_pid_file")
        self.worker_log_config = config.get("worker_log_config")

        # The host used to connect to the main synapse
        self.worker_replication_host = config.get("worker_replication_host", None)

        # The port on the main synapse for TCP replication
        self.worker_replication_port = config.get("worker_replication_port", None)

        # The port on the main synapse for HTTP replication endpoint
        self.worker_replication_http_port = config.get("worker_replication_http_port")

        # The shared secret used for authentication when connecting to the main synapse.
        self.worker_replication_secret = config.get("worker_replication_secret", None)

        self.worker_name = config.get("worker_name", self.worker_app)
        self.instance_name = self.worker_name or "master"

        self.worker_main_http_uri = config.get("worker_main_http_uri", None)

        # This option is really only here to support `--manhole` command line
        # argument.
        manhole = config.get("worker_manhole")
        # Record it so `read_arguments` overrides an attribute that always exists.
        self.worker_manhole = manhole
        if manhole:
            self.worker_listeners.append(
                ListenerConfig(
                    port=manhole,
                    bind_addresses=["127.0.0.1"],
                    type="manhole",
                )
            )

        # Handle federation sender configuration.
        #
        # There are two ways of configuring which instances handle federation
        # sending:
        #   1. The old way where "send_federation" is set to false and running a
        #      `synapse.app.federation_sender` worker app.
        #   2. Specifying the workers sending federation in
        #      `federation_sender_instances`.
        #

        send_federation = config.get("send_federation", True)

        federation_sender_instances = config.get("federation_sender_instances")
        if federation_sender_instances is None:
            # Default to an empty list, which means "another, unknown, worker is
            # responsible for it".
            federation_sender_instances = []

            # If no federation sender instances are set we check if
            # `send_federation` is set, which means use master
            if send_federation:
                federation_sender_instances = ["master"]

            if self.worker_app == "synapse.app.federation_sender":
                if send_federation:
                    # If we're running federation senders, and not using
                    # `federation_sender_instances`, then we should have
                    # explicitly set `send_federation` to false.
                    raise ConfigError(
                        _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
                    )

                federation_sender_instances = [self.worker_name]

        self.send_federation = self.instance_name in federation_sender_instances
        self.federation_shard_config = ShardedWorkerHandlingConfig(
            federation_sender_instances
        )

        # A map from instance name to host/port of their HTTP replication endpoint.
        instance_map = config.get("instance_map") or {}
        self.instance_map = {
            name: InstanceLocationConfig(**c) for name, c in instance_map.items()
        }

        # Map from type of streams to source, c.f. WriterLocations.
        writers = config.get("stream_writers") or {}
        self.writers = WriterLocations(**writers)

        # Check that every configured stream writer (other than master) also
        # appears in `instance_map`. WriterLocations' converter has already
        # normalised each attribute to a list, so no further conversion is needed.
        for stream in (
            "events",
            "typing",
            "to_device",
            "account_data",
            "receipts",
            "presence",
        ):
            for instance in getattr(self.writers, stream):
                if instance != "master" and instance not in self.instance_map:
                    raise ConfigError(
                        "Instance %r is configured to write %s but does not appear in `instance_map` config."
                        % (instance, stream)
                    )

        # These streams currently support exactly one writer.
        for stream in ("typing", "to_device", "account_data", "receipts", "presence"):
            if len(getattr(self.writers, stream)) != 1:
                raise ConfigError(
                    "Must only specify one instance to handle `%s` messages."
                    % (stream,)
                )

        if len(self.writers.events) == 0:
            raise ConfigError("Must specify at least one instance to handle `events`.")

        self.events_shard_config = RoutableShardedWorkerHandlingConfig(
            self.writers.events
        )

        # Handle sharded push
        start_pushers = config.get("start_pushers", True)
        pusher_instances = config.get("pusher_instances")
        if pusher_instances is None:
            # Default to an empty list, which means "another, unknown, worker is
            # responsible for it".
            pusher_instances = []

            # If no pushers instances are set we check if `start_pushers` is
            # set, which means use master
            if start_pushers:
                pusher_instances = ["master"]

            if self.worker_app == "synapse.app.pusher":
                if start_pushers:
                    # If we're running pushers, and not using
                    # `pusher_instances`, then we should have explicitly set
                    # `start_pushers` to false.
                    raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)

                pusher_instances = [self.instance_name]

        self.start_pushers = self.instance_name in pusher_instances
        self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)

        # Whether this worker should run background tasks or not.
        #
        # As a note for developers, the background tasks guarded by this should
        # be able to run on only a single instance (meaning that they don't
        # depend on any in-memory state of a particular worker).
        #
        # No effort is made to ensure only a single instance of these tasks is
        # running.
        background_tasks_instance = config.get("run_background_tasks_on") or "master"
        self.run_background_tasks = (
            self.worker_name is None and background_tasks_instance == "master"
        ) or self.worker_name == background_tasks_instance

    def generate_config_section(self, config_dir_path, server_name, **kwargs):
        """Return the commented-out sample config for the worker section."""
        return """\
        ## Workers ##

        # Disables sending of outbound federation transactions on the main process.
        # Uncomment if using a federation sender worker.
        #
        #send_federation: false

        # It is possible to run multiple federation sender workers, in which case the
        # work is balanced across them.
        #
        # This configuration must be shared between all federation sender workers, and if
        # changed all federation sender workers must be stopped at the same time and then
        # started, to ensure that all instances are running with the same config (otherwise
        # events may be dropped).
        #
        #federation_sender_instances:
        #  - federation_sender1

        # When using workers this should be a map from `worker_name` to the
        # HTTP replication listener of the worker, if configured.
        #
        #instance_map:
        #  worker1:
        #    host: localhost
        #    port: 8034

        # Experimental: When using workers you can define which workers should
        # handle event persistence and typing notifications. Any worker
        # specified here must also be in the `instance_map`.
        #
        #stream_writers:
        #  events: worker1
        #  typing: worker1

        # The worker that is used to run background tasks (e.g. cleaning up expired
        # data). If not provided this defaults to the main process.
        #
        #run_background_tasks_on: worker1

        # A shared secret used by the replication APIs to authenticate HTTP requests
        # from workers.
        #
        # By default this is unused and traffic is not authenticated.
        #
        #worker_replication_secret: ""
        """

    def read_arguments(self, args: argparse.Namespace) -> None:
        """Apply command-line overrides to options read from the config.

        Args:
            args: the parsed command-line namespace; `daemonize` and `manhole`
                are read from it.
        """
        # We support a bunch of command line arguments that override options in
        # the config. A lot of these options have a worker_* prefix when running
        # on workers so we also have to override them when command line options
        # are specified.

        if args.daemonize is not None:
            self.worker_daemonize = args.daemonize
        if args.manhole is not None:
            # Read the same attribute the guard checks; the namespace is not
            # shown to carry a `worker_manhole` attribute.
            # NOTE(review): this only updates `self.worker_manhole`; the manhole
            # listener appended to `worker_listeners` in read_config is not
            # rebuilt here — confirm whether callers consume this attribute.
            self.worker_manhole = args.manhole