1# Copyright 2014-2016 OpenMarket Ltd
2# Copyright 2020 The Matrix.org Foundation C.I.C.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""This module is responsible for keeping track of presence status of local
17and remote users.
18
19The methods that define policy are:
20    - PresenceHandler._update_states
21    - PresenceHandler._handle_timeouts
22    - should_notify
23"""
24import abc
25import contextlib
26import logging
27from bisect import bisect
28from contextlib import contextmanager
29from types import TracebackType
30from typing import (
31    TYPE_CHECKING,
32    Any,
33    Awaitable,
34    Callable,
35    Collection,
36    Dict,
37    FrozenSet,
38    Generator,
39    Iterable,
40    List,
41    Optional,
42    Set,
43    Tuple,
44    Type,
45    Union,
46)
47
48from prometheus_client import Counter
49from typing_extensions import ContextManager
50
51import synapse.metrics
52from synapse.api.constants import EventTypes, Membership, PresenceState
53from synapse.api.errors import SynapseError
54from synapse.api.presence import UserPresenceState
55from synapse.appservice import ApplicationService
56from synapse.events.presence_router import PresenceRouter
57from synapse.logging.context import run_in_background
58from synapse.logging.utils import log_function
59from synapse.metrics import LaterGauge
60from synapse.metrics.background_process_metrics import run_as_background_process
61from synapse.replication.http.presence import (
62    ReplicationBumpPresenceActiveTime,
63    ReplicationPresenceSetState,
64)
65from synapse.replication.http.streams import ReplicationGetStreamUpdates
66from synapse.replication.tcp.commands import ClearUserSyncsCommand
67from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
68from synapse.storage.databases.main import DataStore
69from synapse.streams import EventSource
70from synapse.types import JsonDict, UserID, get_domain_from_id
71from synapse.util.async_helpers import Linearizer
72from synapse.util.caches.descriptors import _CacheContext, cached
73from synapse.util.metrics import Measure
74from synapse.util.wheel_timer import WheelTimer
75
76if TYPE_CHECKING:
77    from synapse.server import HomeServer
78
79logger = logging.getLogger(__name__)
80
81
# Prometheus counters exported by this module. All of them are registered with
# an empty help string.
notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
federation_presence_out_counter = Counter(
    "synapse_handler_presence_federation_presence_out", ""
)
presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
federation_presence_counter = Counter(
    "synapse_handler_presence_federation_presence", ""
)
bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")

# Labelled by "type".
get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])

# Labelled by "reason".
notify_reason_counter = Counter(
    "synapse_handler_presence_notify_reason", "", ["reason"]
)
# Labelled by the "from" and "to" values of a transition.
state_transition_counter = Counter(
    "synapse_handler_presence_state_transition", "", ["from", "to"]
)


# NOTE: the durations below are added to / compared against millisecond
# timestamps (e.g. `clock.time_msec()`), so they are expressed in milliseconds.
# NOTE(review): LAST_ACTIVE_GRANULARITY and EXTERNAL_PROCESS_EXPIRY are not used
# in the part of the file reviewed here; presumably also milliseconds - confirm.

# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000

# How long to wait until a new /events or /sync request before assuming
# the client has gone.
SYNC_ONLINE_TIMEOUT = 30 * 1000

# How long to wait before marking the user as idle. Compared against last active
IDLE_TIMER = 5 * 60 * 1000

# How often we expect remote servers to resend us presence.
FEDERATION_TIMEOUT = 30 * 60 * 1000

# How often to resend presence to remote servers
FEDERATION_PING_INTERVAL = 25 * 60 * 1000

# How long we will wait before assuming that the syncs from an external process
# are dead.
EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000

# Delay before a worker tells the presence handler that a user has stopped
# syncing.
UPDATE_SYNCING_USERS_MS = 10 * 1000

# Import-time sanity check: the "currently active" window must be shorter than
# the idle timer.
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
129
130
class BasePresenceHandler(abc.ABC):
    """Parts of the PresenceHandler that are shared between workers and presence
    writer"""

    def __init__(self, hs: "HomeServer"):
        """Wire up the shared dependencies and seed the in-memory state cache.

        Args:
            hs: The homeserver this handler belongs to.
        """
        self.clock = hs.get_clock()
        self.store = hs.get_datastore()
        self.presence_router = hs.get_presence_router()
        self.state = hs.get_state_handler()
        self.is_mine_id = hs.is_mine_id

        # Only set when this instance is allowed to send federation traffic.
        self._federation = None
        if hs.should_send_federation():
            self._federation = hs.get_federation_sender()

        self._federation_queue = PresenceFederationQueue(hs, self)

        self._busy_presence_enabled = hs.config.experimental.msc3026_enabled

        # In-memory cache of user_id -> presence state, seeded from whatever
        # the store hands over at startup.
        active_presence = self.store.take_presence_startup_info()
        self.user_to_current_state = {state.user_id: state for state in active_presence}

    @abc.abstractmethod
    async def user_syncing(
        self, user_id: str, affect_presence: bool
    ) -> ContextManager[None]:
        """Returns a context manager that should surround any stream requests
        from the user.

        This allows us to keep track of who is currently streaming and who isn't
        without having to have timers outside of this module to avoid flickering
        when users disconnect/reconnect.

        Args:
            user_id: the user that is starting a sync
            affect_presence: If false this function will be a no-op.
                Useful for streams that are not associated with an actual
                client that is being used by a user.
        """

    @abc.abstractmethod
    def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
        """Get an iterable of syncing users on this worker, to send to the presence handler

        This is called when a replication connection is established. It should return
        a list of user ids, which are then sent as USER_SYNC commands to inform the
        process handling presence about those users.

        Returns:
            An iterable of user_id strings.
        """

    async def get_state(self, target_user: UserID) -> UserPresenceState:
        """Get the presence state of a single user.

        Args:
            target_user: The user to look up.

        Returns:
            The user's presence state (a default state if none is known).
        """
        results = await self.get_states([target_user.to_string()])
        return results[0]

    async def get_states(
        self, target_user_ids: Iterable[str]
    ) -> List[UserPresenceState]:
        """Get the presence state for users.

        Args:
            target_user_ids: The IDs of the users to look up.

        Returns:
            One presence state per distinct requested user.
        """

        updates_d = await self.current_state_for_users(target_user_ids)
        updates = list(updates_d.values())

        # Safety net: make sure every requested user has an entry, falling back
        # to the default state.
        for user_id in set(target_user_ids) - {u.user_id for u in updates}:
            updates.append(UserPresenceState.default(user_id))

        return updates

    async def current_state_for_users(
        self, user_ids: Iterable[str]
    ) -> Dict[str, UserPresenceState]:
        """Get the current presence state for multiple users.

        Looks in the in-memory cache first, then the database, and finally
        falls back to a default state (which is also added to the cache).

        Args:
            user_ids: The IDs of the users to look up.

        Returns:
            dict: `user_id` -> `UserPresenceState`
        """
        states = {
            user_id: self.user_to_current_state.get(user_id, None)
            for user_id in user_ids
        }

        # A cache miss is represented by `None`; test for that explicitly
        # rather than relying on the truthiness of the state object.
        missing = [user_id for user_id, state in states.items() if state is None]
        if missing:
            # There are things not in our in memory cache. Lets pull them out of
            # the database.
            res = await self.store.get_presence_for_users(missing)
            states.update(res)

            missing = [user_id for user_id, state in states.items() if state is None]
            if missing:
                # Still unknown after hitting the database: use (and cache) the
                # default state.
                new = {
                    user_id: UserPresenceState.default(user_id) for user_id in missing
                }
                states.update(new)
                self.user_to_current_state.update(new)

        return states

    @abc.abstractmethod
    async def set_state(
        self,
        target_user: UserID,
        state: JsonDict,
        ignore_status_msg: bool = False,
        force_notify: bool = False,
    ) -> None:
        """Set the presence state of the user.

        Args:
            target_user: The ID of the user to set the presence state of.
            state: The presence state as a JSON dictionary.
            ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
                If False, the user's current status will be updated.
            force_notify: Whether to force notification of the update to clients.
        """

    @abc.abstractmethod
    async def bump_presence_active_time(self, user: UserID) -> None:
        """We've seen the user do something that indicates they're interacting
        with the app.
        """

    async def update_external_syncs_row(
        self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
    ) -> None:
        """Update the syncing users for an external process as a delta.

        This is a no-op when presence is handled by a different worker.

        Args:
            process_id: An identifier for the process the users are
                syncing against. This allows synapse to process updates
                as user start and stop syncing against a given process.
            user_id: The user who has started or stopped syncing
            is_syncing: Whether or not the user is now syncing
            sync_time_msec: Time in ms when the user was last syncing
        """
        pass

    async def update_external_syncs_clear(self, process_id: str) -> None:
        """Marks all users that had been marked as syncing by a given process
        as offline.

        Used when the process has stopped/disappeared.

        This is a no-op when presence is handled by a different worker.
        """
        pass

    async def process_replication_rows(
        self, stream_name: str, instance_name: str, token: int, rows: list
    ) -> None:
        """Process streams received over replication."""
        await self._federation_queue.process_replication_rows(
            stream_name, instance_name, token, rows
        )

    def get_federation_queue(self) -> "PresenceFederationQueue":
        """Get the presence federation queue."""
        return self._federation_queue

    async def maybe_send_presence_to_interested_destinations(
        self, states: List[UserPresenceState]
    ) -> None:
        """If this instance is a federation sender, send the states to all
        destinations that are interested. Filters out any states for remote
        users.
        """

        if not self._federation:
            return

        states = [s for s in states if self.is_mine_id(s.user_id)]

        if not states:
            return

        hosts_to_states = await get_interested_remotes(
            self.store,
            self.presence_router,
            states,
        )

        for destination, host_states in hosts_to_states.items():
            self._federation.send_presence_to_destinations(host_states, [destination])

    async def send_full_presence_to_users(self, user_ids: Collection[str]) -> None:
        """
        Adds to the list of users who should receive a full snapshot of presence
        upon their next sync. Note that this only works for local users.

        Then, grabs the current presence state for a given set of users and adds it
        to the top of the presence stream.

        Args:
            user_ids: The IDs of the local users to send full presence to.

        Raises:
            Exception: if `user_ids` is empty.
        """
        # Retrieve one of the users from the given set
        if not user_ids:
            raise Exception(
                "send_full_presence_to_users must be called with at least one user"
            )
        user_id = next(iter(user_ids))

        # Mark all users as receiving full presence on their next sync
        await self.store.add_users_to_send_full_presence_to(user_ids)

        # Add a new entry to the presence stream. Since we use stream tokens to determine whether a
        # local user should receive a full snapshot of presence when they sync, we need to bump the
        # presence stream so that subsequent syncs with no presence activity in between won't result
        # in the client receiving multiple full snapshots of presence.
        #
        # If we bump the stream ID, then the user will get a higher stream token next sync, and thus
        # correctly won't receive a second snapshot.

        # Get the current presence state for one of the users (defaults to offline if not found)
        current_presence_state = await self.get_state(UserID.from_string(user_id))

        # Convert the UserPresenceState object into a serializable dict.
        #
        # The status message must be passed under "status_msg": that is the
        # field `set_state` documents for the `state` dict. Any other key would
        # not carry the user's current status message over.
        state = {
            "presence": current_presence_state.state,
            "status_msg": current_presence_state.status_msg,
        }

        # Copy the presence state to the tip of the presence stream.

        # We set force_notify=True here so that this presence update is guaranteed to
        # increment the presence stream ID (which resending the current user's presence
        # otherwise would not do).
        await self.set_state(UserID.from_string(user_id), state, force_notify=True)

    async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
        """Not supported by the base handler.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError(
            "Attempting to check presence on a non-presence worker."
        )
367
368
369class _NullContextManager(ContextManager[None]):
370    """A context manager which does nothing."""
371
372    def __exit__(
373        self,
374        exc_type: Optional[Type[BaseException]],
375        exc_val: Optional[BaseException],
376        exc_tb: Optional[TracebackType],
377    ) -> None:
378        pass
379
380
class WorkerPresenceHandler(BasePresenceHandler):
    """Presence handler for processes that do not write presence themselves.

    It tracks which users are syncing against this process, reports that over
    replication, and proxies state changes to the presence writer instance.
    """

    def __init__(self, hs: "HomeServer"):
        """Set up sync tracking and the replication clients.

        Args:
            hs: The homeserver this handler belongs to.
        """
        super().__init__(hs)
        self.hs = hs

        self._presence_writer_instance = hs.config.worker.writers.presence[0]

        self._presence_enabled = hs.config.server.use_presence

        # Route presence EDUs to the right worker
        hs.get_federation_registry().register_instances_for_edu(
            "m.presence",
            hs.config.worker.writers.presence,
        )

        # The number of ongoing syncs on this process, by user id.
        # Empty if _presence_enabled is false.
        self._user_to_num_current_syncs: Dict[str, int] = {}

        self.notifier = hs.get_notifier()
        self.instance_id = hs.get_instance_id()

        # user_id -> last_sync_ms. Lists the users that have stopped syncing but
        # we haven't notified the presence writer of that yet
        self.users_going_offline: Dict[str, int] = {}

        self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
        self._set_state_client = ReplicationPresenceSetState.make_client(hs)

        self._send_stop_syncing_loop = self.clock.looping_call(
            self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
        )

        # NOTE: `_busy_presence_enabled` is already set by
        # `BasePresenceHandler.__init__`, so it is not assigned again here.

        hs.get_reactor().addSystemEventTrigger(
            "before",
            "shutdown",
            run_as_background_process,
            "generic_presence.on_shutdown",
            self._on_shutdown,
        )

    async def _on_shutdown(self) -> None:
        """On shutdown, send a ClearUserSyncsCommand for this instance over
        replication (only when presence is enabled).
        """
        if self._presence_enabled:
            self.hs.get_tcp_replication().send_command(
                ClearUserSyncsCommand(self.instance_id)
            )

    def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
        """Report over replication whether a user is syncing against this
        instance. No-op when presence is disabled.

        Args:
            user_id: The user whose sync status changed.
            is_syncing: Whether the user is now syncing.
            last_sync_ms: Millisecond timestamp associated with the change.
        """
        if self._presence_enabled:
            self.hs.get_tcp_replication().send_user_sync(
                self.instance_id, user_id, is_syncing, last_sync_ms
            )

    def mark_as_coming_online(self, user_id: str) -> None:
        """A user has started syncing. Send a UserSync to the presence writer,
        unless they had recently stopped syncing.
        """
        going_offline = self.users_going_offline.pop(user_id, None)
        # Compare against None rather than relying on truthiness: the stored
        # value is a millisecond timestamp, and a timestamp of 0 still means
        # "a stop was pending".
        if going_offline is None:
            self.send_user_sync(user_id, True, self.clock.time_msec())
        # Otherwise it is safe to skip the notification, because we haven't
        # yet told the presence writer they were offline.

    def mark_as_going_offline(self, user_id: str) -> None:
        """A user has stopped syncing. We wait before notifying the presence
        writer as its likely they'll come back soon. This allows us to avoid
        sending a stopped syncing immediately followed by a started syncing
        notification to the presence writer
        """
        self.users_going_offline[user_id] = self.clock.time_msec()

    def send_stop_syncing(self) -> None:
        """Check if there are any users who have stopped syncing a while ago and
        haven't come back yet. If there are poke the presence writer about them.
        """
        now = self.clock.time_msec()
        # Iterate over a snapshot since entries are removed inside the loop.
        for user_id, last_sync_ms in list(self.users_going_offline.items()):
            if now - last_sync_ms > UPDATE_SYNCING_USERS_MS:
                self.users_going_offline.pop(user_id, None)
                self.send_user_sync(user_id, False, last_sync_ms)

    async def user_syncing(
        self, user_id: str, affect_presence: bool
    ) -> ContextManager[None]:
        """Record that a user is syncing.

        Called by the sync and events servlets to record that a user has connected to
        this worker and is waiting for some events.

        Args:
            user_id: The user that is starting a sync.
            affect_presence: If false (or if presence is disabled), nothing is
                recorded and a no-op context manager is returned.

        Returns:
            A context manager to wrap the request in; leaving it records that
            this particular sync has finished.
        """
        if not affect_presence or not self._presence_enabled:
            return _NullContextManager()

        curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
        self._user_to_num_current_syncs[user_id] = curr_sync + 1

        # If we went from no in flight sync to some, notify replication
        if self._user_to_num_current_syncs[user_id] == 1:
            self.mark_as_coming_online(user_id)

        def _end() -> None:
            # We check that the user_id is in user_to_num_current_syncs because
            # user_to_num_current_syncs may have been cleared if we are
            # shutting down.
            if user_id in self._user_to_num_current_syncs:
                self._user_to_num_current_syncs[user_id] -= 1

                # If we went from one in flight sync to non, notify replication
                if self._user_to_num_current_syncs[user_id] == 0:
                    self.mark_as_going_offline(user_id)

        @contextlib.contextmanager
        def _user_syncing() -> Generator[None, None, None]:
            try:
                yield
            finally:
                _end()

        return _user_syncing()

    async def notify_from_replication(
        self, states: List[UserPresenceState], stream_id: int
    ) -> None:
        """Poke the notifier about presence updates received over replication.

        Args:
            states: The presence states to notify about.
            stream_id: The presence stream position passed to the notifier.
        """
        parties = await get_interested_parties(self.store, self.presence_router, states)
        room_ids_to_states, users_to_states = parties

        self.notifier.on_new_event(
            "presence_key",
            stream_id,
            rooms=room_ids_to_states.keys(),
            users=users_to_states.keys(),
        )

    async def process_replication_rows(
        self, stream_name: str, instance_name: str, token: int, rows: list
    ) -> None:
        """Process streams received over replication.

        Rows of the presence stream update the in-memory state cache; the
        states that warrant it are then passed to the notifier and, if this
        instance sends federation, to interested remote servers.
        """
        await super().process_replication_rows(stream_name, instance_name, token, rows)

        if stream_name != PresenceStream.NAME:
            return

        states = [
            UserPresenceState(
                row.user_id,
                row.state,
                row.last_active_ts,
                row.last_federation_update_ts,
                row.last_user_sync_ts,
                row.status_msg,
                row.currently_active,
            )
            for row in rows
        ]

        # The list of states to notify sync streams and remote servers about.
        # This is calculated by comparing the old and new states for each user
        # using `should_notify(..)`.
        #
        # Note that this is necessary as the presence writer will periodically
        # flush presence state changes that should not be notified about to the
        # DB, and so will be sent over the replication stream.
        state_to_notify = []

        for new_state in states:
            old_state = self.user_to_current_state.get(new_state.user_id)
            self.user_to_current_state[new_state.user_id] = new_state

            if not old_state or should_notify(old_state, new_state):
                state_to_notify.append(new_state)

        stream_id = token
        await self.notify_from_replication(state_to_notify, stream_id)

        # If this is a federation sender, notify about presence updates.
        await self.maybe_send_presence_to_interested_destinations(state_to_notify)

    def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
        """Return the IDs of users with at least one ongoing sync on this
        process.
        """
        return [
            user_id
            for user_id, count in self._user_to_num_current_syncs.items()
            if count > 0
        ]

    async def set_state(
        self,
        target_user: UserID,
        state: JsonDict,
        ignore_status_msg: bool = False,
        force_notify: bool = False,
    ) -> None:
        """Set the presence state of the user.

        Args:
            target_user: The ID of the user to set the presence state of.
            state: The presence state as a JSON dictionary.
            ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
                If False, the user's current status will be updated.
            force_notify: Whether to force notification of the update to clients.

        Raises:
            SynapseError: (400) if the requested presence value is not valid,
                or is "busy" while busy presence is not enabled.
        """
        presence = state["presence"]

        valid_presence = (
            PresenceState.ONLINE,
            PresenceState.UNAVAILABLE,
            PresenceState.OFFLINE,
            PresenceState.BUSY,
        )

        if presence not in valid_presence or (
            presence == PresenceState.BUSY and not self._busy_presence_enabled
        ):
            raise SynapseError(400, "Invalid presence state")

        user_id = target_user.to_string()

        # If presence is disabled, no-op
        if not self.hs.config.server.use_presence:
            return

        # Proxy request to instance that writes presence
        await self._set_state_client(
            instance_name=self._presence_writer_instance,
            user_id=user_id,
            state=state,
            ignore_status_msg=ignore_status_msg,
            force_notify=force_notify,
        )

    async def bump_presence_active_time(self, user: UserID) -> None:
        """We've seen the user do something that indicates they're interacting
        with the app.
        """
        # If presence is disabled, no-op
        if not self.hs.config.server.use_presence:
            return

        # Proxy request to instance that writes presence
        user_id = user.to_string()
        await self._bump_active_client(
            instance_name=self._presence_writer_instance, user_id=user_id
        )
623
624
625class PresenceHandler(BasePresenceHandler):
626    def __init__(self, hs: "HomeServer"):
627        super().__init__(hs)
628        self.hs = hs
629        self.server_name = hs.hostname
630        self.wheel_timer: WheelTimer[str] = WheelTimer()
631        self.notifier = hs.get_notifier()
632        self._presence_enabled = hs.config.server.use_presence
633
634        federation_registry = hs.get_federation_registry()
635
636        federation_registry.register_edu_handler("m.presence", self.incoming_presence)
637
638        LaterGauge(
639            "synapse_handlers_presence_user_to_current_state_size",
640            "",
641            [],
642            lambda: len(self.user_to_current_state),
643        )
644
645        now = self.clock.time_msec()
646        for state in self.user_to_current_state.values():
647            self.wheel_timer.insert(
648                now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
649            )
650            self.wheel_timer.insert(
651                now=now,
652                obj=state.user_id,
653                then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
654            )
655            if self.is_mine_id(state.user_id):
656                self.wheel_timer.insert(
657                    now=now,
658                    obj=state.user_id,
659                    then=state.last_federation_update_ts + FEDERATION_PING_INTERVAL,
660                )
661            else:
662                self.wheel_timer.insert(
663                    now=now,
664                    obj=state.user_id,
665                    then=state.last_federation_update_ts + FEDERATION_TIMEOUT,
666                )
667
668        # Set of users who have presence in the `user_to_current_state` that
669        # have not yet been persisted
670        self.unpersisted_users_changes: Set[str] = set()
671
672        hs.get_reactor().addSystemEventTrigger(
673            "before",
674            "shutdown",
675            run_as_background_process,
676            "presence.on_shutdown",
677            self._on_shutdown,
678        )
679
680        self._next_serial = 1
681
682        # Keeps track of the number of *ongoing* syncs on this process. While
683        # this is non zero a user will never go offline.
684        self.user_to_num_current_syncs: Dict[str, int] = {}
685
686        # Keeps track of the number of *ongoing* syncs on other processes.
687        # While any sync is ongoing on another process the user will never
688        # go offline.
689        # Each process has a unique identifier and an update frequency. If
690        # no update is received from that process within the update period then
691        # we assume that all the sync requests on that process have stopped.
692        # Stored as a dict from process_id to set of user_id, and a dict of
693        # process_id to millisecond timestamp last updated.
694        self.external_process_to_current_syncs: Dict[str, Set[str]] = {}
695        self.external_process_last_updated_ms: Dict[str, int] = {}
696
697        self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
698
699        if self._presence_enabled:
700            # Start a LoopingCall in 30s that fires every 5s.
701            # The initial delay is to allow disconnected clients a chance to
702            # reconnect before we treat them as offline.
703            def run_timeout_handler() -> Awaitable[None]:
704                return run_as_background_process(
705                    "handle_presence_timeouts", self._handle_timeouts
706                )
707
708            self.clock.call_later(
709                30, self.clock.looping_call, run_timeout_handler, 5000
710            )
711
712            def run_persister() -> Awaitable[None]:
713                return run_as_background_process(
714                    "persist_presence_changes", self._persist_unpersisted_changes
715                )
716
717            self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
718
719        LaterGauge(
720            "synapse_handlers_presence_wheel_timer_size",
721            "",
722            [],
723            lambda: len(self.wheel_timer),
724        )
725
726        # Used to handle sending of presence to newly joined users/servers
727        if self._presence_enabled:
728            self.notifier.add_replication_callback(self.notify_new_event)
729
730        # Presence is best effort and quickly heals itself, so lets just always
731        # stream from the current state when we restart.
732        self._event_pos = self.store.get_room_max_stream_ordering()
733        self._event_processing = False
734
735    async def _on_shutdown(self) -> None:
736        """Gets called when shutting down. This lets us persist any updates that
737        we haven't yet persisted, e.g. updates that only changes some internal
738        timers. This allows changes to persist across startup without having to
739        persist every single change.
740
741        If this does not run it simply means that some of the timers will fire
742        earlier than they should when synapse is restarted. This affect of this
743        is some spurious presence changes that will self-correct.
744        """
745        # If the DB pool has already terminated, don't try updating
746        if not self.store.db_pool.is_running():
747            return
748
749        logger.info(
750            "Performing _on_shutdown. Persisting %d unpersisted changes",
751            len(self.user_to_current_state),
752        )
753
754        if self.unpersisted_users_changes:
755
756            await self.store.update_presence(
757                [
758                    self.user_to_current_state[user_id]
759                    for user_id in self.unpersisted_users_changes
760                ]
761            )
762        logger.info("Finished _on_shutdown")
763
764    async def _persist_unpersisted_changes(self) -> None:
765        """We periodically persist the unpersisted changes, as otherwise they
766        may stack up and slow down shutdown times.
767        """
768        unpersisted = self.unpersisted_users_changes
769        self.unpersisted_users_changes = set()
770
771        if unpersisted:
772            logger.info("Persisting %d unpersisted presence updates", len(unpersisted))
773            await self.store.update_presence(
774                [self.user_to_current_state[user_id] for user_id in unpersisted]
775            )
776
777    async def _update_states(
778        self, new_states: Iterable[UserPresenceState], force_notify: bool = False
779    ) -> None:
780        """Updates presence of users. Sets the appropriate timeouts. Pokes
781        the notifier and federation if and only if the changed presence state
782        should be sent to clients/servers.
783
784        Args:
785            new_states: The new user presence state updates to process.
786            force_notify: Whether to force notifying clients of this presence state update,
787                even if it doesn't change the state of a user's presence (e.g online -> online).
788                This is currently used to bump the max presence stream ID without changing any
789                user's presence (see PresenceHandler.add_users_to_send_full_presence_to).
790        """
791        now = self.clock.time_msec()
792
793        with Measure(self.clock, "presence_update_states"):
794
795            # NOTE: We purposefully don't await between now and when we've
796            # calculated what we want to do with the new states, to avoid races.
797
798            to_notify = {}  # Changes we want to notify everyone about
799            to_federation_ping = {}  # These need sending keep-alives
800
801            # Only bother handling the last presence change for each user
802            new_states_dict = {}
803            for new_state in new_states:
804                new_states_dict[new_state.user_id] = new_state
805            new_states = new_states_dict.values()
806
807            for new_state in new_states:
808                user_id = new_state.user_id
809
810                # Its fine to not hit the database here, as the only thing not in
811                # the current state cache are OFFLINE states, where the only field
812                # of interest is last_active which is safe enough to assume is 0
813                # here.
814                prev_state = self.user_to_current_state.get(
815                    user_id, UserPresenceState.default(user_id)
816                )
817
818                new_state, should_notify, should_ping = handle_update(
819                    prev_state,
820                    new_state,
821                    is_mine=self.is_mine_id(user_id),
822                    wheel_timer=self.wheel_timer,
823                    now=now,
824                )
825
826                if force_notify:
827                    should_notify = True
828
829                self.user_to_current_state[user_id] = new_state
830
831                if should_notify:
832                    to_notify[user_id] = new_state
833                elif should_ping:
834                    to_federation_ping[user_id] = new_state
835
836            # TODO: We should probably ensure there are no races hereafter
837
838            presence_updates_counter.inc(len(new_states))
839
840            if to_notify:
841                notified_presence_counter.inc(len(to_notify))
842                await self._persist_and_notify(list(to_notify.values()))
843
844            self.unpersisted_users_changes |= {s.user_id for s in new_states}
845            self.unpersisted_users_changes -= set(to_notify.keys())
846
847            # Check if we need to resend any presence states to remote hosts. We
848            # only do this for states that haven't been updated in a while to
849            # ensure that the remote host doesn't time the presence state out.
850            #
851            # Note that since these are states that have *not* been updated,
852            # they won't get sent down the normal presence replication stream,
853            # and so we have to explicitly send them via the federation stream.
854            to_federation_ping = {
855                user_id: state
856                for user_id, state in to_federation_ping.items()
857                if user_id not in to_notify
858            }
859            if to_federation_ping:
860                federation_presence_out_counter.inc(len(to_federation_ping))
861
862                hosts_to_states = await get_interested_remotes(
863                    self.store,
864                    self.presence_router,
865                    list(to_federation_ping.values()),
866                )
867
868                for destination, states in hosts_to_states.items():
869                    self._federation_queue.send_presence_to_destinations(
870                        states, [destination]
871                    )
872
873    async def _handle_timeouts(self) -> None:
874        """Checks the presence of users that have timed out and updates as
875        appropriate.
876        """
877        logger.debug("Handling presence timeouts")
878        now = self.clock.time_msec()
879
880        # Fetch the list of users that *may* have timed out. Things may have
881        # changed since the timeout was set, so we won't necessarily have to
882        # take any action.
883        users_to_check = set(self.wheel_timer.fetch(now))
884
885        # Check whether the lists of syncing processes from an external
886        # process have expired.
887        expired_process_ids = [
888            process_id
889            for process_id, last_update in self.external_process_last_updated_ms.items()
890            if now - last_update > EXTERNAL_PROCESS_EXPIRY
891        ]
892        for process_id in expired_process_ids:
893            # For each expired process drop tracking info and check the users
894            # that were syncing on that process to see if they need to be timed
895            # out.
896            users_to_check.update(
897                self.external_process_to_current_syncs.pop(process_id, ())
898            )
899            self.external_process_last_updated_ms.pop(process_id)
900
901        states = [
902            self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
903            for user_id in users_to_check
904        ]
905
906        timers_fired_counter.inc(len(states))
907
908        syncing_user_ids = {
909            user_id
910            for user_id, count in self.user_to_num_current_syncs.items()
911            if count
912        }
913        for user_ids in self.external_process_to_current_syncs.values():
914            syncing_user_ids.update(user_ids)
915
916        changes = handle_timeouts(
917            states,
918            is_mine_fn=self.is_mine_id,
919            syncing_user_ids=syncing_user_ids,
920            now=now,
921        )
922
923        return await self._update_states(changes)
924
925    async def bump_presence_active_time(self, user: UserID) -> None:
926        """We've seen the user do something that indicates they're interacting
927        with the app.
928        """
929        # If presence is disabled, no-op
930        if not self.hs.config.server.use_presence:
931            return
932
933        user_id = user.to_string()
934
935        bump_active_time_counter.inc()
936
937        prev_state = await self.current_state_for_user(user_id)
938
939        new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()}
940        if prev_state.state == PresenceState.UNAVAILABLE:
941            new_fields["state"] = PresenceState.ONLINE
942
943        await self._update_states([prev_state.copy_and_replace(**new_fields)])
944
945    async def user_syncing(
946        self, user_id: str, affect_presence: bool = True
947    ) -> ContextManager[None]:
948        """Returns a context manager that should surround any stream requests
949        from the user.
950
951        This allows us to keep track of who is currently streaming and who isn't
952        without having to have timers outside of this module to avoid flickering
953        when users disconnect/reconnect.
954
955        Args:
956            user_id
957            affect_presence: If false this function will be a no-op.
958                Useful for streams that are not associated with an actual
959                client that is being used by a user.
960        """
961        # Override if it should affect the user's presence, if presence is
962        # disabled.
963        if not self.hs.config.server.use_presence:
964            affect_presence = False
965
966        if affect_presence:
967            curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
968            self.user_to_num_current_syncs[user_id] = curr_sync + 1
969
970            prev_state = await self.current_state_for_user(user_id)
971            if prev_state.state == PresenceState.OFFLINE:
972                # If they're currently offline then bring them online, otherwise
973                # just update the last sync times.
974                await self._update_states(
975                    [
976                        prev_state.copy_and_replace(
977                            state=PresenceState.ONLINE,
978                            last_active_ts=self.clock.time_msec(),
979                            last_user_sync_ts=self.clock.time_msec(),
980                        )
981                    ]
982                )
983            else:
984                await self._update_states(
985                    [
986                        prev_state.copy_and_replace(
987                            last_user_sync_ts=self.clock.time_msec()
988                        )
989                    ]
990                )
991
992        async def _end() -> None:
993            try:
994                self.user_to_num_current_syncs[user_id] -= 1
995
996                prev_state = await self.current_state_for_user(user_id)
997                await self._update_states(
998                    [
999                        prev_state.copy_and_replace(
1000                            last_user_sync_ts=self.clock.time_msec()
1001                        )
1002                    ]
1003                )
1004            except Exception:
1005                logger.exception("Error updating presence after sync")
1006
1007        @contextmanager
1008        def _user_syncing() -> Generator[None, None, None]:
1009            try:
1010                yield
1011            finally:
1012                if affect_presence:
1013                    run_in_background(_end)
1014
1015        return _user_syncing()
1016
1017    def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
1018        # since we are the process handling presence, there is nothing to do here.
1019        return []
1020
1021    async def update_external_syncs_row(
1022        self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
1023    ) -> None:
1024        """Update the syncing users for an external process as a delta.
1025
1026        Args:
1027            process_id: An identifier for the process the users are
1028                syncing against. This allows synapse to process updates
1029                as user start and stop syncing against a given process.
1030            user_id: The user who has started or stopped syncing
1031            is_syncing: Whether or not the user is now syncing
1032            sync_time_msec: Time in ms when the user was last syncing
1033        """
1034        with (await self.external_sync_linearizer.queue(process_id)):
1035            prev_state = await self.current_state_for_user(user_id)
1036
1037            process_presence = self.external_process_to_current_syncs.setdefault(
1038                process_id, set()
1039            )
1040
1041            updates = []
1042            if is_syncing and user_id not in process_presence:
1043                if prev_state.state == PresenceState.OFFLINE:
1044                    updates.append(
1045                        prev_state.copy_and_replace(
1046                            state=PresenceState.ONLINE,
1047                            last_active_ts=sync_time_msec,
1048                            last_user_sync_ts=sync_time_msec,
1049                        )
1050                    )
1051                else:
1052                    updates.append(
1053                        prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
1054                    )
1055                process_presence.add(user_id)
1056            elif user_id in process_presence:
1057                updates.append(
1058                    prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
1059                )
1060
1061            if not is_syncing:
1062                process_presence.discard(user_id)
1063
1064            if updates:
1065                await self._update_states(updates)
1066
1067            self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
1068
1069    async def update_external_syncs_clear(self, process_id: str) -> None:
1070        """Marks all users that had been marked as syncing by a given process
1071        as offline.
1072
1073        Used when the process has stopped/disappeared.
1074        """
1075        with (await self.external_sync_linearizer.queue(process_id)):
1076            process_presence = self.external_process_to_current_syncs.pop(
1077                process_id, set()
1078            )
1079            prev_states = await self.current_state_for_users(process_presence)
1080            time_now_ms = self.clock.time_msec()
1081
1082            await self._update_states(
1083                [
1084                    prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
1085                    for prev_state in prev_states.values()
1086                ]
1087            )
1088            self.external_process_last_updated_ms.pop(process_id, None)
1089
1090    async def current_state_for_user(self, user_id: str) -> UserPresenceState:
1091        """Get the current presence state for a user."""
1092        res = await self.current_state_for_users([user_id])
1093        return res[user_id]
1094
1095    async def _persist_and_notify(self, states: List[UserPresenceState]) -> None:
1096        """Persist states in the database, poke the notifier and send to
1097        interested remote servers
1098        """
1099        stream_id, max_token = await self.store.update_presence(states)
1100
1101        parties = await get_interested_parties(self.store, self.presence_router, states)
1102        room_ids_to_states, users_to_states = parties
1103
1104        self.notifier.on_new_event(
1105            "presence_key",
1106            stream_id,
1107            rooms=room_ids_to_states.keys(),
1108            users=[UserID.from_string(u) for u in users_to_states],
1109        )
1110
1111        # We only want to poke the local federation sender, if any, as other
1112        # workers will receive the presence updates via the presence replication
1113        # stream (which is updated by `store.update_presence`).
1114        await self.maybe_send_presence_to_interested_destinations(states)
1115
1116    async def incoming_presence(self, origin: str, content: JsonDict) -> None:
1117        """Called when we receive a `m.presence` EDU from a remote server."""
1118        if not self._presence_enabled:
1119            return
1120
1121        now = self.clock.time_msec()
1122        updates = []
1123        for push in content.get("push", []):
1124            # A "push" contains a list of presence that we are probably interested
1125            # in.
1126            user_id = push.get("user_id", None)
1127            if not user_id:
1128                logger.info(
1129                    "Got presence update from %r with no 'user_id': %r", origin, push
1130                )
1131                continue
1132
1133            if get_domain_from_id(user_id) != origin:
1134                logger.info(
1135                    "Got presence update from %r with bad 'user_id': %r",
1136                    origin,
1137                    user_id,
1138                )
1139                continue
1140
1141            presence_state = push.get("presence", None)
1142            if not presence_state:
1143                logger.info(
1144                    "Got presence update from %r with no 'presence_state': %r",
1145                    origin,
1146                    push,
1147                )
1148                continue
1149
1150            new_fields = {"state": presence_state, "last_federation_update_ts": now}
1151
1152            last_active_ago = push.get("last_active_ago", None)
1153            if last_active_ago is not None:
1154                new_fields["last_active_ts"] = now - last_active_ago
1155
1156            new_fields["status_msg"] = push.get("status_msg", None)
1157            new_fields["currently_active"] = push.get("currently_active", False)
1158
1159            prev_state = await self.current_state_for_user(user_id)
1160            updates.append(prev_state.copy_and_replace(**new_fields))
1161
1162        if updates:
1163            federation_presence_counter.inc(len(updates))
1164            await self._update_states(updates)
1165
1166    async def set_state(
1167        self,
1168        target_user: UserID,
1169        state: JsonDict,
1170        ignore_status_msg: bool = False,
1171        force_notify: bool = False,
1172    ) -> None:
1173        """Set the presence state of the user.
1174
1175        Args:
1176            target_user: The ID of the user to set the presence state of.
1177            state: The presence state as a JSON dictionary.
1178            ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
1179                If False, the user's current status will be updated.
1180            force_notify: Whether to force notification of the update to clients.
1181        """
1182        status_msg = state.get("status_msg", None)
1183        presence = state["presence"]
1184
1185        valid_presence = (
1186            PresenceState.ONLINE,
1187            PresenceState.UNAVAILABLE,
1188            PresenceState.OFFLINE,
1189            PresenceState.BUSY,
1190        )
1191
1192        if presence not in valid_presence or (
1193            presence == PresenceState.BUSY and not self._busy_presence_enabled
1194        ):
1195            raise SynapseError(400, "Invalid presence state")
1196
1197        user_id = target_user.to_string()
1198
1199        prev_state = await self.current_state_for_user(user_id)
1200
1201        new_fields = {"state": presence}
1202
1203        if not ignore_status_msg:
1204            new_fields["status_msg"] = status_msg
1205
1206        if presence == PresenceState.ONLINE or (
1207            presence == PresenceState.BUSY and self._busy_presence_enabled
1208        ):
1209            new_fields["last_active_ts"] = self.clock.time_msec()
1210
1211        await self._update_states(
1212            [prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
1213        )
1214
1215    async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
1216        """Returns whether a user can see another user's presence."""
1217        observer_room_ids = await self.store.get_rooms_for_user(
1218            observer_user.to_string()
1219        )
1220        observed_room_ids = await self.store.get_rooms_for_user(
1221            observed_user.to_string()
1222        )
1223
1224        if observer_room_ids & observed_room_ids:
1225            return True
1226
1227        return False
1228
1229    async def get_all_presence_updates(
1230        self, instance_name: str, last_id: int, current_id: int, limit: int
1231    ) -> Tuple[List[Tuple[int, list]], int, bool]:
1232        """
1233        Gets a list of presence update rows from between the given stream ids.
1234        Each row has:
1235        - stream_id(str)
1236        - user_id(str)
1237        - state(str)
1238        - last_active_ts(int)
1239        - last_federation_update_ts(int)
1240        - last_user_sync_ts(int)
1241        - status_msg(int)
1242        - currently_active(int)
1243
1244        Args:
1245            instance_name: The writer we want to fetch updates from. Unused
1246                here since there is only ever one writer.
1247            last_id: The token to fetch updates from. Exclusive.
1248            current_id: The token to fetch updates up to. Inclusive.
1249            limit: The requested limit for the number of rows to return. The
1250                function may return more or fewer rows.
1251
1252        Returns:
1253            A tuple consisting of: the updates, a token to use to fetch
1254            subsequent updates, and whether we returned fewer rows than exists
1255            between the requested tokens due to the limit.
1256
1257            The token returned can be used in a subsequent call to this
1258            function to get further updates.
1259
1260            The updates are a list of 2-tuples of stream ID and the row data
1261        """
1262
1263        # TODO(markjh): replicate the unpersisted changes.
1264        # This could use the in-memory stores for recent changes.
1265        rows = await self.store.get_all_presence_updates(
1266            instance_name, last_id, current_id, limit
1267        )
1268        return rows
1269
1270    def notify_new_event(self) -> None:
1271        """Called when new events have happened. Handles users and servers
1272        joining rooms and require being sent presence.
1273        """
1274
1275        if self._event_processing:
1276            return
1277
1278        async def _process_presence() -> None:
1279            assert not self._event_processing
1280
1281            self._event_processing = True
1282            try:
1283                await self._unsafe_process()
1284            finally:
1285                self._event_processing = False
1286
1287        run_as_background_process("presence.notify_new_event", _process_presence)
1288
1289    async def _unsafe_process(self) -> None:
1290        # Loop round handling deltas until we're up to date
1291        while True:
1292            with Measure(self.clock, "presence_delta"):
1293                room_max_stream_ordering = self.store.get_room_max_stream_ordering()
1294                if self._event_pos == room_max_stream_ordering:
1295                    return
1296
1297                logger.debug(
1298                    "Processing presence stats %s->%s",
1299                    self._event_pos,
1300                    room_max_stream_ordering,
1301                )
1302                max_pos, deltas = await self.store.get_current_state_deltas(
1303                    self._event_pos, room_max_stream_ordering
1304                )
1305
1306                # We may get multiple deltas for different rooms, but we want to
1307                # handle them on a room by room basis, so we batch them up by
1308                # room.
1309                deltas_by_room: Dict[str, List[JsonDict]] = {}
1310                for delta in deltas:
1311                    deltas_by_room.setdefault(delta["room_id"], []).append(delta)
1312
1313                for room_id, deltas_for_room in deltas_by_room.items():
1314                    await self._handle_state_delta(room_id, deltas_for_room)
1315
1316                self._event_pos = max_pos
1317
1318                # Expose current event processing position to prometheus
1319                synapse.metrics.event_processing_positions.labels("presence").set(
1320                    max_pos
1321                )
1322
1323    async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
1324        """Process current state deltas for the room to find new joins that need
1325        to be handled.
1326        """
1327
1328        # Sets of newly joined users. Note that if the local server is
1329        # joining a remote room for the first time we'll see both the joining
1330        # user and all remote users as newly joined.
1331        newly_joined_users = set()
1332
1333        for delta in deltas:
1334            assert room_id == delta["room_id"]
1335
1336            typ = delta["type"]
1337            state_key = delta["state_key"]
1338            event_id = delta["event_id"]
1339            prev_event_id = delta["prev_event_id"]
1340
1341            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
1342
1343            # Drop any event that isn't a membership join
1344            if typ != EventTypes.Member:
1345                continue
1346
1347            if event_id is None:
1348                # state has been deleted, so this is not a join. We only care about
1349                # joins.
1350                continue
1351
1352            event = await self.store.get_event(event_id, allow_none=True)
1353            if not event or event.content.get("membership") != Membership.JOIN:
1354                # We only care about joins
1355                continue
1356
1357            if prev_event_id:
1358                prev_event = await self.store.get_event(prev_event_id, allow_none=True)
1359                if (
1360                    prev_event
1361                    and prev_event.content.get("membership") == Membership.JOIN
1362                ):
1363                    # Ignore changes to join events.
1364                    continue
1365
1366            newly_joined_users.add(state_key)
1367
1368        if not newly_joined_users:
1369            # If nobody has joined then there's nothing to do.
1370            return
1371
1372        # We want to send:
1373        #   1. presence states of all local users in the room to newly joined
1374        #      remote servers
1375        #   2. presence states of newly joined users to all remote servers in
1376        #      the room.
1377        #
1378        # TODO: Only send presence states to remote hosts that don't already
1379        # have them (because they already share rooms).
1380
1381        # Get all the users who were already in the room, by fetching the
1382        # current users in the room and removing the newly joined users.
1383        users = await self.store.get_users_in_room(room_id)
1384        prev_users = set(users) - newly_joined_users
1385
1386        # Construct sets for all the local users and remote hosts that were
1387        # already in the room
1388        prev_local_users = []
1389        prev_remote_hosts = set()
1390        for user_id in prev_users:
1391            if self.is_mine_id(user_id):
1392                prev_local_users.append(user_id)
1393            else:
1394                prev_remote_hosts.add(get_domain_from_id(user_id))
1395
1396        # Similarly, construct sets for all the local users and remote hosts
1397        # that were *not* already in the room. Care needs to be taken with the
1398        # calculating the remote hosts, as a host may have already been in the
1399        # room even if there is a newly joined user from that host.
1400        newly_joined_local_users = []
1401        newly_joined_remote_hosts = set()
1402        for user_id in newly_joined_users:
1403            if self.is_mine_id(user_id):
1404                newly_joined_local_users.append(user_id)
1405            else:
1406                host = get_domain_from_id(user_id)
1407                if host not in prev_remote_hosts:
1408                    newly_joined_remote_hosts.add(host)
1409
1410        # Send presence states of all local users in the room to newly joined
1411        # remote servers. (We actually only send states for local users already
1412        # in the room, as we'll send states for newly joined local users below.)
1413        if prev_local_users and newly_joined_remote_hosts:
1414            local_states = await self.current_state_for_users(prev_local_users)
1415
1416            # Filter out old presence, i.e. offline presence states where
1417            # the user hasn't been active for a week. We can change this
1418            # depending on what we want the UX to be, but at the least we
1419            # should filter out offline presence where the state is just the
1420            # default state.
1421            now = self.clock.time_msec()
1422            states = [
1423                state
1424                for state in local_states.values()
1425                if state.state != PresenceState.OFFLINE
1426                or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
1427                or state.status_msg is not None
1428            ]
1429
1430            self._federation_queue.send_presence_to_destinations(
1431                destinations=newly_joined_remote_hosts,
1432                states=states,
1433            )
1434
1435        # Send presence states of newly joined users to all remote servers in
1436        # the room
1437        if newly_joined_local_users and (
1438            prev_remote_hosts or newly_joined_remote_hosts
1439        ):
1440            local_states = await self.current_state_for_users(newly_joined_local_users)
1441            self._federation_queue.send_presence_to_destinations(
1442                destinations=prev_remote_hosts | newly_joined_remote_hosts,
1443                states=list(local_states.values()),
1444            )
1445
1446
def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
    """Decides if a presence state change should be sent to interested parties.

    Every positive decision also increments `notify_reason_counter` with a
    label naming the reason; a change of state additionally increments
    `state_transition_counter`.
    """
    if old_state == new_state:
        return False

    if old_state.status_msg != new_state.status_msg:
        notify_reason_counter.labels("status_msg_change").inc()
        return True

    if old_state.state != new_state.state:
        notify_reason_counter.labels("state_change").inc()
        state_transition_counter.labels(old_state.state, new_state.state).inc()
        return True

    # From here on the state and the status message are both unchanged.
    if old_state.state == PresenceState.ONLINE:
        if new_state.currently_active != old_state.currently_active:
            notify_reason_counter.labels("current_active_change").inc()
            return True

        last_active_bumped = (
            new_state.last_active_ts - old_state.last_active_ts
            > LAST_ACTIVE_GRANULARITY
        )
        # Only notify about last active bumps if we're not currently active
        if last_active_bumped and not new_state.currently_active:
            notify_reason_counter.labels("last_active_change_online").inc()
            return True

        return False

    if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
        # Always notify for a transition where last active gets bumped.
        notify_reason_counter.labels("last_active_change_not_online").inc()
        return True

    return False
1481
1482
def format_user_presence_state(
    state: UserPresenceState, now: int, include_user_id: bool = True
) -> JsonDict:
    """Convert UserPresenceState to a JSON format that can be sent down to clients
    and to other servers.

    Args:
        state: The user presence state to format.
        now: The current timestamp since the epoch in ms.
        include_user_id: Whether to include `user_id` in the returned dictionary.
            As this function can be used both to format presence updates for client /sync
            responses and for federation /send requests, only the latter needs the include
            the `user_id` field.

    Returns:
        A JSON dictionary with the following keys:
            * presence: The presence state as a str.
            * user_id: Optional. Included if `include_user_id` is truthy. The canonical
                Matrix ID of the user.
            * last_active_ago: Optional. Included if `last_active_ts` is set on `state`.
                How long ago the user was last active, i.e. `now` minus
                `last_active_ts`.
            * status_msg: Optional. Included if `status_msg` is set on `state`. The user's
                status.
            * currently_active: Optional. Included only if `state.state` is "online".

        Example:

        {
            "presence": "online",
            "user_id": "@alice:example.com",
            "last_active_ago": 16783813918,
            "status_msg": "Hello world!",
            "currently_active": True
        }
    """
    formatted: JsonDict = {"presence": state.state}

    if include_user_id:
        formatted["user_id"] = state.user_id

    # Falsy values (unset or zero timestamp, missing or empty status) are
    # left out of the result altogether.
    last_active_ts = state.last_active_ts
    if last_active_ts:
        formatted["last_active_ago"] = now - last_active_ts

    status_msg = state.status_msg
    if status_msg:
        formatted["status_msg"] = status_msg

    if state.state == PresenceState.ONLINE:
        formatted["currently_active"] = state.currently_active

    return formatted
1529
1530
class PresenceEventSource(EventSource[int, UserPresenceState]):
    """Event source that supplies presence updates to a syncing user.

    For a given user it works out whose presence they should see (themselves,
    users they share a room with, users in an explicitly requested room and any
    users nominated by the presence router) and returns the current presence
    state of those users, optionally restricted to the users whose presence has
    changed since a given stream token.
    """

    def __init__(self, hs: "HomeServer"):
        # We can't call get_presence_handler here because there's a cycle:
        #
        #   Presence -> Notifier -> PresenceEventSource -> Presence
        #
        # Same with get_presence_router:
        #
        #   AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
        #
        # So we store the getters and only call them when they are needed.
        self.get_presence_handler = hs.get_presence_handler
        self.get_presence_router = hs.get_presence_router
        self.clock = hs.get_clock()
        self.store = hs.get_datastore()

    @log_function
    async def get_new_events(
        self,
        user: UserID,
        from_key: Optional[int],
        limit: Optional[int] = None,
        room_ids: Optional[Collection[str]] = None,
        is_guest: bool = False,
        explicit_room_id: Optional[str] = None,
        include_offline: bool = True,
        service: Optional[ApplicationService] = None,
    ) -> Tuple[List[UserPresenceState], int]:
        """Fetch the presence updates that `user` should receive.

        Args:
            user: The user to fetch presence updates for.
            from_key: The presence stream token to fetch updates since. If it is
                None (or otherwise falsy) the current presence of every user
                that `user` is interested in is returned.
            limit: Unused by this event source.
            room_ids: Unused by this event source.
            is_guest: Unused by this event source.
            explicit_room_id: If set, the members of this room are added to the
                set of users whose presence is returned.
            include_offline: Whether offline presence states should be included
                in the result.
            service: Unused by this event source.

        Returns:
            A 2-tuple of `(presence_states, token)`, where `token` is the current
            presence stream token as reported by the data store.
        """
        # The process for getting presence events are:
        #  1. Get the rooms the user is in.
        #  2. Get the list of user in the rooms.
        #  3. Get the list of users that are in the user's presence list.
        #  4. If there is a from_key set, cross reference the list of users
        #     with the `presence_stream_cache` to see which ones we actually
        #     need to check.
        #  5. Load current state for the users.
        #
        # We don't try and limit the presence updates by the current token, as
        # sending down the rare duplicate is not a concern.

        user_id = user.to_string()
        stream_change_cache = self.store.presence_stream_cache

        with Measure(self.clock, "presence.get_new_events"):
            if from_key is not None:
                from_key = int(from_key)

                # Check if this user should receive all current, online user presence. We only
                # bother to do this if from_key is set, as otherwise the user will receive all
                # user presence anyways.
                if await self.store.should_user_receive_full_presence_with_token(
                    user_id, from_key
                ):
                    # This user has been specified by a module to receive all current, online
                    # user presence. Removing from_key and setting include_offline to false
                    # will do effectively this.
                    from_key = None
                    include_offline = False

            max_token = self.store.get_current_presence_token()
            if from_key == max_token:
                # This is necessary as due to the way stream ID generators work
                # we may get updates that have a stream ID greater than the max
                # token (e.g. max_token is N but stream generator may return
                # results for N+2, due to N+1 not having finished being
                # persisted yet).
                #
                # This is usually fine, as it just means that we may send down
                # some presence updates multiple times. However, we need to be
                # careful that the sync stream either actually does make some
                # progress or doesn't return, otherwise clients will end up
                # tight looping calling /sync due to it immediately returning
                # the same token repeatedly.
                #
                # Hence this guard where we just return nothing so that the sync
                # doesn't return. C.f. #5503.
                return [], max_token

            # Figure out which other users this user should receive updates for
            users_interested_in = await self._get_interested_in(user, explicit_room_id)

            # We have a set of users that we're interested in the presence of. We want to
            # cross-reference that with the users that have actually changed their presence.

            # Check whether this user should see all user updates

            if users_interested_in == PresenceRouter.ALL_USERS:
                # Provide presence state for all users. Offline filtering is
                # handled inside the helper, so we can return its result as is.
                presence_updates = await self._filter_all_presence_updates_for_user(
                    user_id, include_offline, from_key
                )

                return presence_updates, max_token

            # Make mypy happy. users_interested_in should now be a set
            assert not isinstance(users_interested_in, str)

            # The set of users that we're interested in and that have had a presence update.
            # We'll actually pull the presence updates for these users at the end.
            interested_and_updated_users: Union[Set[str], FrozenSet[str]] = set()

            if from_key:
                # First get all users that have had a presence update
                updated_users = stream_change_cache.get_all_entities_changed(from_key)

                # Cross-reference users we're interested in with those that have had updates.
                # Use a slightly-optimised method for processing smaller sets of updates.
                if updated_users is not None and len(updated_users) < 500:
                    # For small deltas, it's quicker to get all changes and then
                    # cross-reference with the users we're interested in
                    get_updates_counter.labels("stream").inc()
                    for other_user_id in updated_users:
                        if other_user_id in users_interested_in:
                            # mypy thinks this variable could be a FrozenSet as it's possibly set
                            # to one in the `get_entities_changed` call below, and `add()` is not
                            # method on a FrozenSet. That doesn't affect us here though, as
                            # `interested_and_updated_users` is clearly a set() above.
                            interested_and_updated_users.add(other_user_id)  # type: ignore
                else:
                    # Too many possible updates. Find all users we can see and check
                    # if any of them have changed.
                    get_updates_counter.labels("full").inc()

                    interested_and_updated_users = (
                        stream_change_cache.get_entities_changed(
                            users_interested_in, from_key
                        )
                    )
            else:
                # No from_key has been specified. Return the presence for all users
                # this user is interested in
                interested_and_updated_users = users_interested_in

            # Retrieve the current presence state for each user
            users_to_state = await self.get_presence_handler().current_state_for_users(
                interested_and_updated_users
            )
            presence_updates = list(users_to_state.values())

        if not include_offline:
            # Filter out offline presence states
            presence_updates = self._filter_offline_presence_state(presence_updates)

        return presence_updates, max_token

    async def _filter_all_presence_updates_for_user(
        self,
        user_id: str,
        include_offline: bool,
        from_key: Optional[int] = None,
    ) -> List[UserPresenceState]:
        """
        Computes the presence updates a user should receive.

        First pulls presence updates from the database. Then consults PresenceRouter
        for whether any updates should be excluded by user ID.

        Args:
            user_id: The User ID of the user to compute presence updates for.
            include_offline: Whether to include offline presence states in the results.
            from_key: The minimum stream ID of updates to pull from the database
                before filtering. If falsy, the presence of all users is pulled.

        Returns:
            A list of presence states for the given user to receive. The list is
            empty if the presence router returned no entry for `user_id`.
        """
        if from_key:
            # Only return updates since the last sync
            updated_users = self.store.presence_stream_cache.get_all_entities_changed(
                from_key
            )
            if not updated_users:
                updated_users = []

            # Get the actual presence update for each change
            users_to_state = await self.get_presence_handler().current_state_for_users(
                updated_users
            )
            presence_updates = list(users_to_state.values())

            if not include_offline:
                # Filter out offline states
                presence_updates = self._filter_offline_presence_state(presence_updates)
        else:
            users_to_state = await self.store.get_presence_for_all_users(
                include_offline=include_offline
            )

            presence_updates = list(users_to_state.values())

        # TODO: This feels wildly inefficient, and it's unfortunate we need to ask the
        # module for information on a number of users when we then only take the info
        # for a single user

        # Filter through the presence router
        users_to_state_set = await self.get_presence_router().get_users_for_states(
            presence_updates
        )

        # We only want the mapping for the syncing user. If the router returned no
        # entry for them (e.g. because there was nothing to route), treat that as
        # "no updates" rather than failing the whole request with a KeyError.
        return list(users_to_state_set.get(user_id, ()))

    def _filter_offline_presence_state(
        self, presence_updates: Iterable[UserPresenceState]
    ) -> List[UserPresenceState]:
        """Given an iterable containing user presence updates, return a list with any offline
        presence states removed.

        Args:
            presence_updates: Presence states to filter

        Returns:
            A new list with any offline presence states removed.
        """
        return [
            update
            for update in presence_updates
            if update.state != PresenceState.OFFLINE
        ]

    def get_current_key(self) -> int:
        """Return the current presence stream token from the data store."""
        return self.store.get_current_presence_token()

    @cached(num_args=2, cache_context=True)
    async def _get_interested_in(
        self,
        user: UserID,
        explicit_room_id: Optional[str] = None,
        cache_context: Optional[_CacheContext] = None,
    ) -> Union[Set[str], str]:
        """Returns the set of users that the given user should see presence
        updates for.

        Args:
            user: The user to retrieve presence updates for.
            explicit_room_id: If set, the users that are in this room are included
                in the result as well.
            cache_context: Supplied by the `@cached` decorator; used to invalidate
                this result when the underlying store lookups are invalidated.

        Returns:
            A set of user IDs to return presence updates for, or
            `PresenceRouter.ALL_USERS` to return all known updates.
        """
        user_id = user.to_string()

        # Start with the user themselves, so that we receive our own presence
        users_interested_in: Set[str] = {user_id}

        # cache_context isn't likely to ever be None due to the @cached decorator,
        # but we can't have a non-optional argument after the optional argument
        # explicit_room_id either. Assert cache_context is not None so we can use it
        # without mypy complaining.
        assert cache_context

        # Check with the presence router whether we should poll additional users for
        # their presence information
        additional_users = await self.get_presence_router().get_interested_users(
            user_id
        )
        if additional_users == PresenceRouter.ALL_USERS:
            # If the module requested that this user see the presence updates of *all*
            # users, then simply return that instead of calculating what rooms this
            # user shares
            return PresenceRouter.ALL_USERS

        # Add the additional users from the router
        users_interested_in.update(additional_users)

        # Find the users who share a room with this user
        users_who_share_room = await self.store.get_users_who_share_room_with_user(
            user_id, on_invalidate=cache_context.invalidate
        )
        users_interested_in.update(users_who_share_room)

        if explicit_room_id:
            user_ids = await self.store.get_users_in_room(
                explicit_room_id, on_invalidate=cache_context.invalidate
            )
            users_interested_in.update(user_ids)

        return users_interested_in
1810
1811
def handle_timeouts(
    user_states: List[UserPresenceState],
    is_mine_fn: Callable[[str], bool],
    syncing_user_ids: Set[str],
    now: int,
) -> List[UserPresenceState]:
    """Run the timeout checks over a batch of presence states.

    Each state is passed through `handle_timeout`; only the states for which
    that returns an update are collected.

    Args:
        user_states: The presence states to check.
        is_mine_fn: Called with a user ID; returns whether that user is ours.
        syncing_user_ids: The user IDs that currently have active syncs.
        now: The current time in ms.

    Returns:
        The updated presence states, at most one per user ID.
    """
    # Keyed by user ID so each user appears at most once in the result. If a
    # user is listed several times, the most recently computed update is kept.
    updates_by_user: Dict[str, UserPresenceState] = {}

    for current in user_states:
        owner = current.user_id
        updated = handle_timeout(current, is_mine_fn(owner), syncing_user_ids, now)
        if not updated:
            # Nothing timed out for this user.
            continue
        updates_by_user[owner] = updated

    return list(updates_by_user.values())
1840
1841
def handle_timeout(
    state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
) -> Optional[UserPresenceState]:
    """Work out whether any presence timer has elapsed for a single user.

    Args:
        state: The user's current presence state.
        is_mine: Whether the user is ours.
        syncing_user_ids: The user IDs that currently have active syncs.
        now: The current time in ms.

    Returns:
        The (possibly modified) presence state if an update is needed,
        otherwise None.
    """
    # Offline states have no timers attached to them.
    if state.state == PresenceState.OFFLINE:
        return None

    if not is_mine:
        # Remote user: we expect the other side to poke us occasionally. If it
        # has gone quiet (forgetful/buggy server), mark the user offline so that
        # nobody gets stuck online forever.
        if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
            return state.copy_and_replace(state=PresenceState.OFFLINE)
        return None

    # Local user from here on.
    user_id = state.user_id
    needs_update = False

    if state.state == PresenceState.ONLINE:
        if now - state.last_active_ts > IDLE_TIMER:
            # Online, but no activity for a long time: automatically go idle.
            state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
            needs_update = True
        elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
            # The state itself is unchanged, but we still send a notification
            # so that others learn we have stopped updating.
            needs_update = True

    if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
        # Time to ping other servers so that they don't time us out and mark
        # us as offline.
        needs_update = True

    if user_id not in syncing_user_ids:
        # No sync is in progress. Recent activity counts as well as a recent
        # sync, so only go offline once both are older than the timeout.
        last_seen = max(state.last_user_sync_ts, state.last_active_ts)
        if now - last_seen > SYNC_ONLINE_TIMEOUT:
            state = state.copy_and_replace(state=PresenceState.OFFLINE)
            needs_update = True

    if needs_update:
        return state
    return None
1899
1900
def handle_update(
    prev_state: UserPresenceState,
    new_state: UserPresenceState,
    is_mine: bool,
    wheel_timer: WheelTimer,
    now: int,
) -> Tuple[UserPresenceState, bool, bool]:
    """Process a presence update: schedule the relevant timers and decide
    whether anyone needs to be told about it.

    Args:
        prev_state: The presence state before the update.
        new_state: The presence state after the update.
        is_mine: Whether the user is ours.
        wheel_timer: The timer that the user's timeout checks are inserted into.
        now: Time now in ms.

    Returns:
        3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
            - new_state: is the state to actually persist
            - persist_and_notify: whether to persist and notify people
            - federation_ping: whether we should send a ping over federation
    """
    user_id = new_state.user_id
    federation_ping = False

    if not is_mine:
        # Remote user: the only timer is the one that marks them offline if the
        # remote server stops sending us updates.
        wheel_timer.insert(
            now=now,
            obj=user_id,
            then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
        )
    else:
        # Local user: set up the timers that time their presence out.
        if new_state.state == PresenceState.ONLINE:
            # Timer for going idle.
            wheel_timer.insert(
                now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER
            )

            is_active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
            new_state = new_state.copy_and_replace(currently_active=is_active)

            if is_active:
                # Timer for when the user stops counting as currently active.
                wheel_timer.insert(
                    now=now,
                    obj=user_id,
                    then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
                )

        if new_state.state != PresenceState.OFFLINE:
            # Timer for the user having stopped syncing.
            wheel_timer.insert(
                now=now,
                obj=user_id,
                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
            )

            since_last_federate = now - new_state.last_federation_update_ts
            if since_last_federate > FEDERATION_PING_INTERVAL:
                # It has been a while since we last poked remote servers.
                new_state = new_state.copy_and_replace(last_federation_update_ts=now)
                federation_ping = True

    # Finally, check whether the change is worth notifying about.
    if not should_notify(prev_state, new_state):
        return new_state, False, federation_ping

    notified_state = new_state.copy_and_replace(last_federation_update_ts=now)
    return notified_state, True, federation_ping
1976
1977
async def get_interested_parties(
    store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
    """Work out which rooms and users should be told about each of the given
    presence states.

    Args:
        store: The homeserver's data store.
        presence_router: A module for augmenting the destinations for presence updates.
        states: A list of incoming user presence updates.

    Returns:
        A 2-tuple of `(room_ids_to_states, users_to_states)`,
        with each item being a dict of `entity_name` -> `[UserPresenceState]`
    """
    by_room: Dict[str, List[UserPresenceState]] = {}
    by_user: Dict[str, List[UserPresenceState]] = {}

    for presence in states:
        subject = presence.user_id

        # Every room the user is in is interested in their presence.
        for room_id in await store.get_rooms_for_user(subject):
            if room_id not in by_room:
                by_room[room_id] = []
            by_room[room_id].append(presence)

        # A user is always told about their own presence.
        if subject not in by_user:
            by_user[subject] = []
        by_user[subject].append(presence)

    # Let the presence router nominate further recipients and merge them in
    # with the ones found above.
    routed = await presence_router.get_users_for_states(states)
    for recipient, recipient_states in routed.items():
        if recipient not in by_user:
            by_user[recipient] = []
        by_user[recipient].extend(recipient_states)

    return by_room, by_user
2012
2013
async def get_interested_remotes(
    store: DataStore,
    presence_router: PresenceRouter,
    states: List[UserPresenceState],
) -> Dict[str, Set[UserPresenceState]]:
    """Work out which remote servers each of the given presence states should
    be sent to.

    All the presence states should be for local users only.

    Args:
        store: The homeserver's data store.
        presence_router: A module for augmenting the destinations for presence updates.
        states: A list of incoming user presence updates.

    Returns:
        A map from destinations to presence states to send to that destination.
    """
    destinations: Dict[str, Set[UserPresenceState]] = {}

    # Start by finding the rooms and users (including any explicit
    # subscriptions) that care about each state.
    room_ids_to_states, users_to_states = await get_interested_parties(
        store, presence_router, states
    )

    # For each distinct room, every host with a member in the room gets the
    # states associated with that room.
    for room_id, room_states in room_ids_to_states.items():
        members = await store.get_users_in_room(room_id)
        member_hosts = {get_domain_from_id(member) for member in members}
        for host in member_hosts:
            destinations.setdefault(host, set()).update(room_states)

    # Individually interested users receive the states via their own host.
    for user_id, user_states in users_to_states.items():
        user_host = get_domain_from_id(user_id)
        destinations.setdefault(user_host, set()).update(user_states)

    return destinations
2052
2053
class PresenceFederationQueue:
    """Sends ad hoc presence updates over federation, i.e. those that are
    *not* caused by state updates (which go via the presence stream). Examples
    are federation pings and sending existing presence states to newly joined
    hosts.

    Only the last few minutes of updates are kept, so a federation sender
    instance that is down for longer than that will miss some of them. That is
    acceptable because presence is ephemeral and will self correct eventually.

    On workers this class records the last stream position received over
    replication and fetches anything it missed via HTTP replication, c.f.
    `get_current_token` and `get_replication_rows`.
    """

    # Maximum age of a queue entry. Workers that are down for longer than this
    # will miss out on older updates.
    _KEEP_ITEMS_IN_QUEUE_FOR_MS = 5 * 60 * 1000

    # Interval between attempts to expire old entries from the queue.
    _CLEAR_ITEMS_EVERY_MS = 60 * 1000

    def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler):
        self._presence_handler = presence_handler
        self._clock = hs.get_clock()
        self._notifier = hs.get_notifier()
        self._instance_name = hs.get_instance_name()
        self._repl_client = ReplicationGetStreamUpdates.make_client(hs)

        # Whether this instance is a presence writer.
        self._presence_writer = self._instance_name in hs.config.worker.writers.presence

        if hs.should_send_federation():
            # This process sends federation traffic itself, so keep hold of
            # the FederationSender to hand updates to directly.
            self._federation = hs.get_federation_sender()
            sole_sender = hs.config.worker.federation_shard_config.instances == [
                self._instance_name
            ]
        else:
            self._federation = None
            sole_sender = False

        # We only need to queue recent updates if some other process may be
        # sending federation; if this instance is the only sender there is
        # nobody to replicate them to.
        self._queue_presence_updates = not sole_sender

        # Recently queued updates, as tuples of `(timestamp, stream_id,
        # destinations, user_ids)`. Full states are deliberately not stored:
        # it is cheaper, and remote workers will already have them cached.
        self._queue: List[Tuple[int, int, Collection[str], Set[str]]] = []

        self._next_id = 1

        # Instance name -> last stream token seen from that instance.
        self._current_tokens: Dict[str, int] = {}

        if self._queue_presence_updates:
            self._clock.looping_call(self._clear_queue, self._CLEAR_ITEMS_EVERY_MS)

    def _clear_queue(self) -> None:
        """Drop entries that are older than the retention period."""
        cutoff = self._clock.time_msec() - self._KEEP_ITEMS_IN_QUEUE_FOR_MS

        # Entries are appended in timestamp order, so the queue is sorted and
        # we can bisect for the cut-off point. Searching with a 1-tuple works
        # because tuples compare element-wise and the timestamp comes first.
        first_kept = bisect(self._queue, (cutoff,))
        self._queue = self._queue[first_kept:]

    def send_presence_to_destinations(
        self, states: Collection[UserPresenceState], destinations: Collection[str]
    ) -> None:
        """Send the presence states to the given destinations.

        The states are handed to the local federation sender (if there is one)
        and queued for replication (if other federation sender instances may
        exist).

        Must only be called on the presence writer process.
        """
        # Only a presence writer may call this.
        assert self._presence_writer

        if self._federation:
            self._federation.send_presence_to_destinations(
                states=states,
                destinations=destinations,
            )

        if not self._queue_presence_updates:
            return

        queued_at = self._clock.time_msec()

        # Allocate the next stream ID for this entry.
        stream_id, self._next_id = self._next_id, self._next_id + 1

        user_ids = {state.user_id for state in states}
        self._queue.append((queued_at, stream_id, destinations, user_ids))

        self._notifier.notify_replication()

    def get_current_token(self, instance_name: str) -> int:
        """Get the current position of the stream.

        For this instance that is the last stream ID handed out; for any other
        instance it is the last stream ID received from replication (0 if none
        has been received).
        """
        if instance_name != self._instance_name:
            return self._current_tokens.get(instance_name, 0)
        return self._next_id - 1

    async def get_replication_rows(
        self,
        instance_name: str,
        from_token: int,
        upto_token: int,
        target_row_count: int,
    ) -> Tuple[List[Tuple[int, Tuple[str, str]]], int, bool]:
        """Get all the updates between the two tokens.

        Rows are returned as `(destination, user_id)` pairs so that the size of
        each row is bounded (rather than returning whole sets in a row).

        If `instance_name` is not this instance, the rows are fetched from that
        instance over HTTP replication.

        Returns:
            A 3-tuple of `(rows, upto_token, limited)`.
        """
        if instance_name != self._instance_name:
            # Not our stream: ask the named instance over HTTP replication.
            response = await self._repl_client(
                instance_name=instance_name,
                stream_name=PresenceFederationStream.NAME,
                from_token=from_token,
                upto_token=upto_token,
            )
            return response["updates"], response["upto_token"], response["limited"]

        current_token = self._next_id - 1

        # Nothing can have happened since the current token: trivial no-op.
        if from_token == current_token:
            return [], upto_token, False

        # There is exactly one queue entry per stream ID and the final entry
        # has ID `current_token`, so positions can be counted back from the end
        # of the queue.
        #
        # We return states with `from_token < stream_id <= upto_token`, hence
        # we want the entry whose `stream_id` is `from_token + 1`.
        #
        # The front of the queue is truncated periodically, so that entry may
        # already be gone; in that case we start from the oldest one we have.
        start_idx = max(from_token - current_token, -len(self._queue))

        rows: List[Tuple[int, Tuple[str, str]]] = []
        last_id = upto_token
        for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
            if stream_id <= from_token:
                # Paranoia: only states with a stream_id strictly greater than
                # from_token should be sent. This should never be hit.
                logger.warning(
                    "Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
                    stream_id,
                    from_token,
                    self._next_id,
                    len(self._queue),
                )
                continue

            if stream_id > upto_token:
                break

            last_id = stream_id

            for destination in destinations:
                for user_id in user_ids:
                    rows.append((stream_id, (destination, user_id)))

            if len(rows) > target_row_count:
                # We have gone past the requested row count; stop here and
                # report the result as limited.
                return rows, last_id, True

        return rows, last_id, False

    async def process_replication_rows(
        self, stream_name: str, instance_name: str, token: int, rows: list
    ) -> None:
        """Handle rows received over replication for the presence federation
        stream; rows for any other stream are ignored.
        """
        if stream_name != PresenceFederationStream.NAME:
            return

        # Remember where we are up to, so that we can catch up on anything
        # missed after a disconnect.
        self._current_tokens[instance_name] = token

        # Only federation senders need to act on the rows themselves.
        if not self._federation:
            return

        # Group the user IDs by destination host.
        users_by_host: Dict[str, Set[str]] = {}
        for row in rows:
            host_users = users_by_host.get(row.destination)
            if host_users is None:
                host_users = users_by_host[row.destination] = set()
            host_users.add(row.user_id)

        # Look up the current states for each host's users and forward them on.
        for host, user_ids in users_by_host.items():
            current = await self._presence_handler.current_state_for_users(user_ids)
            self._federation.send_presence_to_destinations(
                states=current.values(),
                destinations=[host],
            )
2269