# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import logging
from collections import OrderedDict
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple

import attr
from prometheus_client import Counter
from typing_extensions import Literal

from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall

import synapse.metrics
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
    LaterGauge,
    event_processing_loop_counter,
    event_processing_loop_room_count,
    events_processed_counter,
)
from synapse.metrics.background_process_metrics import (
    run_as_background_process,
    wrap_as_background_process,
)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util import Clock
from synapse.util.metrics import Measure

if TYPE_CHECKING:
    from synapse.events.presence_router import PresenceRouter
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

sent_pdus_destination_dist_count = Counter(
    "synapse_federation_client_sent_pdu_destinations:count",
    "Number of PDUs queued for sending to one or more destinations",
)

sent_pdus_destination_dist_total = Counter(
    "synapse_federation_client_sent_pdu_destinations:total",
    "Total number of PDUs queued for sending across all destinations",
)

# Time (in s) after Synapse's startup that we will begin to wake up destinations
# that have catch-up outstanding.
CATCH_UP_STARTUP_DELAY_SEC = 15

# Time (in s) to wait in between waking up each destination, i.e. one destination
# will be woken up every <x> seconds after Synapse's startup until we have woken
# every destination that has outstanding catch-up.
CATCH_UP_STARTUP_INTERVAL_SEC = 5


class AbstractFederationSender(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def notify_new_events(self, max_token: RoomStreamToken) -> None:
        """This gets called when we have some new events we might want to
        send out to other servers.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
        """Send a RR to any other servers in the room

        Args:
            receipt: receipt to be sent
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def send_presence_to_destinations(
        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
    ) -> None:
        """Send the given presence states to the given destinations.

        Args:
            states: the presence states to send
            destinations: the server names to send them to
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def build_and_send_edu(
        self,
        destination: str,
        edu_type: str,
        content: JsonDict,
        key: Optional[Hashable] = None,
    ) -> None:
        """Construct an Edu object, and queue it for sending

        Args:
            destination: name of server to send to
            edu_type: type of EDU to send
            content: content of EDU
            key: clobbering key for this edu
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def send_device_messages(self, destination: str) -> None:
        raise NotImplementedError()

    @abc.abstractmethod
    def wake_destination(self, destination: str) -> None:
        """Called when we want to retry sending transactions to a remote.

        This is mainly useful if the remote server has been down and we think it
        might have come back.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_current_token(self) -> int:
        raise NotImplementedError()

    @abc.abstractmethod
    def federation_ack(self, instance_name: str, token: int) -> None:
        raise NotImplementedError()

    @abc.abstractmethod
    async def get_replication_rows(
        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
        raise NotImplementedError()


@attr.s
class _PresenceQueue:
    """A queue of destinations that need to be woken up due to new presence
    updates.

    Staggers waking up of per destination queues to ensure that we don't attempt
    to start TLS connections with many hosts all at once, leading to pinned CPU.
    """

    # The maximum duration in seconds between queuing up a destination and it
    # being woken up.
    _MAX_TIME_IN_QUEUE = 30.0

    # The maximum duration in seconds between waking up consecutive destination
    # queues.
    _MAX_DELAY = 0.1

    sender: "FederationSender" = attr.ib()
    clock: Clock = attr.ib()
    queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
    processing: bool = attr.ib(default=False)

    def add_to_queue(self, destination: str) -> None:
        """Add a destination to the queue to be woken up."""

        self.queue[destination] = None

        if not self.processing:
            self._handle()

    @wrap_as_background_process("_PresenceQueue.handle")
    async def _handle(self) -> None:
        """Background process to drain the queue."""

        if not self.queue:
            return

        assert not self.processing
        self.processing = True

        try:
            # We start with a delay that should drain the queue quickly enough that
            # we process all destinations in the queue in _MAX_TIME_IN_QUEUE
            # seconds.
            #
            # We also add an upper bound to the delay, to gracefully handle the
            # case where the queue only has a few entries in it.
            current_sleep_seconds = min(
                self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
            )

            while self.queue:
                destination, _ = self.queue.popitem(last=False)

                queue = self.sender._get_per_destination_queue(destination)

                if not queue._new_data_to_send:
                    # The per destination queue has already been woken up.
                    continue

                queue.attempt_new_transaction()

                await self.clock.sleep(current_sleep_seconds)

                if not self.queue:
                    break

                # More destinations may have been added to the queue, so we may
                # need to reduce the delay to ensure everything gets processed
                # within _MAX_TIME_IN_QUEUE seconds.
                current_sleep_seconds = min(
                    current_sleep_seconds, self._MAX_TIME_IN_QUEUE / len(self.queue)
                )

        finally:
            self.processing = False
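
# Worked example of the staggering above (illustrative only): with 600
# destinations queued, the initial per-destination delay is
# min(0.1, 30.0 / 600) = 0.05s, so the whole queue drains in ~30s; with 10
# destinations it is min(0.1, 30.0 / 10) = 0.1s, i.e. the _MAX_DELAY cap stops
# a short queue from being spread out over the full 30s window.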


class FederationSender(AbstractFederationSender):
    def __init__(self, hs: "HomeServer"):
        self.hs = hs
        self.server_name = hs.hostname

        self.store = hs.get_datastore()
        self.state = hs.get_state_handler()

        self.clock = hs.get_clock()
        self.is_mine_id = hs.is_mine_id

        self._presence_router: Optional["PresenceRouter"] = None
        self._transaction_manager = TransactionManager(hs)

        self._instance_name = hs.get_instance_name()
        self._federation_shard_config = hs.config.worker.federation_shard_config

        # map from destination to PerDestinationQueue
        self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

        LaterGauge(
            "synapse_federation_transaction_queue_pending_destinations",
            "",
            [],
            lambda: sum(
                1
                for d in self._per_destination_queues.values()
                if d.transmission_loop_running
            ),
        )

        LaterGauge(
            "synapse_federation_transaction_queue_pending_pdus",
            "",
            [],
            lambda: sum(
                d.pending_pdu_count() for d in self._per_destination_queues.values()
            ),
        )
        LaterGauge(
            "synapse_federation_transaction_queue_pending_edus",
            "",
            [],
            lambda: sum(
                d.pending_edu_count() for d in self._per_destination_queues.values()
            ),
        )

        self._is_processing = False
        self._last_poked_id = -1

        # map from room_id to a set of PerDestinationQueues which we believe are
        # awaiting a call to flush_read_receipts_for_room. The presence of an entry
        # here for a given room means that we are rate-limiting RR flushes to that room,
        # and that there is a pending call to _flush_rrs_for_room in the system.
        self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}

        self._rr_txn_interval_per_room_ms = (
            1000.0
            / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
        )

        # wake up destinations that have outstanding PDUs to be caught up
        self._catchup_after_startup_timer: Optional[
            IDelayedCall
        ] = self.clock.call_later(
            CATCH_UP_STARTUP_DELAY_SEC,
            run_as_background_process,
            "wake_destinations_needing_catchup",
            self._wake_destinations_needing_catchup,
        )

        self._external_cache = hs.get_external_cache()

        self._presence_queue = _PresenceQueue(self, self.clock)

    def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
        """Get or create a PerDestinationQueue for the given destination

        Args:
            destination: server_name of remote server
        """
        queue = self._per_destination_queues.get(destination)
        if not queue:
            queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
            self._per_destination_queues[destination] = queue
        return queue

    def notify_new_events(self, max_token: RoomStreamToken) -> None:
        """This gets called when we have some new events we might want to
        send out to other servers.
        """
        # We just use the minimum stream ordering and ignore the vector clock
        # component. This is safe to do as long as we *always* ignore the vector
        # clock components.
        current_id = max_token.stream

        self._last_poked_id = max(current_id, self._last_poked_id)

        if self._is_processing:
            return

        # fire off a processing loop in the background
        run_as_background_process(
            "process_event_queue_for_federation", self._process_event_queue_loop
        )
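
    # Rough flow sketch (illustrative): notify_new_events() is called (e.g. from
    # the notifier) whenever new events are persisted; the loop below then pages
    # through the event stream from the last acknowledged "events" federation
    # position and fans each event out to the destinations that should see it.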

    async def _process_event_queue_loop(self) -> None:
        try:
            self._is_processing = True
            while True:
                last_token = await self.store.get_federation_out_pos("events")
                next_token, events = await self.store.get_all_new_events_stream(
                    last_token, self._last_poked_id, limit=100
                )

                logger.debug("Handling %s -> %s", last_token, next_token)

                if not events and next_token >= self._last_poked_id:
                    break

                async def handle_event(event: EventBase) -> None:
                    # Only send events for this server.
                    send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                    is_mine = self.is_mine_id(event.sender)
                    if not is_mine and send_on_behalf_of is None:
                        return

                    if not event.internal_metadata.should_proactively_send():
                        return

                    destinations: Optional[Set[str]] = None
                    if not event.prev_event_ids():
                        # If there are no prev event IDs then the state is empty
                        # and so there are no remote servers in the room
                        destinations = set()
                    else:
                        # We check the external cache for the destinations, which is
                        # stored per state group.

                        sg = await self._external_cache.get(
                            "event_to_prev_state_group", event.event_id
                        )
                        if sg:
                            destinations = await self._external_cache.get(
                                "get_joined_hosts", str(sg)
                            )

                    if destinations is None:
                        try:
                            # Get the state from before the event.
                            # We need to make sure that this is the state from before
                            # the event and not from after it.
                            # Otherwise if the last member on a server in a room is
                            # banned then it won't receive the event because it won't
                            # be in the room after the ban.
                            destinations = await self.state.get_hosts_in_room_at_events(
                                event.room_id, event_ids=event.prev_event_ids()
                            )
                        except Exception:
                            logger.exception(
                                "Failed to calculate hosts in room for event: %s",
                                event.event_id,
                            )
                            return

                    destinations = {
                        d
                        for d in destinations
                        if self._federation_shard_config.should_handle(
                            self._instance_name, d
                        )
                    }

                    if send_on_behalf_of is not None:
                        # If we are sending the event on behalf of another server
                        # then it already has the event and there is no reason to
                        # send the event to it.
                        destinations.discard(send_on_behalf_of)

                    logger.debug("Sending %s to %r", event, destinations)

                    if destinations:
                        await self._send_pdu(event, destinations)

                    now = self.clock.time_msec()
                    ts = await self.store.get_received_ts(event.event_id)
                    assert ts is not None
                    synapse.metrics.event_processing_lag_by_event.labels(
                        "federation_sender"
                    ).observe((now - ts) / 1000)

                async def handle_room_events(events: Iterable[EventBase]) -> None:
                    with Measure(self.clock, "handle_room_events"):
                        for event in events:
                            await handle_event(event)

                events_by_room: Dict[str, List[EventBase]] = {}
                for event in events:
                    events_by_room.setdefault(event.room_id, []).append(event)

                await make_deferred_yieldable(
                    defer.gatherResults(
                        [
                            run_in_background(handle_room_events, evs)
                            for evs in events_by_room.values()
                        ],
                        consumeErrors=True,
                    )
                )

                await self.store.update_federation_out_pos("events", next_token)

                if events:
                    now = self.clock.time_msec()
                    ts = await self.store.get_received_ts(events[-1].event_id)
                    assert ts is not None

                    synapse.metrics.event_processing_lag.labels(
                        "federation_sender"
                    ).set(now - ts)
                    synapse.metrics.event_processing_last_ts.labels(
                        "federation_sender"
                    ).set(ts)

                    events_processed_counter.inc(len(events))

                    event_processing_loop_room_count.labels("federation_sender").inc(
                        len(events_by_room)
                    )

                event_processing_loop_counter.labels("federation_sender").inc()

                synapse.metrics.event_processing_positions.labels(
                    "federation_sender"
                ).set(next_token)

        finally:
            self._is_processing = False

    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
        # We loop through all destinations to see whether we already have
        # a transaction in progress. If we do, stick it in the pending_pdus
        # table and we'll get back to it later.

        destinations = set(destinations)
        destinations.discard(self.server_name)
        logger.debug("Sending to: %s", str(destinations))

        if not destinations:
            return

        sent_pdus_destination_dist_total.inc(len(destinations))
        sent_pdus_destination_dist_count.inc()

        assert pdu.internal_metadata.stream_ordering

        # track the fact that we have a PDU for these destinations,
        # to allow us to perform catch-up later on if the remote is unreachable
        # for a while.
        await self.store.store_destination_rooms_entries(
            destinations,
            pdu.room_id,
            pdu.internal_metadata.stream_ordering,
        )

        for destination in destinations:
            self._get_per_destination_queue(destination).send_pdu(pdu)
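
    # Worked example for the two counters above (illustrative): queueing one PDU
    # for three remote destinations increments sent_pdus_destination_dist_count
    # by 1 and sent_pdus_destination_dist_total by 3.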

    async def send_read_receipt(self, receipt: ReadReceipt) -> None:
        """Send a RR to any other servers in the room

        Args:
            receipt: receipt to be sent
        """

        # Some background on the rate-limiting going on here.
        #
        # It turns out that if we attempt to send out RRs as soon as we get them from
        # a client, then we end up trying to do several hundred Hz of federation
        # transactions. (The number of transactions scales as O(N^2) on the size of a
        # room, since in a large room we have both more RRs coming in, and more servers
        # to send them to.)
        #
        # This leads to a lot of CPU load, and we end up getting behind. The solution
        # currently adopted is as follows:
        #
        # The first receipt in a given room is sent out immediately, at time T0. Any
        # further receipts are, in theory, batched up for N seconds, where N is calculated
        # based on the number of servers in the room to achieve a transaction frequency
        # of around 50Hz. So, for example, if there were 100 servers in the room, then
        # N would be 100 / 50Hz = 2 seconds.
        #
        # Then, after T+N, we flush out any receipts that have accumulated, and restart
        # the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
        # we stop the cycle and go back to the start.
        #
        # However, in practice, it is often possible to flush out receipts earlier: in
        # particular, if we are sending a transaction to a given server anyway (for
        # example, because we have a PDU or a RR in another room to send), then we may
        # as well send out all of the pending RRs for that server. So it may be that
        # by the time we get to T+N, we don't actually have any RRs left to send out.
        # Nevertheless we continue to buffer up RRs for the room in question until we
        # reach the point that no RRs arrive between timer ticks.
        #
        # For even more background, see https://github.com/matrix-org/synapse/issues/4730.
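        #
        # Worked timeline (illustrative, reusing the 100-server example, N = 2s):
        # a receipt at T0 is flushed immediately; receipts arriving in (T0, T0+2s]
        # are flushed together at T0+2s; if no further receipts arrive before
        # T0+4s, the cycle stops and the next receipt starts a new one.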

        room_id = receipt.room_id

        # Work out which remote servers should be poked and poke them.
        domains_set = await self.state.get_current_hosts_in_room(room_id)
        domains = [
            d
            for d in domains_set
            if d != self.server_name
            and self._federation_shard_config.should_handle(self._instance_name, d)
        ]
        if not domains:
            return

        queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)

        # if there is no flush yet scheduled, we will send out these receipts with
        # immediate flushes, and schedule the next flush for this room.
        if queues_pending_flush is not None:
            logger.debug("Queuing receipt for: %r", domains)
        else:
            logger.debug("Sending receipt to: %r", domains)
            self._schedule_rr_flush_for_room(room_id, len(domains))

        for domain in domains:
            queue = self._get_per_destination_queue(domain)
            queue.queue_read_receipt(receipt)

            # if there is already a RR flush pending for this room, then make sure this
            # destination is registered for the flush
            if queues_pending_flush is not None:
                queues_pending_flush.add(queue)
            else:
                queue.flush_read_receipts_for_room(room_id)

    def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
        # that is going to cause approximately len(domains) transactions, so now back
        # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
        backoff_ms = self._rr_txn_interval_per_room_ms * n_domains

        logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
        # Clock.call_later takes its delay in seconds, so convert from ms.
        self.clock.call_later(backoff_ms / 1000, self._flush_rrs_for_room, room_id)
        self._queues_awaiting_rr_flush_by_room[room_id] = set()

    def _flush_rrs_for_room(self, room_id: str) -> None:
        queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
        logger.debug("Flushing RRs in %s to %s", room_id, queues)

        if not queues:
            # no more RRs arrived for this room; we are done.
            return

        # schedule the next flush
        self._schedule_rr_flush_for_room(room_id, len(queues))

        for queue in queues:
            queue.flush_read_receipts_for_room(room_id)

    def send_presence_to_destinations(
        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
    ) -> None:
        """Send the given presence states to the given destinations.

        Args:
            states: the presence states to send
            destinations: the server names to send them to
        """

        if not states or not self.hs.config.server.use_presence:
            # No-op if presence is disabled.
            return

        # Ensure we only send out presence states for local users.
        for state in states:
            assert self.is_mine_id(state.user_id)

        for destination in destinations:
            if destination == self.server_name:
                continue
            if not self._federation_shard_config.should_handle(
                self._instance_name, destination
            ):
                continue

            self._get_per_destination_queue(destination).send_presence(
                states, start_loop=False
            )

            self._presence_queue.add_to_queue(destination)
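
    # Illustrative call (a sketch, not from this module): a presence handler
    # might invoke
    #   sender.send_presence_to_destinations([state], {"remote.example.org"})
    # The states are buffered on each per-destination queue with
    # start_loop=False, and _PresenceQueue staggers the actual wake-ups.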

    def build_and_send_edu(
        self,
        destination: str,
        edu_type: str,
        content: JsonDict,
        key: Optional[Hashable] = None,
    ) -> None:
        """Construct an Edu object, and queue it for sending

        Args:
            destination: name of server to send to
            edu_type: type of EDU to send
            content: content of EDU
            key: clobbering key for this edu
        """
        if destination == self.server_name:
            logger.info("Not sending EDU to ourselves")
            return

        if not self._federation_shard_config.should_handle(
            self._instance_name, destination
        ):
            return

        edu = Edu(
            origin=self.server_name,
            destination=destination,
            edu_type=edu_type,
            content=content,
        )

        self.send_edu(edu, key)

    def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
        """Queue an EDU for sending

        Args:
            edu: edu to send
            key: clobbering key for this edu
        """
        if not self._federation_shard_config.should_handle(
            self._instance_name, edu.destination
        ):
            return

        queue = self._get_per_destination_queue(edu.destination)
        if key:
            queue.send_keyed_edu(edu, key)
        else:
            queue.send_edu(edu)

    def send_device_messages(self, destination: str) -> None:
        if destination == self.server_name:
            logger.warning("Not sending device update to ourselves")
            return

        if not self._federation_shard_config.should_handle(
            self._instance_name, destination
        ):
            return

        self._get_per_destination_queue(destination).attempt_new_transaction()

    def wake_destination(self, destination: str) -> None:
        """Called when we want to retry sending transactions to a remote.

        This is mainly useful if the remote server has been down and we think it
        might have come back.
        """

        if destination == self.server_name:
            logger.warning("Not waking up ourselves")
            return

        if not self._federation_shard_config.should_handle(
            self._instance_name, destination
        ):
            return

        self._get_per_destination_queue(destination).attempt_new_transaction()

    @staticmethod
    def get_current_token() -> int:
        # Dummy implementation for case where federation sender isn't offloaded
        # to a worker.
        return 0

    def federation_ack(self, instance_name: str, token: int) -> None:
        # It is not expected that this gets called on FederationSender.
        raise NotImplementedError()

    @staticmethod
    async def get_replication_rows(
        instance_name: str, from_token: int, to_token: int, target_row_count: int
    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
        # Dummy implementation for case where federation sender isn't offloaded
        # to a worker.
        return [], 0, False
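
    # Pacing sketch for the method below (illustrative): with
    # CATCH_UP_STARTUP_DELAY_SEC = 15 and CATCH_UP_STARTUP_INTERVAL_SEC = 5, the
    # first backlogged destination is woken ~15s after startup and one more
    # every 5s thereafter, so 100 backlogged destinations take ~8 minutes.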

    async def _wake_destinations_needing_catchup(self) -> None:
        """
        Wakes up destinations that need catch-up and are not currently being
        backed off from.

        In order to reduce load spikes, adds a delay between each destination.
        """

        last_processed: Optional[str] = None

        while True:
            destinations_to_wake = (
                await self.store.get_catch_up_outstanding_destinations(last_processed)
            )

            if not destinations_to_wake:
                # finished waking all destinations!
                self._catchup_after_startup_timer = None
                break

            last_processed = destinations_to_wake[-1]

            destinations_to_wake = [
                d
                for d in destinations_to_wake
                if self._federation_shard_config.should_handle(self._instance_name, d)
            ]

            for destination in destinations_to_wake:
                logger.info(
                    "Destination %s has outstanding catch-up, waking up.",
                    destination,
                )
                self.wake_destination(destination)
                await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
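
# Typical usage (a sketch; assumes a configured HomeServer `hs` on an instance
# that sends federation, plus hypothetical room_id/user_id variables):
#
#   sender = hs.get_federation_sender()
#   sender.build_and_send_edu(
#       destination="remote.example.org",
#       edu_type="m.typing",
#       content={"room_id": room_id, "user_id": user_id, "typing": True},
#   )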