# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module is responsible for keeping track of presence status of local
and remote users.

The methods that define policy are:
    - PresenceHandler._update_states
    - PresenceHandler._handle_timeouts
    - should_notify
"""
import abc
import contextlib
import logging
from bisect import bisect
from contextlib import contextmanager
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Collection,
    Dict,
    FrozenSet,
    Generator,
    Iterable,
    List,
    Optional,
    Set,
    Tuple,
    Type,
    Union,
)

from prometheus_client import Counter
from typing_extensions import ContextManager

import synapse.metrics
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.presence import (
    ReplicationBumpPresenceActiveTime,
    ReplicationPresenceSetState,
)
from synapse.replication.http.streams import ReplicationGetStreamUpdates
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.storage.databases.main import DataStore
from synapse.streams import EventSource
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)


# Prometheus counters for presence handling. Label names, where present, are
# given as the third argument.
notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
federation_presence_out_counter = Counter(
    "synapse_handler_presence_federation_presence_out", ""
)
presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
federation_presence_counter = Counter(
    "synapse_handler_presence_federation_presence", ""
)
bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")

get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])

notify_reason_counter = Counter(
    "synapse_handler_presence_notify_reason", "", ["reason"]
)
state_transition_counter = Counter(
    "synapse_handler_presence_state_transition", "", ["from", "to"]
)


# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000

# How long to wait until a new /events or /sync request before assuming
# the client has gone.
SYNC_ONLINE_TIMEOUT = 30 * 1000

# How long to wait before marking the user as idle.
Compared against last active 112IDLE_TIMER = 5 * 60 * 1000 113 114# How often we expect remote servers to resend us presence. 115FEDERATION_TIMEOUT = 30 * 60 * 1000 116 117# How often to resend presence to remote servers 118FEDERATION_PING_INTERVAL = 25 * 60 * 1000 119 120# How long we will wait before assuming that the syncs from an external process 121# are dead. 122EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000 123 124# Delay before a worker tells the presence handler that a user has stopped 125# syncing. 126UPDATE_SYNCING_USERS_MS = 10 * 1000 127 128assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER 129 130 131class BasePresenceHandler(abc.ABC): 132 """Parts of the PresenceHandler that are shared between workers and presence 133 writer""" 134 135 def __init__(self, hs: "HomeServer"): 136 self.clock = hs.get_clock() 137 self.store = hs.get_datastore() 138 self.presence_router = hs.get_presence_router() 139 self.state = hs.get_state_handler() 140 self.is_mine_id = hs.is_mine_id 141 142 self._federation = None 143 if hs.should_send_federation(): 144 self._federation = hs.get_federation_sender() 145 146 self._federation_queue = PresenceFederationQueue(hs, self) 147 148 self._busy_presence_enabled = hs.config.experimental.msc3026_enabled 149 150 active_presence = self.store.take_presence_startup_info() 151 self.user_to_current_state = {state.user_id: state for state in active_presence} 152 153 @abc.abstractmethod 154 async def user_syncing( 155 self, user_id: str, affect_presence: bool 156 ) -> ContextManager[None]: 157 """Returns a context manager that should surround any stream requests 158 from the user. 159 160 This allows us to keep track of who is currently streaming and who isn't 161 without having to have timers outside of this module to avoid flickering 162 when users disconnect/reconnect. 163 164 Args: 165 user_id: the user that is starting a sync 166 affect_presence: If false this function will be a no-op. 
167 Useful for streams that are not associated with an actual 168 client that is being used by a user. 169 """ 170 171 @abc.abstractmethod 172 def get_currently_syncing_users_for_replication(self) -> Iterable[str]: 173 """Get an iterable of syncing users on this worker, to send to the presence handler 174 175 This is called when a replication connection is established. It should return 176 a list of user ids, which are then sent as USER_SYNC commands to inform the 177 process handling presence about those users. 178 179 Returns: 180 An iterable of user_id strings. 181 """ 182 183 async def get_state(self, target_user: UserID) -> UserPresenceState: 184 results = await self.get_states([target_user.to_string()]) 185 return results[0] 186 187 async def get_states( 188 self, target_user_ids: Iterable[str] 189 ) -> List[UserPresenceState]: 190 """Get the presence state for users.""" 191 192 updates_d = await self.current_state_for_users(target_user_ids) 193 updates = list(updates_d.values()) 194 195 for user_id in set(target_user_ids) - {u.user_id for u in updates}: 196 updates.append(UserPresenceState.default(user_id)) 197 198 return updates 199 200 async def current_state_for_users( 201 self, user_ids: Iterable[str] 202 ) -> Dict[str, UserPresenceState]: 203 """Get the current presence state for multiple users. 204 205 Returns: 206 dict: `user_id` -> `UserPresenceState` 207 """ 208 states = { 209 user_id: self.user_to_current_state.get(user_id, None) 210 for user_id in user_ids 211 } 212 213 missing = [user_id for user_id, state in states.items() if not state] 214 if missing: 215 # There are things not in our in memory cache. Lets pull them out of 216 # the database. 
217 res = await self.store.get_presence_for_users(missing) 218 states.update(res) 219 220 missing = [user_id for user_id, state in states.items() if not state] 221 if missing: 222 new = { 223 user_id: UserPresenceState.default(user_id) for user_id in missing 224 } 225 states.update(new) 226 self.user_to_current_state.update(new) 227 228 return states 229 230 @abc.abstractmethod 231 async def set_state( 232 self, 233 target_user: UserID, 234 state: JsonDict, 235 ignore_status_msg: bool = False, 236 force_notify: bool = False, 237 ) -> None: 238 """Set the presence state of the user. 239 240 Args: 241 target_user: The ID of the user to set the presence state of. 242 state: The presence state as a JSON dictionary. 243 ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. 244 If False, the user's current status will be updated. 245 force_notify: Whether to force notification of the update to clients. 246 """ 247 248 @abc.abstractmethod 249 async def bump_presence_active_time(self, user: UserID) -> None: 250 """We've seen the user do something that indicates they're interacting 251 with the app. 252 """ 253 254 async def update_external_syncs_row( 255 self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int 256 ) -> None: 257 """Update the syncing users for an external process as a delta. 258 259 This is a no-op when presence is handled by a different worker. 260 261 Args: 262 process_id: An identifier for the process the users are 263 syncing against. This allows synapse to process updates 264 as user start and stop syncing against a given process. 265 user_id: The user who has started or stopped syncing 266 is_syncing: Whether or not the user is now syncing 267 sync_time_msec: Time in ms when the user was last syncing 268 """ 269 pass 270 271 async def update_external_syncs_clear(self, process_id: str) -> None: 272 """Marks all users that had been marked as syncing by a given process 273 as offline. 
274 275 Used when the process has stopped/disappeared. 276 277 This is a no-op when presence is handled by a different worker. 278 """ 279 pass 280 281 async def process_replication_rows( 282 self, stream_name: str, instance_name: str, token: int, rows: list 283 ) -> None: 284 """Process streams received over replication.""" 285 await self._federation_queue.process_replication_rows( 286 stream_name, instance_name, token, rows 287 ) 288 289 def get_federation_queue(self) -> "PresenceFederationQueue": 290 """Get the presence federation queue.""" 291 return self._federation_queue 292 293 async def maybe_send_presence_to_interested_destinations( 294 self, states: List[UserPresenceState] 295 ) -> None: 296 """If this instance is a federation sender, send the states to all 297 destinations that are interested. Filters out any states for remote 298 users. 299 """ 300 301 if not self._federation: 302 return 303 304 states = [s for s in states if self.is_mine_id(s.user_id)] 305 306 if not states: 307 return 308 309 hosts_to_states = await get_interested_remotes( 310 self.store, 311 self.presence_router, 312 states, 313 ) 314 315 for destination, host_states in hosts_to_states.items(): 316 self._federation.send_presence_to_destinations(host_states, [destination]) 317 318 async def send_full_presence_to_users(self, user_ids: Collection[str]) -> None: 319 """ 320 Adds to the list of users who should receive a full snapshot of presence 321 upon their next sync. Note that this only works for local users. 322 323 Then, grabs the current presence state for a given set of users and adds it 324 to the top of the presence stream. 325 326 Args: 327 user_ids: The IDs of the local users to send full presence to. 
328 """ 329 # Retrieve one of the users from the given set 330 if not user_ids: 331 raise Exception( 332 "send_full_presence_to_users must be called with at least one user" 333 ) 334 user_id = next(iter(user_ids)) 335 336 # Mark all users as receiving full presence on their next sync 337 await self.store.add_users_to_send_full_presence_to(user_ids) 338 339 # Add a new entry to the presence stream. Since we use stream tokens to determine whether a 340 # local user should receive a full snapshot of presence when they sync, we need to bump the 341 # presence stream so that subsequent syncs with no presence activity in between won't result 342 # in the client receiving multiple full snapshots of presence. 343 # 344 # If we bump the stream ID, then the user will get a higher stream token next sync, and thus 345 # correctly won't receive a second snapshot. 346 347 # Get the current presence state for one of the users (defaults to offline if not found) 348 current_presence_state = await self.get_state(UserID.from_string(user_id)) 349 350 # Convert the UserPresenceState object into a serializable dict 351 state = { 352 "presence": current_presence_state.state, 353 "status_message": current_presence_state.status_msg, 354 } 355 356 # Copy the presence state to the tip of the presence stream. 357 358 # We set force_notify=True here so that this presence update is guaranteed to 359 # increment the presence stream ID (which resending the current user's presence 360 # otherwise would not do). 361 await self.set_state(UserID.from_string(user_id), state, force_notify=True) 362 363 async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool: 364 raise NotImplementedError( 365 "Attempting to check presence on a non-presence worker." 
366 ) 367 368 369class _NullContextManager(ContextManager[None]): 370 """A context manager which does nothing.""" 371 372 def __exit__( 373 self, 374 exc_type: Optional[Type[BaseException]], 375 exc_val: Optional[BaseException], 376 exc_tb: Optional[TracebackType], 377 ) -> None: 378 pass 379 380 381class WorkerPresenceHandler(BasePresenceHandler): 382 def __init__(self, hs: "HomeServer"): 383 super().__init__(hs) 384 self.hs = hs 385 386 self._presence_writer_instance = hs.config.worker.writers.presence[0] 387 388 self._presence_enabled = hs.config.server.use_presence 389 390 # Route presence EDUs to the right worker 391 hs.get_federation_registry().register_instances_for_edu( 392 "m.presence", 393 hs.config.worker.writers.presence, 394 ) 395 396 # The number of ongoing syncs on this process, by user id. 397 # Empty if _presence_enabled is false. 398 self._user_to_num_current_syncs: Dict[str, int] = {} 399 400 self.notifier = hs.get_notifier() 401 self.instance_id = hs.get_instance_id() 402 403 # user_id -> last_sync_ms. 
Lists the users that have stopped syncing but 404 # we haven't notified the presence writer of that yet 405 self.users_going_offline: Dict[str, int] = {} 406 407 self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) 408 self._set_state_client = ReplicationPresenceSetState.make_client(hs) 409 410 self._send_stop_syncing_loop = self.clock.looping_call( 411 self.send_stop_syncing, UPDATE_SYNCING_USERS_MS 412 ) 413 414 self._busy_presence_enabled = hs.config.experimental.msc3026_enabled 415 416 hs.get_reactor().addSystemEventTrigger( 417 "before", 418 "shutdown", 419 run_as_background_process, 420 "generic_presence.on_shutdown", 421 self._on_shutdown, 422 ) 423 424 async def _on_shutdown(self) -> None: 425 if self._presence_enabled: 426 self.hs.get_tcp_replication().send_command( 427 ClearUserSyncsCommand(self.instance_id) 428 ) 429 430 def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None: 431 if self._presence_enabled: 432 self.hs.get_tcp_replication().send_user_sync( 433 self.instance_id, user_id, is_syncing, last_sync_ms 434 ) 435 436 def mark_as_coming_online(self, user_id: str) -> None: 437 """A user has started syncing. Send a UserSync to the presence writer, 438 unless they had recently stopped syncing. 439 """ 440 going_offline = self.users_going_offline.pop(user_id, None) 441 if not going_offline: 442 # Safe to skip because we haven't yet told the presence writer they 443 # were offline 444 self.send_user_sync(user_id, True, self.clock.time_msec()) 445 446 def mark_as_going_offline(self, user_id: str) -> None: 447 """A user has stopped syncing. We wait before notifying the presence 448 writer as its likely they'll come back soon. 
This allows us to avoid 449 sending a stopped syncing immediately followed by a started syncing 450 notification to the presence writer 451 """ 452 self.users_going_offline[user_id] = self.clock.time_msec() 453 454 def send_stop_syncing(self) -> None: 455 """Check if there are any users who have stopped syncing a while ago and 456 haven't come back yet. If there are poke the presence writer about them. 457 """ 458 now = self.clock.time_msec() 459 for user_id, last_sync_ms in list(self.users_going_offline.items()): 460 if now - last_sync_ms > UPDATE_SYNCING_USERS_MS: 461 self.users_going_offline.pop(user_id, None) 462 self.send_user_sync(user_id, False, last_sync_ms) 463 464 async def user_syncing( 465 self, user_id: str, affect_presence: bool 466 ) -> ContextManager[None]: 467 """Record that a user is syncing. 468 469 Called by the sync and events servlets to record that a user has connected to 470 this worker and is waiting for some events. 471 """ 472 if not affect_presence or not self._presence_enabled: 473 return _NullContextManager() 474 475 curr_sync = self._user_to_num_current_syncs.get(user_id, 0) 476 self._user_to_num_current_syncs[user_id] = curr_sync + 1 477 478 # If we went from no in flight sync to some, notify replication 479 if self._user_to_num_current_syncs[user_id] == 1: 480 self.mark_as_coming_online(user_id) 481 482 def _end() -> None: 483 # We check that the user_id is in user_to_num_current_syncs because 484 # user_to_num_current_syncs may have been cleared if we are 485 # shutting down. 
486 if user_id in self._user_to_num_current_syncs: 487 self._user_to_num_current_syncs[user_id] -= 1 488 489 # If we went from one in flight sync to non, notify replication 490 if self._user_to_num_current_syncs[user_id] == 0: 491 self.mark_as_going_offline(user_id) 492 493 @contextlib.contextmanager 494 def _user_syncing() -> Generator[None, None, None]: 495 try: 496 yield 497 finally: 498 _end() 499 500 return _user_syncing() 501 502 async def notify_from_replication( 503 self, states: List[UserPresenceState], stream_id: int 504 ) -> None: 505 parties = await get_interested_parties(self.store, self.presence_router, states) 506 room_ids_to_states, users_to_states = parties 507 508 self.notifier.on_new_event( 509 "presence_key", 510 stream_id, 511 rooms=room_ids_to_states.keys(), 512 users=users_to_states.keys(), 513 ) 514 515 async def process_replication_rows( 516 self, stream_name: str, instance_name: str, token: int, rows: list 517 ) -> None: 518 await super().process_replication_rows(stream_name, instance_name, token, rows) 519 520 if stream_name != PresenceStream.NAME: 521 return 522 523 states = [ 524 UserPresenceState( 525 row.user_id, 526 row.state, 527 row.last_active_ts, 528 row.last_federation_update_ts, 529 row.last_user_sync_ts, 530 row.status_msg, 531 row.currently_active, 532 ) 533 for row in rows 534 ] 535 536 # The list of states to notify sync streams and remote servers about. 537 # This is calculated by comparing the old and new states for each user 538 # using `should_notify(..)`. 539 # 540 # Note that this is necessary as the presence writer will periodically 541 # flush presence state changes that should not be notified about to the 542 # DB, and so will be sent over the replication stream. 
543 state_to_notify = [] 544 545 for new_state in states: 546 old_state = self.user_to_current_state.get(new_state.user_id) 547 self.user_to_current_state[new_state.user_id] = new_state 548 549 if not old_state or should_notify(old_state, new_state): 550 state_to_notify.append(new_state) 551 552 stream_id = token 553 await self.notify_from_replication(state_to_notify, stream_id) 554 555 # If this is a federation sender, notify about presence updates. 556 await self.maybe_send_presence_to_interested_destinations(state_to_notify) 557 558 def get_currently_syncing_users_for_replication(self) -> Iterable[str]: 559 return [ 560 user_id 561 for user_id, count in self._user_to_num_current_syncs.items() 562 if count > 0 563 ] 564 565 async def set_state( 566 self, 567 target_user: UserID, 568 state: JsonDict, 569 ignore_status_msg: bool = False, 570 force_notify: bool = False, 571 ) -> None: 572 """Set the presence state of the user. 573 574 Args: 575 target_user: The ID of the user to set the presence state of. 576 state: The presence state as a JSON dictionary. 577 ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. 578 If False, the user's current status will be updated. 579 force_notify: Whether to force notification of the update to clients. 
580 """ 581 presence = state["presence"] 582 583 valid_presence = ( 584 PresenceState.ONLINE, 585 PresenceState.UNAVAILABLE, 586 PresenceState.OFFLINE, 587 PresenceState.BUSY, 588 ) 589 590 if presence not in valid_presence or ( 591 presence == PresenceState.BUSY and not self._busy_presence_enabled 592 ): 593 raise SynapseError(400, "Invalid presence state") 594 595 user_id = target_user.to_string() 596 597 # If presence is disabled, no-op 598 if not self.hs.config.server.use_presence: 599 return 600 601 # Proxy request to instance that writes presence 602 await self._set_state_client( 603 instance_name=self._presence_writer_instance, 604 user_id=user_id, 605 state=state, 606 ignore_status_msg=ignore_status_msg, 607 force_notify=force_notify, 608 ) 609 610 async def bump_presence_active_time(self, user: UserID) -> None: 611 """We've seen the user do something that indicates they're interacting 612 with the app. 613 """ 614 # If presence is disabled, no-op 615 if not self.hs.config.server.use_presence: 616 return 617 618 # Proxy request to instance that writes presence 619 user_id = user.to_string() 620 await self._bump_active_client( 621 instance_name=self._presence_writer_instance, user_id=user_id 622 ) 623 624 625class PresenceHandler(BasePresenceHandler): 626 def __init__(self, hs: "HomeServer"): 627 super().__init__(hs) 628 self.hs = hs 629 self.server_name = hs.hostname 630 self.wheel_timer: WheelTimer[str] = WheelTimer() 631 self.notifier = hs.get_notifier() 632 self._presence_enabled = hs.config.server.use_presence 633 634 federation_registry = hs.get_federation_registry() 635 636 federation_registry.register_edu_handler("m.presence", self.incoming_presence) 637 638 LaterGauge( 639 "synapse_handlers_presence_user_to_current_state_size", 640 "", 641 [], 642 lambda: len(self.user_to_current_state), 643 ) 644 645 now = self.clock.time_msec() 646 for state in self.user_to_current_state.values(): 647 self.wheel_timer.insert( 648 now=now, obj=state.user_id, 
then=state.last_active_ts + IDLE_TIMER 649 ) 650 self.wheel_timer.insert( 651 now=now, 652 obj=state.user_id, 653 then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, 654 ) 655 if self.is_mine_id(state.user_id): 656 self.wheel_timer.insert( 657 now=now, 658 obj=state.user_id, 659 then=state.last_federation_update_ts + FEDERATION_PING_INTERVAL, 660 ) 661 else: 662 self.wheel_timer.insert( 663 now=now, 664 obj=state.user_id, 665 then=state.last_federation_update_ts + FEDERATION_TIMEOUT, 666 ) 667 668 # Set of users who have presence in the `user_to_current_state` that 669 # have not yet been persisted 670 self.unpersisted_users_changes: Set[str] = set() 671 672 hs.get_reactor().addSystemEventTrigger( 673 "before", 674 "shutdown", 675 run_as_background_process, 676 "presence.on_shutdown", 677 self._on_shutdown, 678 ) 679 680 self._next_serial = 1 681 682 # Keeps track of the number of *ongoing* syncs on this process. While 683 # this is non zero a user will never go offline. 684 self.user_to_num_current_syncs: Dict[str, int] = {} 685 686 # Keeps track of the number of *ongoing* syncs on other processes. 687 # While any sync is ongoing on another process the user will never 688 # go offline. 689 # Each process has a unique identifier and an update frequency. If 690 # no update is received from that process within the update period then 691 # we assume that all the sync requests on that process have stopped. 692 # Stored as a dict from process_id to set of user_id, and a dict of 693 # process_id to millisecond timestamp last updated. 694 self.external_process_to_current_syncs: Dict[str, Set[str]] = {} 695 self.external_process_last_updated_ms: Dict[str, int] = {} 696 697 self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") 698 699 if self._presence_enabled: 700 # Start a LoopingCall in 30s that fires every 5s. 701 # The initial delay is to allow disconnected clients a chance to 702 # reconnect before we treat them as offline. 
703 def run_timeout_handler() -> Awaitable[None]: 704 return run_as_background_process( 705 "handle_presence_timeouts", self._handle_timeouts 706 ) 707 708 self.clock.call_later( 709 30, self.clock.looping_call, run_timeout_handler, 5000 710 ) 711 712 def run_persister() -> Awaitable[None]: 713 return run_as_background_process( 714 "persist_presence_changes", self._persist_unpersisted_changes 715 ) 716 717 self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000) 718 719 LaterGauge( 720 "synapse_handlers_presence_wheel_timer_size", 721 "", 722 [], 723 lambda: len(self.wheel_timer), 724 ) 725 726 # Used to handle sending of presence to newly joined users/servers 727 if self._presence_enabled: 728 self.notifier.add_replication_callback(self.notify_new_event) 729 730 # Presence is best effort and quickly heals itself, so lets just always 731 # stream from the current state when we restart. 732 self._event_pos = self.store.get_room_max_stream_ordering() 733 self._event_processing = False 734 735 async def _on_shutdown(self) -> None: 736 """Gets called when shutting down. This lets us persist any updates that 737 we haven't yet persisted, e.g. updates that only changes some internal 738 timers. This allows changes to persist across startup without having to 739 persist every single change. 740 741 If this does not run it simply means that some of the timers will fire 742 earlier than they should when synapse is restarted. This affect of this 743 is some spurious presence changes that will self-correct. 744 """ 745 # If the DB pool has already terminated, don't try updating 746 if not self.store.db_pool.is_running(): 747 return 748 749 logger.info( 750 "Performing _on_shutdown. 
Persisting %d unpersisted changes", 751 len(self.user_to_current_state), 752 ) 753 754 if self.unpersisted_users_changes: 755 756 await self.store.update_presence( 757 [ 758 self.user_to_current_state[user_id] 759 for user_id in self.unpersisted_users_changes 760 ] 761 ) 762 logger.info("Finished _on_shutdown") 763 764 async def _persist_unpersisted_changes(self) -> None: 765 """We periodically persist the unpersisted changes, as otherwise they 766 may stack up and slow down shutdown times. 767 """ 768 unpersisted = self.unpersisted_users_changes 769 self.unpersisted_users_changes = set() 770 771 if unpersisted: 772 logger.info("Persisting %d unpersisted presence updates", len(unpersisted)) 773 await self.store.update_presence( 774 [self.user_to_current_state[user_id] for user_id in unpersisted] 775 ) 776 777 async def _update_states( 778 self, new_states: Iterable[UserPresenceState], force_notify: bool = False 779 ) -> None: 780 """Updates presence of users. Sets the appropriate timeouts. Pokes 781 the notifier and federation if and only if the changed presence state 782 should be sent to clients/servers. 783 784 Args: 785 new_states: The new user presence state updates to process. 786 force_notify: Whether to force notifying clients of this presence state update, 787 even if it doesn't change the state of a user's presence (e.g online -> online). 788 This is currently used to bump the max presence stream ID without changing any 789 user's presence (see PresenceHandler.add_users_to_send_full_presence_to). 790 """ 791 now = self.clock.time_msec() 792 793 with Measure(self.clock, "presence_update_states"): 794 795 # NOTE: We purposefully don't await between now and when we've 796 # calculated what we want to do with the new states, to avoid races. 
797 798 to_notify = {} # Changes we want to notify everyone about 799 to_federation_ping = {} # These need sending keep-alives 800 801 # Only bother handling the last presence change for each user 802 new_states_dict = {} 803 for new_state in new_states: 804 new_states_dict[new_state.user_id] = new_state 805 new_states = new_states_dict.values() 806 807 for new_state in new_states: 808 user_id = new_state.user_id 809 810 # Its fine to not hit the database here, as the only thing not in 811 # the current state cache are OFFLINE states, where the only field 812 # of interest is last_active which is safe enough to assume is 0 813 # here. 814 prev_state = self.user_to_current_state.get( 815 user_id, UserPresenceState.default(user_id) 816 ) 817 818 new_state, should_notify, should_ping = handle_update( 819 prev_state, 820 new_state, 821 is_mine=self.is_mine_id(user_id), 822 wheel_timer=self.wheel_timer, 823 now=now, 824 ) 825 826 if force_notify: 827 should_notify = True 828 829 self.user_to_current_state[user_id] = new_state 830 831 if should_notify: 832 to_notify[user_id] = new_state 833 elif should_ping: 834 to_federation_ping[user_id] = new_state 835 836 # TODO: We should probably ensure there are no races hereafter 837 838 presence_updates_counter.inc(len(new_states)) 839 840 if to_notify: 841 notified_presence_counter.inc(len(to_notify)) 842 await self._persist_and_notify(list(to_notify.values())) 843 844 self.unpersisted_users_changes |= {s.user_id for s in new_states} 845 self.unpersisted_users_changes -= set(to_notify.keys()) 846 847 # Check if we need to resend any presence states to remote hosts. We 848 # only do this for states that haven't been updated in a while to 849 # ensure that the remote host doesn't time the presence state out. 850 # 851 # Note that since these are states that have *not* been updated, 852 # they won't get sent down the normal presence replication stream, 853 # and so we have to explicitly send them via the federation stream. 
854 to_federation_ping = { 855 user_id: state 856 for user_id, state in to_federation_ping.items() 857 if user_id not in to_notify 858 } 859 if to_federation_ping: 860 federation_presence_out_counter.inc(len(to_federation_ping)) 861 862 hosts_to_states = await get_interested_remotes( 863 self.store, 864 self.presence_router, 865 list(to_federation_ping.values()), 866 ) 867 868 for destination, states in hosts_to_states.items(): 869 self._federation_queue.send_presence_to_destinations( 870 states, [destination] 871 ) 872 873 async def _handle_timeouts(self) -> None: 874 """Checks the presence of users that have timed out and updates as 875 appropriate. 876 """ 877 logger.debug("Handling presence timeouts") 878 now = self.clock.time_msec() 879 880 # Fetch the list of users that *may* have timed out. Things may have 881 # changed since the timeout was set, so we won't necessarily have to 882 # take any action. 883 users_to_check = set(self.wheel_timer.fetch(now)) 884 885 # Check whether the lists of syncing processes from an external 886 # process have expired. 887 expired_process_ids = [ 888 process_id 889 for process_id, last_update in self.external_process_last_updated_ms.items() 890 if now - last_update > EXTERNAL_PROCESS_EXPIRY 891 ] 892 for process_id in expired_process_ids: 893 # For each expired process drop tracking info and check the users 894 # that were syncing on that process to see if they need to be timed 895 # out. 
896 users_to_check.update( 897 self.external_process_to_current_syncs.pop(process_id, ()) 898 ) 899 self.external_process_last_updated_ms.pop(process_id) 900 901 states = [ 902 self.user_to_current_state.get(user_id, UserPresenceState.default(user_id)) 903 for user_id in users_to_check 904 ] 905 906 timers_fired_counter.inc(len(states)) 907 908 syncing_user_ids = { 909 user_id 910 for user_id, count in self.user_to_num_current_syncs.items() 911 if count 912 } 913 for user_ids in self.external_process_to_current_syncs.values(): 914 syncing_user_ids.update(user_ids) 915 916 changes = handle_timeouts( 917 states, 918 is_mine_fn=self.is_mine_id, 919 syncing_user_ids=syncing_user_ids, 920 now=now, 921 ) 922 923 return await self._update_states(changes) 924 925 async def bump_presence_active_time(self, user: UserID) -> None: 926 """We've seen the user do something that indicates they're interacting 927 with the app. 928 """ 929 # If presence is disabled, no-op 930 if not self.hs.config.server.use_presence: 931 return 932 933 user_id = user.to_string() 934 935 bump_active_time_counter.inc() 936 937 prev_state = await self.current_state_for_user(user_id) 938 939 new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()} 940 if prev_state.state == PresenceState.UNAVAILABLE: 941 new_fields["state"] = PresenceState.ONLINE 942 943 await self._update_states([prev_state.copy_and_replace(**new_fields)]) 944 945 async def user_syncing( 946 self, user_id: str, affect_presence: bool = True 947 ) -> ContextManager[None]: 948 """Returns a context manager that should surround any stream requests 949 from the user. 950 951 This allows us to keep track of who is currently streaming and who isn't 952 without having to have timers outside of this module to avoid flickering 953 when users disconnect/reconnect. 954 955 Args: 956 user_id 957 affect_presence: If false this function will be a no-op. 
958 Useful for streams that are not associated with an actual 959 client that is being used by a user. 960 """ 961 # Override if it should affect the user's presence, if presence is 962 # disabled. 963 if not self.hs.config.server.use_presence: 964 affect_presence = False 965 966 if affect_presence: 967 curr_sync = self.user_to_num_current_syncs.get(user_id, 0) 968 self.user_to_num_current_syncs[user_id] = curr_sync + 1 969 970 prev_state = await self.current_state_for_user(user_id) 971 if prev_state.state == PresenceState.OFFLINE: 972 # If they're currently offline then bring them online, otherwise 973 # just update the last sync times. 974 await self._update_states( 975 [ 976 prev_state.copy_and_replace( 977 state=PresenceState.ONLINE, 978 last_active_ts=self.clock.time_msec(), 979 last_user_sync_ts=self.clock.time_msec(), 980 ) 981 ] 982 ) 983 else: 984 await self._update_states( 985 [ 986 prev_state.copy_and_replace( 987 last_user_sync_ts=self.clock.time_msec() 988 ) 989 ] 990 ) 991 992 async def _end() -> None: 993 try: 994 self.user_to_num_current_syncs[user_id] -= 1 995 996 prev_state = await self.current_state_for_user(user_id) 997 await self._update_states( 998 [ 999 prev_state.copy_and_replace( 1000 last_user_sync_ts=self.clock.time_msec() 1001 ) 1002 ] 1003 ) 1004 except Exception: 1005 logger.exception("Error updating presence after sync") 1006 1007 @contextmanager 1008 def _user_syncing() -> Generator[None, None, None]: 1009 try: 1010 yield 1011 finally: 1012 if affect_presence: 1013 run_in_background(_end) 1014 1015 return _user_syncing() 1016 1017 def get_currently_syncing_users_for_replication(self) -> Iterable[str]: 1018 # since we are the process handling presence, there is nothing to do here. 1019 return [] 1020 1021 async def update_external_syncs_row( 1022 self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int 1023 ) -> None: 1024 """Update the syncing users for an external process as a delta. 
1025 1026 Args: 1027 process_id: An identifier for the process the users are 1028 syncing against. This allows synapse to process updates 1029 as user start and stop syncing against a given process. 1030 user_id: The user who has started or stopped syncing 1031 is_syncing: Whether or not the user is now syncing 1032 sync_time_msec: Time in ms when the user was last syncing 1033 """ 1034 with (await self.external_sync_linearizer.queue(process_id)): 1035 prev_state = await self.current_state_for_user(user_id) 1036 1037 process_presence = self.external_process_to_current_syncs.setdefault( 1038 process_id, set() 1039 ) 1040 1041 updates = [] 1042 if is_syncing and user_id not in process_presence: 1043 if prev_state.state == PresenceState.OFFLINE: 1044 updates.append( 1045 prev_state.copy_and_replace( 1046 state=PresenceState.ONLINE, 1047 last_active_ts=sync_time_msec, 1048 last_user_sync_ts=sync_time_msec, 1049 ) 1050 ) 1051 else: 1052 updates.append( 1053 prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec) 1054 ) 1055 process_presence.add(user_id) 1056 elif user_id in process_presence: 1057 updates.append( 1058 prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec) 1059 ) 1060 1061 if not is_syncing: 1062 process_presence.discard(user_id) 1063 1064 if updates: 1065 await self._update_states(updates) 1066 1067 self.external_process_last_updated_ms[process_id] = self.clock.time_msec() 1068 1069 async def update_external_syncs_clear(self, process_id: str) -> None: 1070 """Marks all users that had been marked as syncing by a given process 1071 as offline. 1072 1073 Used when the process has stopped/disappeared. 
1074 """ 1075 with (await self.external_sync_linearizer.queue(process_id)): 1076 process_presence = self.external_process_to_current_syncs.pop( 1077 process_id, set() 1078 ) 1079 prev_states = await self.current_state_for_users(process_presence) 1080 time_now_ms = self.clock.time_msec() 1081 1082 await self._update_states( 1083 [ 1084 prev_state.copy_and_replace(last_user_sync_ts=time_now_ms) 1085 for prev_state in prev_states.values() 1086 ] 1087 ) 1088 self.external_process_last_updated_ms.pop(process_id, None) 1089 1090 async def current_state_for_user(self, user_id: str) -> UserPresenceState: 1091 """Get the current presence state for a user.""" 1092 res = await self.current_state_for_users([user_id]) 1093 return res[user_id] 1094 1095 async def _persist_and_notify(self, states: List[UserPresenceState]) -> None: 1096 """Persist states in the database, poke the notifier and send to 1097 interested remote servers 1098 """ 1099 stream_id, max_token = await self.store.update_presence(states) 1100 1101 parties = await get_interested_parties(self.store, self.presence_router, states) 1102 room_ids_to_states, users_to_states = parties 1103 1104 self.notifier.on_new_event( 1105 "presence_key", 1106 stream_id, 1107 rooms=room_ids_to_states.keys(), 1108 users=[UserID.from_string(u) for u in users_to_states], 1109 ) 1110 1111 # We only want to poke the local federation sender, if any, as other 1112 # workers will receive the presence updates via the presence replication 1113 # stream (which is updated by `store.update_presence`). 
1114 await self.maybe_send_presence_to_interested_destinations(states) 1115 1116 async def incoming_presence(self, origin: str, content: JsonDict) -> None: 1117 """Called when we receive a `m.presence` EDU from a remote server.""" 1118 if not self._presence_enabled: 1119 return 1120 1121 now = self.clock.time_msec() 1122 updates = [] 1123 for push in content.get("push", []): 1124 # A "push" contains a list of presence that we are probably interested 1125 # in. 1126 user_id = push.get("user_id", None) 1127 if not user_id: 1128 logger.info( 1129 "Got presence update from %r with no 'user_id': %r", origin, push 1130 ) 1131 continue 1132 1133 if get_domain_from_id(user_id) != origin: 1134 logger.info( 1135 "Got presence update from %r with bad 'user_id': %r", 1136 origin, 1137 user_id, 1138 ) 1139 continue 1140 1141 presence_state = push.get("presence", None) 1142 if not presence_state: 1143 logger.info( 1144 "Got presence update from %r with no 'presence_state': %r", 1145 origin, 1146 push, 1147 ) 1148 continue 1149 1150 new_fields = {"state": presence_state, "last_federation_update_ts": now} 1151 1152 last_active_ago = push.get("last_active_ago", None) 1153 if last_active_ago is not None: 1154 new_fields["last_active_ts"] = now - last_active_ago 1155 1156 new_fields["status_msg"] = push.get("status_msg", None) 1157 new_fields["currently_active"] = push.get("currently_active", False) 1158 1159 prev_state = await self.current_state_for_user(user_id) 1160 updates.append(prev_state.copy_and_replace(**new_fields)) 1161 1162 if updates: 1163 federation_presence_counter.inc(len(updates)) 1164 await self._update_states(updates) 1165 1166 async def set_state( 1167 self, 1168 target_user: UserID, 1169 state: JsonDict, 1170 ignore_status_msg: bool = False, 1171 force_notify: bool = False, 1172 ) -> None: 1173 """Set the presence state of the user. 1174 1175 Args: 1176 target_user: The ID of the user to set the presence state of. 
1177 state: The presence state as a JSON dictionary. 1178 ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. 1179 If False, the user's current status will be updated. 1180 force_notify: Whether to force notification of the update to clients. 1181 """ 1182 status_msg = state.get("status_msg", None) 1183 presence = state["presence"] 1184 1185 valid_presence = ( 1186 PresenceState.ONLINE, 1187 PresenceState.UNAVAILABLE, 1188 PresenceState.OFFLINE, 1189 PresenceState.BUSY, 1190 ) 1191 1192 if presence not in valid_presence or ( 1193 presence == PresenceState.BUSY and not self._busy_presence_enabled 1194 ): 1195 raise SynapseError(400, "Invalid presence state") 1196 1197 user_id = target_user.to_string() 1198 1199 prev_state = await self.current_state_for_user(user_id) 1200 1201 new_fields = {"state": presence} 1202 1203 if not ignore_status_msg: 1204 new_fields["status_msg"] = status_msg 1205 1206 if presence == PresenceState.ONLINE or ( 1207 presence == PresenceState.BUSY and self._busy_presence_enabled 1208 ): 1209 new_fields["last_active_ts"] = self.clock.time_msec() 1210 1211 await self._update_states( 1212 [prev_state.copy_and_replace(**new_fields)], force_notify=force_notify 1213 ) 1214 1215 async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool: 1216 """Returns whether a user can see another user's presence.""" 1217 observer_room_ids = await self.store.get_rooms_for_user( 1218 observer_user.to_string() 1219 ) 1220 observed_room_ids = await self.store.get_rooms_for_user( 1221 observed_user.to_string() 1222 ) 1223 1224 if observer_room_ids & observed_room_ids: 1225 return True 1226 1227 return False 1228 1229 async def get_all_presence_updates( 1230 self, instance_name: str, last_id: int, current_id: int, limit: int 1231 ) -> Tuple[List[Tuple[int, list]], int, bool]: 1232 """ 1233 Gets a list of presence update rows from between the given stream ids. 
1234 Each row has: 1235 - stream_id(str) 1236 - user_id(str) 1237 - state(str) 1238 - last_active_ts(int) 1239 - last_federation_update_ts(int) 1240 - last_user_sync_ts(int) 1241 - status_msg(int) 1242 - currently_active(int) 1243 1244 Args: 1245 instance_name: The writer we want to fetch updates from. Unused 1246 here since there is only ever one writer. 1247 last_id: The token to fetch updates from. Exclusive. 1248 current_id: The token to fetch updates up to. Inclusive. 1249 limit: The requested limit for the number of rows to return. The 1250 function may return more or fewer rows. 1251 1252 Returns: 1253 A tuple consisting of: the updates, a token to use to fetch 1254 subsequent updates, and whether we returned fewer rows than exists 1255 between the requested tokens due to the limit. 1256 1257 The token returned can be used in a subsequent call to this 1258 function to get further updates. 1259 1260 The updates are a list of 2-tuples of stream ID and the row data 1261 """ 1262 1263 # TODO(markjh): replicate the unpersisted changes. 1264 # This could use the in-memory stores for recent changes. 1265 rows = await self.store.get_all_presence_updates( 1266 instance_name, last_id, current_id, limit 1267 ) 1268 return rows 1269 1270 def notify_new_event(self) -> None: 1271 """Called when new events have happened. Handles users and servers 1272 joining rooms and require being sent presence. 
1273 """ 1274 1275 if self._event_processing: 1276 return 1277 1278 async def _process_presence() -> None: 1279 assert not self._event_processing 1280 1281 self._event_processing = True 1282 try: 1283 await self._unsafe_process() 1284 finally: 1285 self._event_processing = False 1286 1287 run_as_background_process("presence.notify_new_event", _process_presence) 1288 1289 async def _unsafe_process(self) -> None: 1290 # Loop round handling deltas until we're up to date 1291 while True: 1292 with Measure(self.clock, "presence_delta"): 1293 room_max_stream_ordering = self.store.get_room_max_stream_ordering() 1294 if self._event_pos == room_max_stream_ordering: 1295 return 1296 1297 logger.debug( 1298 "Processing presence stats %s->%s", 1299 self._event_pos, 1300 room_max_stream_ordering, 1301 ) 1302 max_pos, deltas = await self.store.get_current_state_deltas( 1303 self._event_pos, room_max_stream_ordering 1304 ) 1305 1306 # We may get multiple deltas for different rooms, but we want to 1307 # handle them on a room by room basis, so we batch them up by 1308 # room. 1309 deltas_by_room: Dict[str, List[JsonDict]] = {} 1310 for delta in deltas: 1311 deltas_by_room.setdefault(delta["room_id"], []).append(delta) 1312 1313 for room_id, deltas_for_room in deltas_by_room.items(): 1314 await self._handle_state_delta(room_id, deltas_for_room) 1315 1316 self._event_pos = max_pos 1317 1318 # Expose current event processing position to prometheus 1319 synapse.metrics.event_processing_positions.labels("presence").set( 1320 max_pos 1321 ) 1322 1323 async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None: 1324 """Process current state deltas for the room to find new joins that need 1325 to be handled. 1326 """ 1327 1328 # Sets of newly joined users. Note that if the local server is 1329 # joining a remote room for the first time we'll see both the joining 1330 # user and all remote users as newly joined. 
1331 newly_joined_users = set() 1332 1333 for delta in deltas: 1334 assert room_id == delta["room_id"] 1335 1336 typ = delta["type"] 1337 state_key = delta["state_key"] 1338 event_id = delta["event_id"] 1339 prev_event_id = delta["prev_event_id"] 1340 1341 logger.debug("Handling: %r %r, %s", typ, state_key, event_id) 1342 1343 # Drop any event that isn't a membership join 1344 if typ != EventTypes.Member: 1345 continue 1346 1347 if event_id is None: 1348 # state has been deleted, so this is not a join. We only care about 1349 # joins. 1350 continue 1351 1352 event = await self.store.get_event(event_id, allow_none=True) 1353 if not event or event.content.get("membership") != Membership.JOIN: 1354 # We only care about joins 1355 continue 1356 1357 if prev_event_id: 1358 prev_event = await self.store.get_event(prev_event_id, allow_none=True) 1359 if ( 1360 prev_event 1361 and prev_event.content.get("membership") == Membership.JOIN 1362 ): 1363 # Ignore changes to join events. 1364 continue 1365 1366 newly_joined_users.add(state_key) 1367 1368 if not newly_joined_users: 1369 # If nobody has joined then there's nothing to do. 1370 return 1371 1372 # We want to send: 1373 # 1. presence states of all local users in the room to newly joined 1374 # remote servers 1375 # 2. presence states of newly joined users to all remote servers in 1376 # the room. 1377 # 1378 # TODO: Only send presence states to remote hosts that don't already 1379 # have them (because they already share rooms). 1380 1381 # Get all the users who were already in the room, by fetching the 1382 # current users in the room and removing the newly joined users. 
1383 users = await self.store.get_users_in_room(room_id) 1384 prev_users = set(users) - newly_joined_users 1385 1386 # Construct sets for all the local users and remote hosts that were 1387 # already in the room 1388 prev_local_users = [] 1389 prev_remote_hosts = set() 1390 for user_id in prev_users: 1391 if self.is_mine_id(user_id): 1392 prev_local_users.append(user_id) 1393 else: 1394 prev_remote_hosts.add(get_domain_from_id(user_id)) 1395 1396 # Similarly, construct sets for all the local users and remote hosts 1397 # that were *not* already in the room. Care needs to be taken with the 1398 # calculating the remote hosts, as a host may have already been in the 1399 # room even if there is a newly joined user from that host. 1400 newly_joined_local_users = [] 1401 newly_joined_remote_hosts = set() 1402 for user_id in newly_joined_users: 1403 if self.is_mine_id(user_id): 1404 newly_joined_local_users.append(user_id) 1405 else: 1406 host = get_domain_from_id(user_id) 1407 if host not in prev_remote_hosts: 1408 newly_joined_remote_hosts.add(host) 1409 1410 # Send presence states of all local users in the room to newly joined 1411 # remote servers. (We actually only send states for local users already 1412 # in the room, as we'll send states for newly joined local users below.) 1413 if prev_local_users and newly_joined_remote_hosts: 1414 local_states = await self.current_state_for_users(prev_local_users) 1415 1416 # Filter out old presence, i.e. offline presence states where 1417 # the user hasn't been active for a week. We can change this 1418 # depending on what we want the UX to be, but at the least we 1419 # should filter out offline presence where the state is just the 1420 # default state. 
1421 now = self.clock.time_msec() 1422 states = [ 1423 state 1424 for state in local_states.values() 1425 if state.state != PresenceState.OFFLINE 1426 or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000 1427 or state.status_msg is not None 1428 ] 1429 1430 self._federation_queue.send_presence_to_destinations( 1431 destinations=newly_joined_remote_hosts, 1432 states=states, 1433 ) 1434 1435 # Send presence states of newly joined users to all remote servers in 1436 # the room 1437 if newly_joined_local_users and ( 1438 prev_remote_hosts or newly_joined_remote_hosts 1439 ): 1440 local_states = await self.current_state_for_users(newly_joined_local_users) 1441 self._federation_queue.send_presence_to_destinations( 1442 destinations=prev_remote_hosts | newly_joined_remote_hosts, 1443 states=list(local_states.values()), 1444 ) 1445 1446 1447def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool: 1448 """Decides if a presence state change should be sent to interested parties.""" 1449 if old_state == new_state: 1450 return False 1451 1452 if old_state.status_msg != new_state.status_msg: 1453 notify_reason_counter.labels("status_msg_change").inc() 1454 return True 1455 1456 if old_state.state != new_state.state: 1457 notify_reason_counter.labels("state_change").inc() 1458 state_transition_counter.labels(old_state.state, new_state.state).inc() 1459 return True 1460 1461 if old_state.state == PresenceState.ONLINE: 1462 if new_state.currently_active != old_state.currently_active: 1463 notify_reason_counter.labels("current_active_change").inc() 1464 return True 1465 1466 if ( 1467 new_state.last_active_ts - old_state.last_active_ts 1468 > LAST_ACTIVE_GRANULARITY 1469 ): 1470 # Only notify about last active bumps if we're not currently active 1471 if not new_state.currently_active: 1472 notify_reason_counter.labels("last_active_change_online").inc() 1473 return True 1474 1475 elif new_state.last_active_ts - old_state.last_active_ts > 
LAST_ACTIVE_GRANULARITY: 1476 # Always notify for a transition where last active gets bumped. 1477 notify_reason_counter.labels("last_active_change_not_online").inc() 1478 return True 1479 1480 return False 1481 1482 1483def format_user_presence_state( 1484 state: UserPresenceState, now: int, include_user_id: bool = True 1485) -> JsonDict: 1486 """Convert UserPresenceState to a JSON format that can be sent down to clients 1487 and to other servers. 1488 1489 Args: 1490 state: The user presence state to format. 1491 now: The current timestamp since the epoch in ms. 1492 include_user_id: Whether to include `user_id` in the returned dictionary. 1493 As this function can be used both to format presence updates for client /sync 1494 responses and for federation /send requests, only the latter needs the include 1495 the `user_id` field. 1496 1497 Returns: 1498 A JSON dictionary with the following keys: 1499 * presence: The presence state as a str. 1500 * user_id: Optional. Included if `include_user_id` is truthy. The canonical 1501 Matrix ID of the user. 1502 * last_active_ago: Optional. Included if `last_active_ts` is set on `state`. 1503 The timestamp that the user was last active. 1504 * status_msg: Optional. Included if `status_msg` is set on `state`. The user's 1505 status. 1506 * currently_active: Optional. Included only if `state.state` is "online". 
1507 1508 Example: 1509 1510 { 1511 "presence": "online", 1512 "user_id": "@alice:example.com", 1513 "last_active_ago": 16783813918, 1514 "status_msg": "Hello world!", 1515 "currently_active": True 1516 } 1517 """ 1518 content: JsonDict = {"presence": state.state} 1519 if include_user_id: 1520 content["user_id"] = state.user_id 1521 if state.last_active_ts: 1522 content["last_active_ago"] = now - state.last_active_ts 1523 if state.status_msg: 1524 content["status_msg"] = state.status_msg 1525 if state.state == PresenceState.ONLINE: 1526 content["currently_active"] = state.currently_active 1527 1528 return content 1529 1530 1531class PresenceEventSource(EventSource[int, UserPresenceState]): 1532 def __init__(self, hs: "HomeServer"): 1533 # We can't call get_presence_handler here because there's a cycle: 1534 # 1535 # Presence -> Notifier -> PresenceEventSource -> Presence 1536 # 1537 # Same with get_presence_router: 1538 # 1539 # AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler 1540 self.get_presence_handler = hs.get_presence_handler 1541 self.get_presence_router = hs.get_presence_router 1542 self.clock = hs.get_clock() 1543 self.store = hs.get_datastore() 1544 1545 @log_function 1546 async def get_new_events( 1547 self, 1548 user: UserID, 1549 from_key: Optional[int], 1550 limit: Optional[int] = None, 1551 room_ids: Optional[Collection[str]] = None, 1552 is_guest: bool = False, 1553 explicit_room_id: Optional[str] = None, 1554 include_offline: bool = True, 1555 service: Optional[ApplicationService] = None, 1556 ) -> Tuple[List[UserPresenceState], int]: 1557 # The process for getting presence events are: 1558 # 1. Get the rooms the user is in. 1559 # 2. Get the list of user in the rooms. 1560 # 3. Get the list of users that are in the user's presence list. 1561 # 4. If there is a from_key set, cross reference the list of users 1562 # with the `presence_stream_cache` to see which ones we actually 1563 # need to check. 1564 # 5. 
Load current state for the users. 1565 # 1566 # We don't try and limit the presence updates by the current token, as 1567 # sending down the rare duplicate is not a concern. 1568 1569 user_id = user.to_string() 1570 stream_change_cache = self.store.presence_stream_cache 1571 1572 with Measure(self.clock, "presence.get_new_events"): 1573 if from_key is not None: 1574 from_key = int(from_key) 1575 1576 # Check if this user should receive all current, online user presence. We only 1577 # bother to do this if from_key is set, as otherwise the user will receive all 1578 # user presence anyways. 1579 if await self.store.should_user_receive_full_presence_with_token( 1580 user_id, from_key 1581 ): 1582 # This user has been specified by a module to receive all current, online 1583 # user presence. Removing from_key and setting include_offline to false 1584 # will do effectively this. 1585 from_key = None 1586 include_offline = False 1587 1588 max_token = self.store.get_current_presence_token() 1589 if from_key == max_token: 1590 # This is necessary as due to the way stream ID generators work 1591 # we may get updates that have a stream ID greater than the max 1592 # token (e.g. max_token is N but stream generator may return 1593 # results for N+2, due to N+1 not having finished being 1594 # persisted yet). 1595 # 1596 # This is usually fine, as it just means that we may send down 1597 # some presence updates multiple times. However, we need to be 1598 # careful that the sync stream either actually does make some 1599 # progress or doesn't return, otherwise clients will end up 1600 # tight looping calling /sync due to it immediately returning 1601 # the same token repeatedly. 1602 # 1603 # Hence this guard where we just return nothing so that the sync 1604 # doesn't return. C.f. #5503. 
1605 return [], max_token 1606 1607 # Figure out which other users this user should receive updates for 1608 users_interested_in = await self._get_interested_in(user, explicit_room_id) 1609 1610 # We have a set of users that we're interested in the presence of. We want to 1611 # cross-reference that with the users that have actually changed their presence. 1612 1613 # Check whether this user should see all user updates 1614 1615 if users_interested_in == PresenceRouter.ALL_USERS: 1616 # Provide presence state for all users 1617 presence_updates = await self._filter_all_presence_updates_for_user( 1618 user_id, include_offline, from_key 1619 ) 1620 1621 return presence_updates, max_token 1622 1623 # Make mypy happy. users_interested_in should now be a set 1624 assert not isinstance(users_interested_in, str) 1625 1626 # The set of users that we're interested in and that have had a presence update. 1627 # We'll actually pull the presence updates for these users at the end. 1628 interested_and_updated_users: Union[Set[str], FrozenSet[str]] = set() 1629 1630 if from_key: 1631 # First get all users that have had a presence update 1632 updated_users = stream_change_cache.get_all_entities_changed(from_key) 1633 1634 # Cross-reference users we're interested in with those that have had updates. 1635 # Use a slightly-optimised method for processing smaller sets of updates. 1636 if updated_users is not None and len(updated_users) < 500: 1637 # For small deltas, it's quicker to get all changes and then 1638 # cross-reference with the users we're interested in 1639 get_updates_counter.labels("stream").inc() 1640 for other_user_id in updated_users: 1641 if other_user_id in users_interested_in: 1642 # mypy thinks this variable could be a FrozenSet as it's possibly set 1643 # to one in the `get_entities_changed` call below, and `add()` is not 1644 # method on a FrozenSet. That doesn't affect us here though, as 1645 # `interested_and_updated_users` is clearly a set() above. 
1646 interested_and_updated_users.add(other_user_id) # type: ignore 1647 else: 1648 # Too many possible updates. Find all users we can see and check 1649 # if any of them have changed. 1650 get_updates_counter.labels("full").inc() 1651 1652 interested_and_updated_users = ( 1653 stream_change_cache.get_entities_changed( 1654 users_interested_in, from_key 1655 ) 1656 ) 1657 else: 1658 # No from_key has been specified. Return the presence for all users 1659 # this user is interested in 1660 interested_and_updated_users = users_interested_in 1661 1662 # Retrieve the current presence state for each user 1663 users_to_state = await self.get_presence_handler().current_state_for_users( 1664 interested_and_updated_users 1665 ) 1666 presence_updates = list(users_to_state.values()) 1667 1668 if not include_offline: 1669 # Filter out offline presence states 1670 presence_updates = self._filter_offline_presence_state(presence_updates) 1671 1672 return presence_updates, max_token 1673 1674 async def _filter_all_presence_updates_for_user( 1675 self, 1676 user_id: str, 1677 include_offline: bool, 1678 from_key: Optional[int] = None, 1679 ) -> List[UserPresenceState]: 1680 """ 1681 Computes the presence updates a user should receive. 1682 1683 First pulls presence updates from the database. Then consults PresenceRouter 1684 for whether any updates should be excluded by user ID. 1685 1686 Args: 1687 user_id: The User ID of the user to compute presence updates for. 1688 include_offline: Whether to include offline presence states from the results. 1689 from_key: The minimum stream ID of updates to pull from the database 1690 before filtering. 1691 1692 Returns: 1693 A list of presence states for the given user to receive. 
1694 """ 1695 if from_key: 1696 # Only return updates since the last sync 1697 updated_users = self.store.presence_stream_cache.get_all_entities_changed( 1698 from_key 1699 ) 1700 if not updated_users: 1701 updated_users = [] 1702 1703 # Get the actual presence update for each change 1704 users_to_state = await self.get_presence_handler().current_state_for_users( 1705 updated_users 1706 ) 1707 presence_updates = list(users_to_state.values()) 1708 1709 if not include_offline: 1710 # Filter out offline states 1711 presence_updates = self._filter_offline_presence_state(presence_updates) 1712 else: 1713 users_to_state = await self.store.get_presence_for_all_users( 1714 include_offline=include_offline 1715 ) 1716 1717 presence_updates = list(users_to_state.values()) 1718 1719 # TODO: This feels wildly inefficient, and it's unfortunate we need to ask the 1720 # module for information on a number of users when we then only take the info 1721 # for a single user 1722 1723 # Filter through the presence router 1724 users_to_state_set = await self.get_presence_router().get_users_for_states( 1725 presence_updates 1726 ) 1727 1728 # We only want the mapping for the syncing user 1729 presence_updates = list(users_to_state_set[user_id]) 1730 1731 # Return presence information for all users 1732 return presence_updates 1733 1734 def _filter_offline_presence_state( 1735 self, presence_updates: Iterable[UserPresenceState] 1736 ) -> List[UserPresenceState]: 1737 """Given an iterable containing user presence updates, return a list with any offline 1738 presence states removed. 1739 1740 Args: 1741 presence_updates: Presence states to filter 1742 1743 Returns: 1744 A new list with any offline presence states removed. 
1745 """ 1746 return [ 1747 update 1748 for update in presence_updates 1749 if update.state != PresenceState.OFFLINE 1750 ] 1751 1752 def get_current_key(self) -> int: 1753 return self.store.get_current_presence_token() 1754 1755 @cached(num_args=2, cache_context=True) 1756 async def _get_interested_in( 1757 self, 1758 user: UserID, 1759 explicit_room_id: Optional[str] = None, 1760 cache_context: Optional[_CacheContext] = None, 1761 ) -> Union[Set[str], str]: 1762 """Returns the set of users that the given user should see presence 1763 updates for. 1764 1765 Args: 1766 user: The user to retrieve presence updates for. 1767 explicit_room_id: The users that are in the room will be returned. 1768 1769 Returns: 1770 A set of user IDs to return presence updates for, or "ALL" to return all 1771 known updates. 1772 """ 1773 user_id = user.to_string() 1774 users_interested_in = set() 1775 users_interested_in.add(user_id) # So that we receive our own presence 1776 1777 # cache_context isn't likely to ever be None due to the @cached decorator, 1778 # but we can't have a non-optional argument after the optional argument 1779 # explicit_room_id either. Assert cache_context is not None so we can use it 1780 # without mypy complaining. 
1781 assert cache_context 1782 1783 # Check with the presence router whether we should poll additional users for 1784 # their presence information 1785 additional_users = await self.get_presence_router().get_interested_users( 1786 user.to_string() 1787 ) 1788 if additional_users == PresenceRouter.ALL_USERS: 1789 # If the module requested that this user see the presence updates of *all* 1790 # users, then simply return that instead of calculating what rooms this 1791 # user shares 1792 return PresenceRouter.ALL_USERS 1793 1794 # Add the additional users from the router 1795 users_interested_in.update(additional_users) 1796 1797 # Find the users who share a room with this user 1798 users_who_share_room = await self.store.get_users_who_share_room_with_user( 1799 user_id, on_invalidate=cache_context.invalidate 1800 ) 1801 users_interested_in.update(users_who_share_room) 1802 1803 if explicit_room_id: 1804 user_ids = await self.store.get_users_in_room( 1805 explicit_room_id, on_invalidate=cache_context.invalidate 1806 ) 1807 users_interested_in.update(user_ids) 1808 1809 return users_interested_in 1810 1811 1812def handle_timeouts( 1813 user_states: List[UserPresenceState], 1814 is_mine_fn: Callable[[str], bool], 1815 syncing_user_ids: Set[str], 1816 now: int, 1817) -> List[UserPresenceState]: 1818 """Checks the presence of users that have timed out and updates as 1819 appropriate. 1820 1821 Args: 1822 user_states: List of UserPresenceState's to check. 1823 is_mine_fn: Function that returns if a user_id is ours 1824 syncing_user_ids: Set of user_ids with active syncs. 1825 now: Current time in ms. 
1826 1827 Returns: 1828 List of UserPresenceState updates 1829 """ 1830 changes = {} # Actual changes we need to notify people about 1831 1832 for state in user_states: 1833 is_mine = is_mine_fn(state.user_id) 1834 1835 new_state = handle_timeout(state, is_mine, syncing_user_ids, now) 1836 if new_state: 1837 changes[state.user_id] = new_state 1838 1839 return list(changes.values()) 1840 1841 1842def handle_timeout( 1843 state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int 1844) -> Optional[UserPresenceState]: 1845 """Checks the presence of the user to see if any of the timers have elapsed 1846 1847 Args: 1848 state 1849 is_mine: Whether the user is ours 1850 syncing_user_ids: Set of user_ids with active syncs. 1851 now: Current time in ms. 1852 1853 Returns: 1854 A UserPresenceState update or None if no update. 1855 """ 1856 if state.state == PresenceState.OFFLINE: 1857 # No timeouts are associated with offline states. 1858 return None 1859 1860 changed = False 1861 user_id = state.user_id 1862 1863 if is_mine: 1864 if state.state == PresenceState.ONLINE: 1865 if now - state.last_active_ts > IDLE_TIMER: 1866 # Currently online, but last activity ages ago so auto 1867 # idle 1868 state = state.copy_and_replace(state=PresenceState.UNAVAILABLE) 1869 changed = True 1870 elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: 1871 # So that we send down a notification that we've 1872 # stopped updating. 1873 changed = True 1874 1875 if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL: 1876 # Need to send ping to other servers to ensure they don't 1877 # timeout and set us to offline 1878 changed = True 1879 1880 # If there are have been no sync for a while (and none ongoing), 1881 # set presence to offline 1882 if user_id not in syncing_user_ids: 1883 # If the user has done something recently but hasn't synced, 1884 # don't set them as offline. 
1885 sync_or_active = max(state.last_user_sync_ts, state.last_active_ts) 1886 if now - sync_or_active > SYNC_ONLINE_TIMEOUT: 1887 state = state.copy_and_replace(state=PresenceState.OFFLINE) 1888 changed = True 1889 else: 1890 # We expect to be poked occasionally by the other side. 1891 # This is to protect against forgetful/buggy servers, so that 1892 # no one gets stuck online forever. 1893 if now - state.last_federation_update_ts > FEDERATION_TIMEOUT: 1894 # The other side seems to have disappeared. 1895 state = state.copy_and_replace(state=PresenceState.OFFLINE) 1896 changed = True 1897 1898 return state if changed else None 1899 1900 1901def handle_update( 1902 prev_state: UserPresenceState, 1903 new_state: UserPresenceState, 1904 is_mine: bool, 1905 wheel_timer: WheelTimer, 1906 now: int, 1907) -> Tuple[UserPresenceState, bool, bool]: 1908 """Given a presence update: 1909 1. Add any appropriate timers. 1910 2. Check if we should notify anyone. 1911 1912 Args: 1913 prev_state 1914 new_state 1915 is_mine: Whether the user is ours 1916 wheel_timer 1917 now: Time now in ms 1918 1919 Returns: 1920 3-tuple: `(new_state, persist_and_notify, federation_ping)` where: 1921 - new_state: is the state to actually persist 1922 - persist_and_notify: whether to persist and notify people 1923 - federation_ping: whether we should send a ping over federation 1924 """ 1925 user_id = new_state.user_id 1926 1927 persist_and_notify = False 1928 federation_ping = False 1929 1930 # If the users are ours then we want to set up a bunch of timers 1931 # to time things out. 
1932 if is_mine: 1933 if new_state.state == PresenceState.ONLINE: 1934 # Idle timer 1935 wheel_timer.insert( 1936 now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER 1937 ) 1938 1939 active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY 1940 new_state = new_state.copy_and_replace(currently_active=active) 1941 1942 if active: 1943 wheel_timer.insert( 1944 now=now, 1945 obj=user_id, 1946 then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY, 1947 ) 1948 1949 if new_state.state != PresenceState.OFFLINE: 1950 # User has stopped syncing 1951 wheel_timer.insert( 1952 now=now, 1953 obj=user_id, 1954 then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, 1955 ) 1956 1957 last_federate = new_state.last_federation_update_ts 1958 if now - last_federate > FEDERATION_PING_INTERVAL: 1959 # Been a while since we've poked remote servers 1960 new_state = new_state.copy_and_replace(last_federation_update_ts=now) 1961 federation_ping = True 1962 1963 else: 1964 wheel_timer.insert( 1965 now=now, 1966 obj=user_id, 1967 then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT, 1968 ) 1969 1970 # Check whether the change was something worth notifying about 1971 if should_notify(prev_state, new_state): 1972 new_state = new_state.copy_and_replace(last_federation_update_ts=now) 1973 persist_and_notify = True 1974 1975 return new_state, persist_and_notify, federation_ping 1976 1977 1978async def get_interested_parties( 1979 store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState] 1980) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]: 1981 """Given a list of states return which entities (rooms, users) 1982 are interested in the given states. 1983 1984 Args: 1985 store: The homeserver's data store. 1986 presence_router: A module for augmenting the destinations for presence updates. 1987 states: A list of incoming user presence updates. 
1988 1989 Returns: 1990 A 2-tuple of `(room_ids_to_states, users_to_states)`, 1991 with each item being a dict of `entity_name` -> `[UserPresenceState]` 1992 """ 1993 room_ids_to_states: Dict[str, List[UserPresenceState]] = {} 1994 users_to_states: Dict[str, List[UserPresenceState]] = {} 1995 for state in states: 1996 room_ids = await store.get_rooms_for_user(state.user_id) 1997 for room_id in room_ids: 1998 room_ids_to_states.setdefault(room_id, []).append(state) 1999 2000 # Always notify self 2001 users_to_states.setdefault(state.user_id, []).append(state) 2002 2003 # Ask a presence routing module for any additional parties if one 2004 # is loaded. 2005 router_users_to_states = await presence_router.get_users_for_states(states) 2006 2007 # Update the dictionaries with additional destinations and state to send 2008 for user_id, user_states in router_users_to_states.items(): 2009 users_to_states.setdefault(user_id, []).extend(user_states) 2010 2011 return room_ids_to_states, users_to_states 2012 2013 2014async def get_interested_remotes( 2015 store: DataStore, 2016 presence_router: PresenceRouter, 2017 states: List[UserPresenceState], 2018) -> Dict[str, Set[UserPresenceState]]: 2019 """Given a list of presence states figure out which remote servers 2020 should be sent which. 2021 2022 All the presence states should be for local users only. 2023 2024 Args: 2025 store: The homeserver's data store. 2026 presence_router: A module for augmenting the destinations for presence updates. 2027 states: A list of incoming user presence updates. 2028 2029 Returns: 2030 A map from destinations to presence states to send to that destination. 2031 """ 2032 hosts_and_states: Dict[str, Set[UserPresenceState]] = {} 2033 2034 # First we look up the rooms each user is in (as well as any explicit 2035 # subscriptions), then for each distinct room we look up the remote 2036 # hosts in those rooms. 
2037 room_ids_to_states, users_to_states = await get_interested_parties( 2038 store, presence_router, states 2039 ) 2040 2041 for room_id, states in room_ids_to_states.items(): 2042 user_ids = await store.get_users_in_room(room_id) 2043 hosts = {get_domain_from_id(user_id) for user_id in user_ids} 2044 for host in hosts: 2045 hosts_and_states.setdefault(host, set()).update(states) 2046 2047 for user_id, states in users_to_states.items(): 2048 host = get_domain_from_id(user_id) 2049 hosts_and_states.setdefault(host, set()).update(states) 2050 2051 return hosts_and_states 2052 2053 2054class PresenceFederationQueue: 2055 """Handles sending ad hoc presence updates over federation, which are *not* 2056 due to state updates (that get handled via the presence stream), e.g. 2057 federation pings and sending existing present states to newly joined hosts. 2058 2059 Only the last N minutes will be queued, so if a federation sender instance 2060 is down for longer then some updates will be dropped. This is OK as presence 2061 is ephemeral, and so it will self correct eventually. 2062 2063 On workers the class tracks the last received position of the stream from 2064 replication, and handles querying for missed updates over HTTP replication, 2065 c.f. `get_current_token` and `get_replication_rows`. 2066 """ 2067 2068 # How long to keep entries in the queue for. Workers that are down for 2069 # longer than this duration will miss out on older updates. 2070 _KEEP_ITEMS_IN_QUEUE_FOR_MS = 5 * 60 * 1000 2071 2072 # How often to check if we can expire entries from the queue. 
2073 _CLEAR_ITEMS_EVERY_MS = 60 * 1000 2074 2075 def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler): 2076 self._clock = hs.get_clock() 2077 self._notifier = hs.get_notifier() 2078 self._instance_name = hs.get_instance_name() 2079 self._presence_handler = presence_handler 2080 self._repl_client = ReplicationGetStreamUpdates.make_client(hs) 2081 2082 # Should we keep a queue of recent presence updates? We only bother if 2083 # another process may be handling federation sending. 2084 self._queue_presence_updates = True 2085 2086 # Whether this instance is a presence writer. 2087 self._presence_writer = self._instance_name in hs.config.worker.writers.presence 2088 2089 # The FederationSender instance, if this process sends federation traffic directly. 2090 self._federation = None 2091 2092 if hs.should_send_federation(): 2093 self._federation = hs.get_federation_sender() 2094 2095 # We don't bother queuing up presence states if only this instance 2096 # is sending federation. 2097 if hs.config.worker.federation_shard_config.instances == [ 2098 self._instance_name 2099 ]: 2100 self._queue_presence_updates = False 2101 2102 # The queue of recently queued updates as tuples of: `(timestamp, 2103 # stream_id, destinations, user_ids)`. We don't store the full states 2104 # for efficiency, and remote workers will already have the full states 2105 # cached. 2106 self._queue: List[Tuple[int, int, Collection[str], Set[str]]] = [] 2107 2108 self._next_id = 1 2109 2110 # Map from instance name to current token 2111 self._current_tokens: Dict[str, int] = {} 2112 2113 if self._queue_presence_updates: 2114 self._clock.looping_call(self._clear_queue, self._CLEAR_ITEMS_EVERY_MS) 2115 2116 def _clear_queue(self) -> None: 2117 """Clear out older entries from the queue.""" 2118 clear_before = self._clock.time_msec() - self._KEEP_ITEMS_IN_QUEUE_FOR_MS 2119 2120 # The queue is sorted by timestamp, so we can bisect to find the right 2121 # place to purge before. 
Note that we are searching using a 1-tuple with 2122 # the time, which does The Right Thing since the queue is a tuple where 2123 # the first item is a timestamp. 2124 index = bisect(self._queue, (clear_before,)) 2125 self._queue = self._queue[index:] 2126 2127 def send_presence_to_destinations( 2128 self, states: Collection[UserPresenceState], destinations: Collection[str] 2129 ) -> None: 2130 """Send the presence states to the given destinations. 2131 2132 Will forward to the local federation sender (if there is one) and queue 2133 to send over replication (if there are other federation sender instances.). 2134 2135 Must only be called on the presence writer process. 2136 """ 2137 2138 # This should only be called on a presence writer. 2139 assert self._presence_writer 2140 2141 if self._federation: 2142 self._federation.send_presence_to_destinations( 2143 states=states, 2144 destinations=destinations, 2145 ) 2146 2147 if not self._queue_presence_updates: 2148 return 2149 2150 now = self._clock.time_msec() 2151 2152 stream_id = self._next_id 2153 self._next_id += 1 2154 2155 self._queue.append((now, stream_id, destinations, {s.user_id for s in states})) 2156 2157 self._notifier.notify_replication() 2158 2159 def get_current_token(self, instance_name: str) -> int: 2160 """Get the current position of the stream. 2161 2162 On workers this returns the last stream ID received from replication. 2163 """ 2164 if instance_name == self._instance_name: 2165 return self._next_id - 1 2166 else: 2167 return self._current_tokens.get(instance_name, 0) 2168 2169 async def get_replication_rows( 2170 self, 2171 instance_name: str, 2172 from_token: int, 2173 upto_token: int, 2174 target_row_count: int, 2175 ) -> Tuple[List[Tuple[int, Tuple[str, str]]], int, bool]: 2176 """Get all the updates between the two tokens. 2177 2178 We return rows in the form of `(destination, user_id)` to keep the size 2179 of each row bounded (rather than returning the sets in a row). 
2180 2181 On workers this will query the presence writer process via HTTP replication. 2182 """ 2183 if instance_name != self._instance_name: 2184 # If not local we query over http replication from the presence 2185 # writer 2186 result = await self._repl_client( 2187 instance_name=instance_name, 2188 stream_name=PresenceFederationStream.NAME, 2189 from_token=from_token, 2190 upto_token=upto_token, 2191 ) 2192 return result["updates"], result["upto_token"], result["limited"] 2193 2194 # If the from_token is the current token then there's nothing to return 2195 # and we can trivially no-op. 2196 if from_token == self._next_id - 1: 2197 return [], upto_token, False 2198 2199 # We can find the correct position in the queue by noting that there is 2200 # exactly one entry per stream ID, and that the last entry has an ID of 2201 # `self._next_id - 1`, so we can count backwards from the end. 2202 # 2203 # Since we are returning all states in the range `from_token < stream_id 2204 # <= upto_token` we look for the index with a `stream_id` of `from_token 2205 # + 1`. 2206 # 2207 # Since the start of the queue is periodically truncated we need to 2208 # handle the case where `from_token` stream ID has already been dropped. 2209 start_idx = max(from_token + 1 - self._next_id, -len(self._queue)) 2210 2211 to_send: List[Tuple[int, Tuple[str, str]]] = [] 2212 limited = False 2213 new_id = upto_token 2214 for _, stream_id, destinations, user_ids in self._queue[start_idx:]: 2215 if stream_id <= from_token: 2216 # Paranoia check that we are actually only sending states that 2217 # are have stream_id strictly greater than from_token. We should 2218 # never hit this. 
2219 logger.warning( 2220 "Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)", 2221 stream_id, 2222 from_token, 2223 self._next_id, 2224 len(self._queue), 2225 ) 2226 continue 2227 2228 if stream_id > upto_token: 2229 break 2230 2231 new_id = stream_id 2232 2233 to_send.extend( 2234 (stream_id, (destination, user_id)) 2235 for destination in destinations 2236 for user_id in user_ids 2237 ) 2238 2239 if len(to_send) > target_row_count: 2240 limited = True 2241 break 2242 2243 return to_send, new_id, limited 2244 2245 async def process_replication_rows( 2246 self, stream_name: str, instance_name: str, token: int, rows: list 2247 ) -> None: 2248 if stream_name != PresenceFederationStream.NAME: 2249 return 2250 2251 # We keep track of the current tokens (so that we can catch up with anything we missed after a disconnect) 2252 self._current_tokens[instance_name] = token 2253 2254 # If we're a federation sender we pull out the presence states to send 2255 # and forward them on. 2256 if not self._federation: 2257 return 2258 2259 hosts_to_users: Dict[str, Set[str]] = {} 2260 for row in rows: 2261 hosts_to_users.setdefault(row.destination, set()).add(row.user_id) 2262 2263 for host, user_ids in hosts_to_users.items(): 2264 states = await self._presence_handler.current_state_for_users(user_ids) 2265 self._federation.send_presence_to_destinations( 2266 states=states.values(), 2267 destinations=[host], 2268 ) 2269