1# Copyright 2016-2020 The Matrix.org Foundation C.I.C.
2# Copyright 2020 Sorunome
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15import abc
16import logging
17import random
18from http import HTTPStatus
19from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
20
21from synapse import types
22from synapse.api.constants import (
23    AccountDataTypes,
24    EventContentFields,
25    EventTypes,
26    GuestAccess,
27    Membership,
28)
29from synapse.api.errors import (
30    AuthError,
31    Codes,
32    LimitExceededError,
33    ShadowBanError,
34    SynapseError,
35)
36from synapse.api.ratelimiting import Ratelimiter
37from synapse.event_auth import get_named_level, get_power_level_event
38from synapse.events import EventBase
39from synapse.events.snapshot import EventContext
40from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
41from synapse.types import (
42    JsonDict,
43    Requester,
44    RoomAlias,
45    RoomID,
46    StateMap,
47    UserID,
48    create_requester,
49    get_domain_from_id,
50)
51from synapse.util.async_helpers import Linearizer
52from synapse.util.distributor import user_left_room
53
54if TYPE_CHECKING:
55    from synapse.server import HomeServer
56
57
58logger = logging.getLogger(__name__)
59
60
61class RoomMemberHandler(metaclass=abc.ABCMeta):
62    # TODO(paul): This handler currently contains a messy conflation of
63    #   low-level API that works on UserID objects and so on, and REST-level
64    #   API that takes ID strings and returns pagination chunks. These concerns
65    #   ought to be separated out a lot better.
66
67    def __init__(self, hs: "HomeServer"):
68        self.hs = hs
69        self.store = hs.get_datastore()
70        self.auth = hs.get_auth()
71        self.state_handler = hs.get_state_handler()
72        self.config = hs.config
73        self._server_name = hs.hostname
74
75        self.federation_handler = hs.get_federation_handler()
76        self.directory_handler = hs.get_directory_handler()
77        self.identity_handler = hs.get_identity_handler()
78        self.registration_handler = hs.get_registration_handler()
79        self.profile_handler = hs.get_profile_handler()
80        self.event_creation_handler = hs.get_event_creation_handler()
81        self.account_data_handler = hs.get_account_data_handler()
82        self.event_auth_handler = hs.get_event_auth_handler()
83
84        self.member_linearizer: Linearizer = Linearizer(name="member")
85
86        self.clock = hs.get_clock()
87        self.spam_checker = hs.get_spam_checker()
88        self.third_party_event_rules = hs.get_third_party_event_rules()
89        self._server_notices_mxid = self.config.servernotices.server_notices_mxid
90        self._enable_lookup = hs.config.registration.enable_3pid_lookup
91        self.allow_per_room_profiles = self.config.server.allow_per_room_profiles
92
93        self._join_rate_limiter_local = Ratelimiter(
94            store=self.store,
95            clock=self.clock,
96            rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
97            burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
98        )
99        self._join_rate_limiter_remote = Ratelimiter(
100            store=self.store,
101            clock=self.clock,
102            rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
103            burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
104        )
105
106        self._invites_per_room_limiter = Ratelimiter(
107            store=self.store,
108            clock=self.clock,
109            rate_hz=hs.config.ratelimiting.rc_invites_per_room.per_second,
110            burst_count=hs.config.ratelimiting.rc_invites_per_room.burst_count,
111        )
112        self._invites_per_user_limiter = Ratelimiter(
113            store=self.store,
114            clock=self.clock,
115            rate_hz=hs.config.ratelimiting.rc_invites_per_user.per_second,
116            burst_count=hs.config.ratelimiting.rc_invites_per_user.burst_count,
117        )
118
119        self.request_ratelimiter = hs.get_request_ratelimiter()
120
121    @abc.abstractmethod
122    async def _remote_join(
123        self,
124        requester: Requester,
125        remote_room_hosts: List[str],
126        room_id: str,
127        user: UserID,
128        content: dict,
129    ) -> Tuple[str, int]:
130        """Try and join a room that this server is not in
131
132        Args:
133            requester
134            remote_room_hosts: List of servers that can be used to join via.
135            room_id: Room that we are trying to join
136            user: User who is trying to join
137            content: A dict that should be used as the content of the join event.
138        """
139        raise NotImplementedError()
140
141    @abc.abstractmethod
142    async def remote_knock(
143        self,
144        remote_room_hosts: List[str],
145        room_id: str,
146        user: UserID,
147        content: dict,
148    ) -> Tuple[str, int]:
149        """Try and knock on a room that this server is not in
150
151        Args:
152            remote_room_hosts: List of servers that can be used to knock via.
153            room_id: Room that we are trying to knock on.
154            user: User who is trying to knock.
155            content: A dict that should be used as the content of the knock event.
156        """
157        raise NotImplementedError()
158
159    @abc.abstractmethod
160    async def remote_reject_invite(
161        self,
162        invite_event_id: str,
163        txn_id: Optional[str],
164        requester: Requester,
165        content: JsonDict,
166    ) -> Tuple[str, int]:
167        """
168        Rejects an out-of-band invite we have received from a remote server
169
170        Args:
171            invite_event_id: ID of the invite to be rejected
172            txn_id: optional transaction ID supplied by the client
173            requester: user making the rejection request, according to the access token
174            content: additional content to include in the rejection event.
175               Normally an empty dict.
176
177        Returns:
178            event id, stream_id of the leave event
179        """
180        raise NotImplementedError()
181
182    @abc.abstractmethod
183    async def remote_rescind_knock(
184        self,
185        knock_event_id: str,
186        txn_id: Optional[str],
187        requester: Requester,
188        content: JsonDict,
189    ) -> Tuple[str, int]:
190        """Rescind a local knock made on a remote room.
191
192        Args:
193            knock_event_id: The ID of the knock event to rescind.
194            txn_id: An optional transaction ID supplied by the client.
195            requester: The user making the request, according to the access token.
196            content: The content of the generated leave event.
197
198        Returns:
199            A tuple containing (event_id, stream_id of the leave event).
200        """
201        raise NotImplementedError()
202
203    @abc.abstractmethod
204    async def _user_left_room(self, target: UserID, room_id: str) -> None:
205        """Notifies distributor on master process that the user has left the
206        room.
207
208        Args:
209            target
210            room_id
211        """
212        raise NotImplementedError()
213
214    @abc.abstractmethod
215    async def forget(self, user: UserID, room_id: str) -> None:
216        raise NotImplementedError()
217
218    async def ratelimit_multiple_invites(
219        self,
220        requester: Optional[Requester],
221        room_id: Optional[str],
222        n_invites: int,
223        update: bool = True,
224    ) -> None:
225        """Ratelimit more than one invite sent by the given requester in the given room.
226
227        Args:
228            requester: The requester sending the invites.
229            room_id: The room the invites are being sent in.
230            n_invites: The amount of invites to ratelimit for.
231            update: Whether to update the ratelimiter's cache.
232
233        Raises:
234            LimitExceededError: The requester can't send that many invites in the room.
235        """
236        await self._invites_per_room_limiter.ratelimit(
237            requester,
238            room_id,
239            update=update,
240            n_actions=n_invites,
241        )
242
243    async def ratelimit_invite(
244        self,
245        requester: Optional[Requester],
246        room_id: Optional[str],
247        invitee_user_id: str,
248    ) -> None:
249        """Ratelimit invites by room and by target user.
250
251        If room ID is missing then we just rate limit by target user.
252        """
253        if room_id:
254            await self._invites_per_room_limiter.ratelimit(requester, room_id)
255
256        await self._invites_per_user_limiter.ratelimit(requester, invitee_user_id)
257
258    async def _local_membership_update(
259        self,
260        requester: Requester,
261        target: UserID,
262        room_id: str,
263        membership: str,
264        prev_event_ids: List[str],
265        auth_event_ids: Optional[List[str]] = None,
266        txn_id: Optional[str] = None,
267        ratelimit: bool = True,
268        content: Optional[dict] = None,
269        require_consent: bool = True,
270        outlier: bool = False,
271        historical: bool = False,
272    ) -> Tuple[str, int]:
273        """
274        Internal membership update function to get an existing event or create
275        and persist a new event for the new membership change.
276
277        Args:
278            requester:
279            target:
280            room_id:
281            membership:
282            prev_event_ids: The event IDs to use as the prev events
283
284            auth_event_ids:
285                The event ids to use as the auth_events for the new event.
286                Should normally be left as None, which will cause them to be calculated
287                based on the room state at the prev_events.
288
289            txn_id:
290            ratelimit:
291            content:
292            require_consent:
293
294            outlier: Indicates whether the event is an `outlier`, i.e. if
295                it's from an arbitrary point and floating in the DAG as
296                opposed to being inline with the current DAG.
297            historical: Indicates whether the message is being inserted
298                back in time around some existing events. This is used to skip
299                a few checks and mark the event as backfilled.
300
301        Returns:
302            Tuple of event ID and stream ordering position
303        """
304
305        user_id = target.to_string()
306
307        if content is None:
308            content = {}
309
310        content["membership"] = membership
311        if requester.is_guest:
312            content["kind"] = "guest"
313
314        # Check if we already have an event with a matching transaction ID. (We
315        # do this check just before we persist an event as well, but may as well
316        # do it up front for efficiency.)
317        if txn_id and requester.access_token_id:
318            existing_event_id = await self.store.get_event_id_from_transaction_id(
319                room_id,
320                requester.user.to_string(),
321                requester.access_token_id,
322                txn_id,
323            )
324            if existing_event_id:
325                event_pos = await self.store.get_position_for_event(existing_event_id)
326                return existing_event_id, event_pos.stream
327
328        event, context = await self.event_creation_handler.create_event(
329            requester,
330            {
331                "type": EventTypes.Member,
332                "content": content,
333                "room_id": room_id,
334                "sender": requester.user.to_string(),
335                "state_key": user_id,
336                # For backwards compatibility:
337                "membership": membership,
338            },
339            txn_id=txn_id,
340            prev_event_ids=prev_event_ids,
341            auth_event_ids=auth_event_ids,
342            require_consent=require_consent,
343            outlier=outlier,
344            historical=historical,
345        )
346
347        prev_state_ids = await context.get_prev_state_ids()
348
349        prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
350
351        if event.membership == Membership.JOIN:
352            newly_joined = True
353            if prev_member_event_id:
354                prev_member_event = await self.store.get_event(prev_member_event_id)
355                newly_joined = prev_member_event.membership != Membership.JOIN
356
357            # Only rate-limit if the user actually joined the room, otherwise we'll end
358            # up blocking profile updates.
359            if newly_joined and ratelimit:
360                time_now_s = self.clock.time()
361                (
362                    allowed,
363                    time_allowed,
364                ) = await self._join_rate_limiter_local.can_do_action(requester)
365
366                if not allowed:
367                    raise LimitExceededError(
368                        retry_after_ms=int(1000 * (time_allowed - time_now_s))
369                    )
370
371        result_event = await self.event_creation_handler.handle_new_client_event(
372            requester,
373            event,
374            context,
375            extra_users=[target],
376            ratelimit=ratelimit,
377        )
378
379        if event.membership == Membership.LEAVE:
380            if prev_member_event_id:
381                prev_member_event = await self.store.get_event(prev_member_event_id)
382                if prev_member_event.membership == Membership.JOIN:
383                    await self._user_left_room(target, room_id)
384
385        # we know it was persisted, so should have a stream ordering
386        assert result_event.internal_metadata.stream_ordering
387        return result_event.event_id, result_event.internal_metadata.stream_ordering
388
389    async def copy_room_tags_and_direct_to_room(
390        self, old_room_id: str, new_room_id: str, user_id: str
391    ) -> None:
392        """Copies the tags and direct room state from one room to another.
393
394        Args:
395            old_room_id: The room ID of the old room.
396            new_room_id: The room ID of the new room.
397            user_id: The user's ID.
398        """
399        # Retrieve user account data for predecessor room
400        user_account_data, _ = await self.store.get_account_data_for_user(user_id)
401
402        # Copy direct message state if applicable
403        direct_rooms = user_account_data.get(AccountDataTypes.DIRECT, {})
404
405        # Check which key this room is under
406        if isinstance(direct_rooms, dict):
407            for key, room_id_list in direct_rooms.items():
408                if old_room_id in room_id_list and new_room_id not in room_id_list:
409                    # Add new room_id to this key
410                    direct_rooms[key].append(new_room_id)
411
412                    # Save back to user's m.direct account data
413                    await self.account_data_handler.add_account_data_for_user(
414                        user_id, AccountDataTypes.DIRECT, direct_rooms
415                    )
416                    break
417
418        # Copy room tags if applicable
419        room_tags = await self.store.get_tags_for_room(user_id, old_room_id)
420
421        # Copy each room tag to the new room
422        for tag, tag_content in room_tags.items():
423            await self.account_data_handler.add_tag_to_room(
424                user_id, new_room_id, tag, tag_content
425            )
426
427    async def update_membership(
428        self,
429        requester: Requester,
430        target: UserID,
431        room_id: str,
432        action: str,
433        txn_id: Optional[str] = None,
434        remote_room_hosts: Optional[List[str]] = None,
435        third_party_signed: Optional[dict] = None,
436        ratelimit: bool = True,
437        content: Optional[dict] = None,
438        new_room: bool = False,
439        require_consent: bool = True,
440        outlier: bool = False,
441        historical: bool = False,
442        prev_event_ids: Optional[List[str]] = None,
443        auth_event_ids: Optional[List[str]] = None,
444    ) -> Tuple[str, int]:
445        """Update a user's membership in a room.
446
447        Params:
448            requester: The user who is performing the update.
449            target: The user whose membership is being updated.
450            room_id: The room ID whose membership is being updated.
451            action: The membership change, see synapse.api.constants.Membership.
452            txn_id: The transaction ID, if given.
453            remote_room_hosts: Remote servers to send the update to.
454            third_party_signed: Information from a 3PID invite.
455            ratelimit: Whether to rate limit the request.
456            content: The content of the created event.
457            new_room: Whether the membership update is happening in the context of a room
458                creation.
459            require_consent: Whether consent is required.
460            outlier: Indicates whether the event is an `outlier`, i.e. if
461                it's from an arbitrary point and floating in the DAG as
462                opposed to being inline with the current DAG.
463            historical: Indicates whether the message is being inserted
464                back in time around some existing events. This is used to skip
465                a few checks and mark the event as backfilled.
466            prev_event_ids: The event IDs to use as the prev events
467            auth_event_ids:
468                The event ids to use as the auth_events for the new event.
469                Should normally be left as None, which will cause them to be calculated
470                based on the room state at the prev_events.
471
472        Returns:
473            A tuple of the new event ID and stream ID.
474
475        Raises:
476            ShadowBanError if a shadow-banned requester attempts to send an invite.
477        """
478        if action == Membership.INVITE and requester.shadow_banned:
479            # We randomly sleep a bit just to annoy the requester.
480            await self.clock.sleep(random.randint(1, 10))
481            raise ShadowBanError()
482
483        key = (room_id,)
484
485        with (await self.member_linearizer.queue(key)):
486            result = await self.update_membership_locked(
487                requester,
488                target,
489                room_id,
490                action,
491                txn_id=txn_id,
492                remote_room_hosts=remote_room_hosts,
493                third_party_signed=third_party_signed,
494                ratelimit=ratelimit,
495                content=content,
496                new_room=new_room,
497                require_consent=require_consent,
498                outlier=outlier,
499                historical=historical,
500                prev_event_ids=prev_event_ids,
501                auth_event_ids=auth_event_ids,
502            )
503
504        return result
505
506    async def update_membership_locked(
507        self,
508        requester: Requester,
509        target: UserID,
510        room_id: str,
511        action: str,
512        txn_id: Optional[str] = None,
513        remote_room_hosts: Optional[List[str]] = None,
514        third_party_signed: Optional[dict] = None,
515        ratelimit: bool = True,
516        content: Optional[dict] = None,
517        new_room: bool = False,
518        require_consent: bool = True,
519        outlier: bool = False,
520        historical: bool = False,
521        prev_event_ids: Optional[List[str]] = None,
522        auth_event_ids: Optional[List[str]] = None,
523    ) -> Tuple[str, int]:
524        """Helper for update_membership.
525
526        Assumes that the membership linearizer is already held for the room.
527
528        Args:
529            requester:
530            target:
531            room_id:
532            action:
533            txn_id:
534            remote_room_hosts:
535            third_party_signed:
536            ratelimit:
537            content:
538            new_room: Whether the membership update is happening in the context of a room
539                creation.
540            require_consent:
541            outlier: Indicates whether the event is an `outlier`, i.e. if
542                it's from an arbitrary point and floating in the DAG as
543                opposed to being inline with the current DAG.
544            historical: Indicates whether the message is being inserted
545                back in time around some existing events. This is used to skip
546                a few checks and mark the event as backfilled.
547            prev_event_ids: The event IDs to use as the prev events
548            auth_event_ids:
549                The event ids to use as the auth_events for the new event.
550                Should normally be left as None, which will cause them to be calculated
551                based on the room state at the prev_events.
552
553        Returns:
554            A tuple of the new event ID and stream ID.
555        """
556        content_specified = bool(content)
557        if content is None:
558            content = {}
559        else:
560            # We do a copy here as we potentially change some keys
561            # later on.
562            content = dict(content)
563
564        # allow the server notices mxid to set room-level profile
565        is_requester_server_notices_user = (
566            self._server_notices_mxid is not None
567            and requester.user.to_string() == self._server_notices_mxid
568        )
569
570        if (
571            not self.allow_per_room_profiles and not is_requester_server_notices_user
572        ) or requester.shadow_banned:
573            # Strip profile data, knowing that new profile data will be added to the
574            # event's content in event_creation_handler.create_event() using the target's
575            # global profile.
576            content.pop("displayname", None)
577            content.pop("avatar_url", None)
578
579        if len(content.get("displayname") or "") > MAX_DISPLAYNAME_LEN:
580            raise SynapseError(
581                400,
582                f"Displayname is too long (max {MAX_DISPLAYNAME_LEN})",
583                errcode=Codes.BAD_JSON,
584            )
585
586        if len(content.get("avatar_url") or "") > MAX_AVATAR_URL_LEN:
587            raise SynapseError(
588                400,
589                f"Avatar URL is too long (max {MAX_AVATAR_URL_LEN})",
590                errcode=Codes.BAD_JSON,
591            )
592
593        # The event content should *not* include the authorising user as
594        # it won't be properly signed. Strip it out since it might come
595        # back from a client updating a display name / avatar.
596        #
597        # This only applies to restricted rooms, but there should be no reason
598        # for a client to include it. Unconditionally remove it.
599        content.pop(EventContentFields.AUTHORISING_USER, None)
600
601        effective_membership_state = action
602        if action in ["kick", "unban"]:
603            effective_membership_state = "leave"
604
605        # if this is a join with a 3pid signature, we may need to turn a 3pid
606        # invite into a normal invite before we can handle the join.
607        if third_party_signed is not None:
608            await self.federation_handler.exchange_third_party_invite(
609                third_party_signed["sender"],
610                target.to_string(),
611                room_id,
612                third_party_signed,
613            )
614
615        if not remote_room_hosts:
616            remote_room_hosts = []
617
618        if effective_membership_state not in ("leave", "ban"):
619            is_blocked = await self.store.is_room_blocked(room_id)
620            if is_blocked:
621                raise SynapseError(403, "This room has been blocked on this server")
622
623        if effective_membership_state == Membership.INVITE:
624            target_id = target.to_string()
625            if ratelimit:
626                await self.ratelimit_invite(requester, room_id, target_id)
627
628            # block any attempts to invite the server notices mxid
629            if target_id == self._server_notices_mxid:
630                raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
631
632            block_invite = False
633
634            if (
635                self._server_notices_mxid is not None
636                and requester.user.to_string() == self._server_notices_mxid
637            ):
638                # allow the server notices mxid to send invites
639                is_requester_admin = True
640
641            else:
642                is_requester_admin = await self.auth.is_server_admin(requester.user)
643
644            if not is_requester_admin:
645                if self.config.server.block_non_admin_invites:
646                    logger.info(
647                        "Blocking invite: user is not admin and non-admin "
648                        "invites disabled"
649                    )
650                    block_invite = True
651
652                if not await self.spam_checker.user_may_invite(
653                    requester.user.to_string(), target_id, room_id
654                ):
655                    logger.info("Blocking invite due to spam checker")
656                    block_invite = True
657
658            if block_invite:
659                raise SynapseError(403, "Invites have been disabled on this server")
660
661        # An empty prev_events list is allowed as long as the auth_event_ids are present
662        if prev_event_ids is not None:
663            return await self._local_membership_update(
664                requester=requester,
665                target=target,
666                room_id=room_id,
667                membership=effective_membership_state,
668                txn_id=txn_id,
669                ratelimit=ratelimit,
670                prev_event_ids=prev_event_ids,
671                auth_event_ids=auth_event_ids,
672                content=content,
673                require_consent=require_consent,
674                outlier=outlier,
675                historical=historical,
676            )
677
678        latest_event_ids = await self.store.get_prev_events_for_room(room_id)
679
680        current_state_ids = await self.state_handler.get_current_state_ids(
681            room_id, latest_event_ids=latest_event_ids
682        )
683
684        # TODO: Refactor into dictionary of explicitly allowed transitions
685        # between old and new state, with specific error messages for some
686        # transitions and generic otherwise
687        old_state_id = current_state_ids.get((EventTypes.Member, target.to_string()))
688        if old_state_id:
689            old_state = await self.store.get_event(old_state_id, allow_none=True)
690            old_membership = old_state.content.get("membership") if old_state else None
691            if action == "unban" and old_membership != "ban":
692                raise SynapseError(
693                    403,
694                    "Cannot unban user who was not banned"
695                    " (membership=%s)" % old_membership,
696                    errcode=Codes.BAD_STATE,
697                )
698            if old_membership == "ban" and action not in ["ban", "unban", "leave"]:
699                raise SynapseError(
700                    403,
701                    "Cannot %s user who was banned" % (action,),
702                    errcode=Codes.BAD_STATE,
703                )
704
705            if old_state:
706                same_content = content == old_state.content
707                same_membership = old_membership == effective_membership_state
708                same_sender = requester.user.to_string() == old_state.sender
709                if same_sender and same_membership and same_content:
710                    # duplicate event.
711                    # we know it was persisted, so must have a stream ordering.
712                    assert old_state.internal_metadata.stream_ordering
713                    return (
714                        old_state.event_id,
715                        old_state.internal_metadata.stream_ordering,
716                    )
717
718            if old_membership in ["ban", "leave"] and action == "kick":
719                raise AuthError(403, "The target user is not in the room")
720
721            # we don't allow people to reject invites to the server notice
722            # room, but they can leave it once they are joined.
723            if (
724                old_membership == Membership.INVITE
725                and effective_membership_state == Membership.LEAVE
726            ):
727                is_blocked = await self._is_server_notice_room(room_id)
728                if is_blocked:
729                    raise SynapseError(
730                        HTTPStatus.FORBIDDEN,
731                        "You cannot reject this invite",
732                        errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
733                    )
734        else:
735            if action == "kick":
736                raise AuthError(403, "The target user is not in the room")
737
738        is_host_in_room = await self._is_host_in_room(current_state_ids)
739
740        if effective_membership_state == Membership.JOIN:
741            if requester.is_guest:
742                guest_can_join = await self._can_guest_join(current_state_ids)
743                if not guest_can_join:
744                    # This should be an auth check, but guests are a local concept,
745                    # so don't really fit into the general auth process.
746                    raise AuthError(403, "Guest access not allowed")
747
748            # Figure out whether the user is a server admin to determine whether they
749            # should be able to bypass the spam checker.
750            if (
751                self._server_notices_mxid is not None
752                and requester.user.to_string() == self._server_notices_mxid
753            ):
754                # allow the server notices mxid to join rooms
755                bypass_spam_checker = True
756
757            else:
758                bypass_spam_checker = await self.auth.is_server_admin(requester.user)
759
760            inviter = await self._get_inviter(target.to_string(), room_id)
761            if (
762                not bypass_spam_checker
763                # We assume that if the spam checker allowed the user to create
764                # a room then they're allowed to join it.
765                and not new_room
766                and not await self.spam_checker.user_may_join_room(
767                    target.to_string(), room_id, is_invited=inviter is not None
768                )
769            ):
770                raise SynapseError(403, "Not allowed to join this room")
771
772            # Check if a remote join should be performed.
773            remote_join, remote_room_hosts = await self._should_perform_remote_join(
774                target.to_string(), room_id, remote_room_hosts, content, is_host_in_room
775            )
776            if remote_join:
777                if ratelimit:
778                    time_now_s = self.clock.time()
779                    (
780                        allowed,
781                        time_allowed,
782                    ) = await self._join_rate_limiter_remote.can_do_action(
783                        requester,
784                    )
785
786                    if not allowed:
787                        raise LimitExceededError(
788                            retry_after_ms=int(1000 * (time_allowed - time_now_s))
789                        )
790
791                inviter = await self._get_inviter(target.to_string(), room_id)
792                if inviter and not self.hs.is_mine(inviter):
793                    remote_room_hosts.append(inviter.domain)
794
795                content["membership"] = Membership.JOIN
796
797                profile = self.profile_handler
798                if not content_specified:
799                    content["displayname"] = await profile.get_displayname(target)
800                    content["avatar_url"] = await profile.get_avatar_url(target)
801
802                if requester.is_guest:
803                    content["kind"] = "guest"
804
805                remote_join_response = await self._remote_join(
806                    requester, remote_room_hosts, room_id, target, content
807                )
808
809                return remote_join_response
810
811        elif effective_membership_state == Membership.LEAVE:
812            if not is_host_in_room:
813                # Figure out the user's current membership state for the room
814                (
815                    current_membership_type,
816                    current_membership_event_id,
817                ) = await self.store.get_local_current_membership_for_user_in_room(
818                    target.to_string(), room_id
819                )
820                if not current_membership_type or not current_membership_event_id:
821                    logger.info(
822                        "%s sent a leave request to %s, but that is not an active room "
823                        "on this server, or there is no pending invite or knock",
824                        target,
825                        room_id,
826                    )
827
828                    raise SynapseError(404, "Not a known room")
829
830                # perhaps we've been invited
831                if current_membership_type == Membership.INVITE:
832                    invite = await self.store.get_event(current_membership_event_id)
833                    logger.info(
834                        "%s rejects invite to %s from %s",
835                        target,
836                        room_id,
837                        invite.sender,
838                    )
839
840                    if not self.hs.is_mine_id(invite.sender):
841                        # send the rejection to the inviter's HS (with fallback to
842                        # local event)
843                        return await self.remote_reject_invite(
844                            invite.event_id,
845                            txn_id,
846                            requester,
847                            content,
848                        )
849
850                    # the inviter was on our server, but has now left. Carry on
851                    # with the normal rejection codepath, which will also send the
852                    # rejection out to any other servers we believe are still in the room.
853
854                    # thanks to overzealous cleaning up of event_forward_extremities in
855                    # `delete_old_current_state_events`, it's possible to end up with no
856                    # forward extremities here. If that happens, let's just hang the
857                    # rejection off the invite event.
858                    #
859                    # see: https://github.com/matrix-org/synapse/issues/7139
860                    if len(latest_event_ids) == 0:
861                        latest_event_ids = [invite.event_id]
862
863                # or perhaps this is a remote room that a local user has knocked on
864                elif current_membership_type == Membership.KNOCK:
865                    knock = await self.store.get_event(current_membership_event_id)
866                    return await self.remote_rescind_knock(
867                        knock.event_id, txn_id, requester, content
868                    )
869
870        elif effective_membership_state == Membership.KNOCK:
871            if not is_host_in_room:
872                # The knock needs to be sent over federation instead
873                remote_room_hosts.append(get_domain_from_id(room_id))
874
875                content["membership"] = Membership.KNOCK
876
877                profile = self.profile_handler
878                if "displayname" not in content:
879                    content["displayname"] = await profile.get_displayname(target)
880                if "avatar_url" not in content:
881                    content["avatar_url"] = await profile.get_avatar_url(target)
882
883                return await self.remote_knock(
884                    remote_room_hosts, room_id, target, content
885                )
886
887        return await self._local_membership_update(
888            requester=requester,
889            target=target,
890            room_id=room_id,
891            membership=effective_membership_state,
892            txn_id=txn_id,
893            ratelimit=ratelimit,
894            prev_event_ids=latest_event_ids,
895            auth_event_ids=auth_event_ids,
896            content=content,
897            require_consent=require_consent,
898            outlier=outlier,
899        )
900
901    async def _should_perform_remote_join(
902        self,
903        user_id: str,
904        room_id: str,
905        remote_room_hosts: List[str],
906        content: JsonDict,
907        is_host_in_room: bool,
908    ) -> Tuple[bool, List[str]]:
909        """
910        Check whether the server should do a remote join (as opposed to a local
911        join) for a user.
912
913        Generally a remote join is used if:
914
915        * The server is not yet in the room.
916        * The server is in the room, the room has restricted join rules, the user
917          is not joined or invited to the room, and the server does not have
918          another user who is capable of issuing invites.
919
920        Args:
921            user_id: The user joining the room.
922            room_id: The room being joined.
923            remote_room_hosts: A list of remote room hosts.
924            content: The content to use as the event body of the join. This may
925                be modified.
926            is_host_in_room: True if the host is in the room.
927
928        Returns:
929            A tuple of:
930                True if a remote join should be performed. False if the join can be
931                done locally.
932
933                A list of remote room hosts to use. This is an empty list if a
934                local join is to be done.
935        """
936        # If the host isn't in the room, pass through the prospective hosts.
937        if not is_host_in_room:
938            return True, remote_room_hosts
939
940        # If the host is in the room, but not one of the authorised hosts
941        # for restricted join rules, a remote join must be used.
942        room_version = await self.store.get_room_version(room_id)
943        current_state_ids = await self.store.get_current_state_ids(room_id)
944
945        # If restricted join rules are not being used, a local join can always
946        # be used.
947        if not await self.event_auth_handler.has_restricted_join_rules(
948            current_state_ids, room_version
949        ):
950            return False, []
951
952        # If the user is invited to the room or already joined, the join
953        # event can always be issued locally.
954        prev_member_event_id = current_state_ids.get((EventTypes.Member, user_id), None)
955        prev_member_event = None
956        if prev_member_event_id:
957            prev_member_event = await self.store.get_event(prev_member_event_id)
958            if prev_member_event.membership in (
959                Membership.JOIN,
960                Membership.INVITE,
961            ):
962                return False, []
963
964        # If the local host has a user who can issue invites, then a local
965        # join can be done.
966        #
967        # If not, generate a new list of remote hosts based on which
968        # can issue invites.
969        event_map = await self.store.get_events(current_state_ids.values())
970        current_state = {
971            state_key: event_map[event_id]
972            for state_key, event_id in current_state_ids.items()
973        }
974        allowed_servers = get_servers_from_users(
975            get_users_which_can_issue_invite(current_state)
976        )
977
978        # If the local server is not one of allowed servers, then a remote
979        # join must be done. Return the list of prospective servers based on
980        # which can issue invites.
981        if self.hs.hostname not in allowed_servers:
982            return True, list(allowed_servers)
983
984        # Ensure the member should be allowed access via membership in a room.
985        await self.event_auth_handler.check_restricted_join_rules(
986            current_state_ids, room_version, user_id, prev_member_event
987        )
988
989        # If this is going to be a local join, additional information must
990        # be included in the event content in order to efficiently validate
991        # the event.
992        content[
993            EventContentFields.AUTHORISING_USER
994        ] = await self.event_auth_handler.get_user_which_could_invite(
995            room_id,
996            current_state_ids,
997        )
998
999        return False, []
1000
1001    async def transfer_room_state_on_room_upgrade(
1002        self, old_room_id: str, room_id: str
1003    ) -> None:
1004        """Upon our server becoming aware of an upgraded room, either by upgrading a room
1005        ourselves or joining one, we can transfer over information from the previous room.
1006
1007        Copies user state (tags/push rules) for every local user that was in the old room, as
1008        well as migrating the room directory state.
1009
1010        Args:
1011            old_room_id: The ID of the old room
1012            room_id: The ID of the new room
1013        """
1014        logger.info("Transferring room state from %s to %s", old_room_id, room_id)
1015
1016        # Find all local users that were in the old room and copy over each user's state
1017        users = await self.store.get_users_in_room(old_room_id)
1018        await self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)
1019
1020        # Add new room to the room directory if the old room was there
1021        # Remove old room from the room directory
1022        old_room = await self.store.get_room(old_room_id)
1023        if old_room is not None and old_room["is_public"]:
1024            await self.store.set_room_is_public(old_room_id, False)
1025            await self.store.set_room_is_public(room_id, True)
1026
1027        # Transfer alias mappings in the room directory
1028        await self.store.update_aliases_for_room(old_room_id, room_id)
1029
1030        # Check if any groups we own contain the predecessor room
1031        local_group_ids = await self.store.get_local_groups_for_room(old_room_id)
1032        for group_id in local_group_ids:
1033            # Add new the new room to those groups
1034            await self.store.add_room_to_group(
1035                group_id, room_id, old_room is not None and old_room["is_public"]
1036            )
1037
1038            # Remove the old room from those groups
1039            await self.store.remove_room_from_group(group_id, old_room_id)
1040
1041    async def copy_user_state_on_room_upgrade(
1042        self, old_room_id: str, new_room_id: str, user_ids: Iterable[str]
1043    ) -> None:
1044        """Copy user-specific information when they join a new room when that new room is the
1045        result of a room upgrade
1046
1047        Args:
1048            old_room_id: The ID of upgraded room
1049            new_room_id: The ID of the new room
1050            user_ids: User IDs to copy state for
1051        """
1052
1053        logger.debug(
1054            "Copying over room tags and push rules from %s to %s for users %s",
1055            old_room_id,
1056            new_room_id,
1057            user_ids,
1058        )
1059
1060        for user_id in user_ids:
1061            try:
1062                # It is an upgraded room. Copy over old tags
1063                await self.copy_room_tags_and_direct_to_room(
1064                    old_room_id, new_room_id, user_id
1065                )
1066                # Copy over push rules
1067                await self.store.copy_push_rules_from_room_to_room_for_user(
1068                    old_room_id, new_room_id, user_id
1069                )
1070            except Exception:
1071                logger.exception(
1072                    "Error copying tags and/or push rules from rooms %s to %s for user %s. "
1073                    "Skipping...",
1074                    old_room_id,
1075                    new_room_id,
1076                    user_id,
1077                )
1078                continue
1079
1080    async def send_membership_event(
1081        self,
1082        requester: Optional[Requester],
1083        event: EventBase,
1084        context: EventContext,
1085        ratelimit: bool = True,
1086    ) -> None:
1087        """
1088        Change the membership status of a user in a room.
1089
1090        Args:
1091            requester: The local user who requested the membership
1092                event. If None, certain checks, like whether this homeserver can
1093                act as the sender, will be skipped.
1094            event: The membership event.
1095            context: The context of the event.
1096            ratelimit: Whether to rate limit this request.
1097        Raises:
1098            SynapseError if there was a problem changing the membership.
1099        """
1100        target_user = UserID.from_string(event.state_key)
1101        room_id = event.room_id
1102
1103        if requester is not None:
1104            sender = UserID.from_string(event.sender)
1105            assert (
1106                sender == requester.user
1107            ), "Sender (%s) must be same as requester (%s)" % (sender, requester.user)
1108            assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
1109        else:
1110            requester = types.create_requester(target_user)
1111
1112        prev_state_ids = await context.get_prev_state_ids()
1113        if event.membership == Membership.JOIN:
1114            if requester.is_guest:
1115                guest_can_join = await self._can_guest_join(prev_state_ids)
1116                if not guest_can_join:
1117                    # This should be an auth check, but guests are a local concept,
1118                    # so don't really fit into the general auth process.
1119                    raise AuthError(403, "Guest access not allowed")
1120
1121        if event.membership not in (Membership.LEAVE, Membership.BAN):
1122            is_blocked = await self.store.is_room_blocked(room_id)
1123            if is_blocked:
1124                raise SynapseError(403, "This room has been blocked on this server")
1125
1126        event = await self.event_creation_handler.handle_new_client_event(
1127            requester, event, context, extra_users=[target_user], ratelimit=ratelimit
1128        )
1129
1130        prev_member_event_id = prev_state_ids.get(
1131            (EventTypes.Member, event.state_key), None
1132        )
1133
1134        if event.membership == Membership.LEAVE:
1135            if prev_member_event_id:
1136                prev_member_event = await self.store.get_event(prev_member_event_id)
1137                if prev_member_event.membership == Membership.JOIN:
1138                    await self._user_left_room(target_user, room_id)
1139
1140    async def _can_guest_join(self, current_state_ids: StateMap[str]) -> bool:
1141        """
1142        Returns whether a guest can join a room based on its current state.
1143        """
1144        guest_access_id = current_state_ids.get((EventTypes.GuestAccess, ""), None)
1145        if not guest_access_id:
1146            return False
1147
1148        guest_access = await self.store.get_event(guest_access_id)
1149
1150        return bool(
1151            guest_access
1152            and guest_access.content
1153            and guest_access.content.get(EventContentFields.GUEST_ACCESS)
1154            == GuestAccess.CAN_JOIN
1155        )
1156
1157    async def kick_guest_users(self, current_state: Iterable[EventBase]) -> None:
1158        """Kick any local guest users from the room.
1159
1160        This is called when the room state changes from guests allowed to not-allowed.
1161
1162        Params:
1163            current_state: the current state of the room. We will iterate this to look
1164               for guest users to kick.
1165        """
1166        for member_event in current_state:
1167            try:
1168                if member_event.type != EventTypes.Member:
1169                    continue
1170
1171                if not self.hs.is_mine_id(member_event.state_key):
1172                    continue
1173
1174                if member_event.content["membership"] not in {
1175                    Membership.JOIN,
1176                    Membership.INVITE,
1177                }:
1178                    continue
1179
1180                if (
1181                    "kind" not in member_event.content
1182                    or member_event.content["kind"] != "guest"
1183                ):
1184                    continue
1185
1186                # We make the user choose to leave, rather than have the
1187                # event-sender kick them. This is partially because we don't
1188                # need to worry about power levels, and partially because guest
1189                # users are a concept which doesn't hugely work over federation,
1190                # and having homeservers have their own users leave keeps more
1191                # of that decision-making and control local to the guest-having
1192                # homeserver.
1193                target_user = UserID.from_string(member_event.state_key)
1194                requester = create_requester(
1195                    target_user, is_guest=True, authenticated_entity=self._server_name
1196                )
1197                handler = self.hs.get_room_member_handler()
1198                await handler.update_membership(
1199                    requester,
1200                    target_user,
1201                    member_event.room_id,
1202                    "leave",
1203                    ratelimit=False,
1204                    require_consent=False,
1205                )
1206            except Exception as e:
1207                logger.exception("Error kicking guest user: %s" % (e,))
1208
1209    async def lookup_room_alias(
1210        self, room_alias: RoomAlias
1211    ) -> Tuple[RoomID, List[str]]:
1212        """
1213        Get the room ID associated with a room alias.
1214
1215        Args:
1216            room_alias: The alias to look up.
1217        Returns:
1218            A tuple of:
1219                The room ID as a RoomID object.
1220                Hosts likely to be participating in the room ([str]).
1221        Raises:
1222            SynapseError if room alias could not be found.
1223        """
1224        directory_handler = self.directory_handler
1225        mapping = await directory_handler.get_association(room_alias)
1226
1227        if not mapping:
1228            raise SynapseError(404, "No such room alias")
1229
1230        room_id = mapping["room_id"]
1231        servers = mapping["servers"]
1232
1233        # put the server which owns the alias at the front of the server list.
1234        if room_alias.domain in servers:
1235            servers.remove(room_alias.domain)
1236        servers.insert(0, room_alias.domain)
1237
1238        return RoomID.from_string(room_id), servers
1239
1240    async def _get_inviter(self, user_id: str, room_id: str) -> Optional[UserID]:
1241        invite = await self.store.get_invite_for_local_user_in_room(
1242            user_id=user_id, room_id=room_id
1243        )
1244        if invite:
1245            return UserID.from_string(invite.sender)
1246        return None
1247
1248    async def do_3pid_invite(
1249        self,
1250        room_id: str,
1251        inviter: UserID,
1252        medium: str,
1253        address: str,
1254        id_server: str,
1255        requester: Requester,
1256        txn_id: Optional[str],
1257        id_access_token: Optional[str] = None,
1258    ) -> int:
1259        """Invite a 3PID to a room.
1260
1261        Args:
1262            room_id: The room to invite the 3PID to.
1263            inviter: The user sending the invite.
1264            medium: The 3PID's medium.
1265            address: The 3PID's address.
1266            id_server: The identity server to use.
1267            requester: The user making the request.
1268            txn_id: The transaction ID this is part of, or None if this is not
1269                part of a transaction.
1270            id_access_token: The optional identity server access token.
1271
1272        Returns:
1273             The new stream ID.
1274
1275        Raises:
1276            ShadowBanError if the requester has been shadow-banned.
1277        """
1278        if self.config.server.block_non_admin_invites:
1279            is_requester_admin = await self.auth.is_server_admin(requester.user)
1280            if not is_requester_admin:
1281                raise SynapseError(
1282                    403, "Invites have been disabled on this server", Codes.FORBIDDEN
1283                )
1284
1285        if requester.shadow_banned:
1286            # We randomly sleep a bit just to annoy the requester.
1287            await self.clock.sleep(random.randint(1, 10))
1288            raise ShadowBanError()
1289
1290        # We need to rate limit *before* we send out any 3PID invites, so we
1291        # can't just rely on the standard ratelimiting of events.
1292        await self.request_ratelimiter.ratelimit(requester)
1293
1294        can_invite = await self.third_party_event_rules.check_threepid_can_be_invited(
1295            medium, address, room_id
1296        )
1297        if not can_invite:
1298            raise SynapseError(
1299                403,
1300                "This third-party identifier can not be invited in this room",
1301                Codes.FORBIDDEN,
1302            )
1303
1304        if not self._enable_lookup:
1305            raise SynapseError(
1306                403, "Looking up third-party identifiers is denied from this server"
1307            )
1308
1309        invitee = await self.identity_handler.lookup_3pid(
1310            id_server, medium, address, id_access_token
1311        )
1312
1313        if invitee:
1314            # Note that update_membership with an action of "invite" can raise
1315            # a ShadowBanError, but this was done above already.
1316            # We don't check the invite against the spamchecker(s) here (through
1317            # user_may_invite) because we'll do it further down the line anyway (in
1318            # update_membership_locked).
1319            _, stream_id = await self.update_membership(
1320                requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
1321            )
1322        else:
1323            # Check if the spamchecker(s) allow this invite to go through.
1324            if not await self.spam_checker.user_may_send_3pid_invite(
1325                inviter_userid=requester.user.to_string(),
1326                medium=medium,
1327                address=address,
1328                room_id=room_id,
1329            ):
1330                raise SynapseError(403, "Cannot send threepid invite")
1331
1332            stream_id = await self._make_and_store_3pid_invite(
1333                requester,
1334                id_server,
1335                medium,
1336                address,
1337                room_id,
1338                inviter,
1339                txn_id=txn_id,
1340                id_access_token=id_access_token,
1341            )
1342
1343        return stream_id
1344
1345    async def _make_and_store_3pid_invite(
1346        self,
1347        requester: Requester,
1348        id_server: str,
1349        medium: str,
1350        address: str,
1351        room_id: str,
1352        user: UserID,
1353        txn_id: Optional[str],
1354        id_access_token: Optional[str] = None,
1355    ) -> int:
1356        room_state = await self.state_handler.get_current_state(room_id)
1357
1358        inviter_display_name = ""
1359        inviter_avatar_url = ""
1360        member_event = room_state.get((EventTypes.Member, user.to_string()))
1361        if member_event:
1362            inviter_display_name = member_event.content.get("displayname", "")
1363            inviter_avatar_url = member_event.content.get("avatar_url", "")
1364
1365        # if user has no display name, default to their MXID
1366        if not inviter_display_name:
1367            inviter_display_name = user.to_string()
1368
1369        canonical_room_alias = ""
1370        canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
1371        if canonical_alias_event:
1372            canonical_room_alias = canonical_alias_event.content.get("alias", "")
1373
1374        room_name = ""
1375        room_name_event = room_state.get((EventTypes.Name, ""))
1376        if room_name_event:
1377            room_name = room_name_event.content.get("name", "")
1378
1379        room_type = None
1380        room_create_event = room_state.get((EventTypes.Create, ""))
1381        if room_create_event:
1382            room_type = room_create_event.content.get(EventContentFields.ROOM_TYPE)
1383
1384        room_join_rules = ""
1385        join_rules_event = room_state.get((EventTypes.JoinRules, ""))
1386        if join_rules_event:
1387            room_join_rules = join_rules_event.content.get("join_rule", "")
1388
1389        room_avatar_url = ""
1390        room_avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
1391        if room_avatar_event:
1392            room_avatar_url = room_avatar_event.content.get("url", "")
1393
1394        (
1395            token,
1396            public_keys,
1397            fallback_public_key,
1398            display_name,
1399        ) = await self.identity_handler.ask_id_server_for_third_party_invite(
1400            requester=requester,
1401            id_server=id_server,
1402            medium=medium,
1403            address=address,
1404            room_id=room_id,
1405            inviter_user_id=user.to_string(),
1406            room_alias=canonical_room_alias,
1407            room_avatar_url=room_avatar_url,
1408            room_join_rules=room_join_rules,
1409            room_name=room_name,
1410            room_type=room_type,
1411            inviter_display_name=inviter_display_name,
1412            inviter_avatar_url=inviter_avatar_url,
1413            id_access_token=id_access_token,
1414        )
1415
1416        (
1417            event,
1418            stream_id,
1419        ) = await self.event_creation_handler.create_and_send_nonmember_event(
1420            requester,
1421            {
1422                "type": EventTypes.ThirdPartyInvite,
1423                "content": {
1424                    "display_name": display_name,
1425                    "public_keys": public_keys,
1426                    # For backwards compatibility:
1427                    "key_validity_url": fallback_public_key["key_validity_url"],
1428                    "public_key": fallback_public_key["public_key"],
1429                },
1430                "room_id": room_id,
1431                "sender": user.to_string(),
1432                "state_key": token,
1433            },
1434            ratelimit=False,
1435            txn_id=txn_id,
1436        )
1437        return stream_id
1438
1439    async def _is_host_in_room(self, current_state_ids: StateMap[str]) -> bool:
1440        # Have we just created the room, and is this about to be the very
1441        # first member event?
1442        create_event_id = current_state_ids.get(("m.room.create", ""))
1443        if len(current_state_ids) == 1 and create_event_id:
1444            # We can only get here if we're in the process of creating the room
1445            return True
1446
1447        for etype, state_key in current_state_ids:
1448            if etype != EventTypes.Member or not self.hs.is_mine_id(state_key):
1449                continue
1450
1451            event_id = current_state_ids[(etype, state_key)]
1452            event = await self.store.get_event(event_id, allow_none=True)
1453            if not event:
1454                continue
1455
1456            if event.membership == Membership.JOIN:
1457                return True
1458
1459        return False
1460
1461    async def _is_server_notice_room(self, room_id: str) -> bool:
1462        if self._server_notices_mxid is None:
1463            return False
1464        user_ids = await self.store.get_users_in_room(room_id)
1465        return self._server_notices_mxid in user_ids
1466
1467
1468class RoomMemberMasterHandler(RoomMemberHandler):
1469    def __init__(self, hs: "HomeServer"):
1470        super().__init__(hs)
1471
1472        self.distributor = hs.get_distributor()
1473        self.distributor.declare("user_left_room")
1474
1475    async def _is_remote_room_too_complex(
1476        self, room_id: str, remote_room_hosts: List[str]
1477    ) -> Optional[bool]:
1478        """
1479        Check if complexity of a remote room is too great.
1480
1481        Args:
1482            room_id
1483            remote_room_hosts
1484
1485        Returns: bool of whether the complexity is too great, or None
1486            if unable to be fetched
1487        """
1488        max_complexity = self.hs.config.server.limit_remote_rooms.complexity
1489        complexity = await self.federation_handler.get_room_complexity(
1490            remote_room_hosts, room_id
1491        )
1492
1493        if complexity:
1494            return complexity["v1"] > max_complexity
1495        return None
1496
1497    async def _is_local_room_too_complex(self, room_id: str) -> bool:
1498        """
1499        Check if the complexity of a local room is too great.
1500
1501        Args:
1502            room_id: The room ID to check for complexity.
1503        """
1504        max_complexity = self.hs.config.server.limit_remote_rooms.complexity
1505        complexity = await self.store.get_room_complexity(room_id)
1506
1507        return complexity["v1"] > max_complexity
1508
1509    async def _remote_join(
1510        self,
1511        requester: Requester,
1512        remote_room_hosts: List[str],
1513        room_id: str,
1514        user: UserID,
1515        content: dict,
1516    ) -> Tuple[str, int]:
1517        """Implements RoomMemberHandler._remote_join"""
1518        # filter ourselves out of remote_room_hosts: do_invite_join ignores it
1519        # and if it is the only entry we'd like to return a 404 rather than a
1520        # 500.
1521        remote_room_hosts = [
1522            host for host in remote_room_hosts if host != self.hs.hostname
1523        ]
1524
1525        if len(remote_room_hosts) == 0:
1526            raise SynapseError(404, "No known servers")
1527
1528        check_complexity = self.hs.config.server.limit_remote_rooms.enabled
1529        if (
1530            check_complexity
1531            and self.hs.config.server.limit_remote_rooms.admins_can_join
1532        ):
1533            check_complexity = not await self.auth.is_server_admin(user)
1534
1535        if check_complexity:
1536            # Fetch the room complexity
1537            too_complex = await self._is_remote_room_too_complex(
1538                room_id, remote_room_hosts
1539            )
1540            if too_complex is True:
1541                raise SynapseError(
1542                    code=400,
1543                    msg=self.hs.config.server.limit_remote_rooms.complexity_error,
1544                    errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
1545                )
1546
1547        # We don't do an auth check if we are doing an invite
1548        # join dance for now, since we're kinda implicitly checking
1549        # that we are allowed to join when we decide whether or not we
1550        # need to do the invite/join dance.
1551        event_id, stream_id = await self.federation_handler.do_invite_join(
1552            remote_room_hosts, room_id, user.to_string(), content
1553        )
1554
1555        # Check the room we just joined wasn't too large, if we didn't fetch the
1556        # complexity of it before.
1557        if check_complexity:
1558            if too_complex is False:
1559                # We checked, and we're under the limit.
1560                return event_id, stream_id
1561
1562            # Check again, but with the local state events
1563            too_complex = await self._is_local_room_too_complex(room_id)
1564
1565            if too_complex is False:
1566                # We're under the limit.
1567                return event_id, stream_id
1568
1569            # The room is too large. Leave.
1570            requester = types.create_requester(
1571                user, authenticated_entity=self._server_name
1572            )
1573            await self.update_membership(
1574                requester=requester, target=user, room_id=room_id, action="leave"
1575            )
1576            raise SynapseError(
1577                code=400,
1578                msg=self.hs.config.server.limit_remote_rooms.complexity_error,
1579                errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
1580            )
1581
1582        return event_id, stream_id
1583
1584    async def remote_reject_invite(
1585        self,
1586        invite_event_id: str,
1587        txn_id: Optional[str],
1588        requester: Requester,
1589        content: JsonDict,
1590    ) -> Tuple[str, int]:
1591        """
1592        Rejects an out-of-band invite received from a remote user
1593
1594        Implements RoomMemberHandler.remote_reject_invite
1595        """
1596        invite_event = await self.store.get_event(invite_event_id)
1597        room_id = invite_event.room_id
1598        target_user = invite_event.state_key
1599
1600        # first of all, try doing a rejection via the inviting server
1601        fed_handler = self.federation_handler
1602        try:
1603            inviter_id = UserID.from_string(invite_event.sender)
1604            event, stream_id = await fed_handler.do_remotely_reject_invite(
1605                [inviter_id.domain], room_id, target_user, content=content
1606            )
1607            return event.event_id, stream_id
1608        except Exception as e:
1609            # if we were unable to reject the invite, we will generate our own
1610            # leave event.
1611            #
1612            # The 'except' clause is very broad, but we need to
1613            # capture everything from DNS failures upwards
1614            #
1615            logger.warning("Failed to reject invite: %s", e)
1616
1617            return await self._generate_local_out_of_band_leave(
1618                invite_event, txn_id, requester, content
1619            )
1620
1621    async def remote_rescind_knock(
1622        self,
1623        knock_event_id: str,
1624        txn_id: Optional[str],
1625        requester: Requester,
1626        content: JsonDict,
1627    ) -> Tuple[str, int]:
1628        """
1629        Rescinds a local knock made on a remote room
1630
1631        Args:
1632            knock_event_id: The ID of the knock event to rescind.
1633            txn_id: The transaction ID to use.
1634            requester: The originator of the request.
1635            content: The content of the leave event.
1636
1637        Implements RoomMemberHandler.remote_rescind_knock
1638        """
1639        # TODO: We don't yet support rescinding knocks over federation
1640        # as we don't know which homeserver to send it to. An obvious
1641        # candidate is the remote homeserver we originally knocked through,
1642        # however we don't currently store that information.
1643
1644        # Just rescind the knock locally
1645        knock_event = await self.store.get_event(knock_event_id)
1646        return await self._generate_local_out_of_band_leave(
1647            knock_event, txn_id, requester, content
1648        )
1649
1650    async def _generate_local_out_of_band_leave(
1651        self,
1652        previous_membership_event: EventBase,
1653        txn_id: Optional[str],
1654        requester: Requester,
1655        content: JsonDict,
1656    ) -> Tuple[str, int]:
1657        """Generate a local leave event for a room
1658
1659        This can be called after we e.g fail to reject an invite via a remote server.
1660        It generates an out-of-band membership event locally.
1661
1662        Args:
1663            previous_membership_event: the previous membership event for this user
1664            txn_id: optional transaction ID supplied by the client
1665            requester: user making the request, according to the access token
1666            content: additional content to include in the leave event.
1667               Normally an empty dict.
1668
1669        Returns:
1670            A tuple containing (event_id, stream_id of the leave event)
1671        """
1672        room_id = previous_membership_event.room_id
1673        target_user = previous_membership_event.state_key
1674
1675        content["membership"] = Membership.LEAVE
1676
1677        event_dict = {
1678            "type": EventTypes.Member,
1679            "room_id": room_id,
1680            "sender": target_user,
1681            "content": content,
1682            "state_key": target_user,
1683        }
1684
1685        # the auth events for the new event are the same as that of the previous event, plus
1686        # the event itself.
1687        #
1688        # the prev_events consist solely of the previous membership event.
1689        prev_event_ids = [previous_membership_event.event_id]
1690        auth_event_ids = (
1691            list(previous_membership_event.auth_event_ids()) + prev_event_ids
1692        )
1693
1694        event, context = await self.event_creation_handler.create_event(
1695            requester,
1696            event_dict,
1697            txn_id=txn_id,
1698            prev_event_ids=prev_event_ids,
1699            auth_event_ids=auth_event_ids,
1700        )
1701        event.internal_metadata.outlier = True
1702        event.internal_metadata.out_of_band_membership = True
1703
1704        result_event = await self.event_creation_handler.handle_new_client_event(
1705            requester,
1706            event,
1707            context,
1708            extra_users=[UserID.from_string(target_user)],
1709        )
1710        # we know it was persisted, so must have a stream ordering
1711        assert result_event.internal_metadata.stream_ordering
1712
1713        return result_event.event_id, result_event.internal_metadata.stream_ordering
1714
1715    async def remote_knock(
1716        self,
1717        remote_room_hosts: List[str],
1718        room_id: str,
1719        user: UserID,
1720        content: dict,
1721    ) -> Tuple[str, int]:
1722        """Sends a knock to a room. Attempts to do so via one remote out of a given list.
1723
1724        Args:
1725            remote_room_hosts: A list of homeservers to try knocking through.
1726            room_id: The ID of the room to knock on.
1727            user: The user to knock on behalf of.
1728            content: The content of the knock event.
1729
1730        Returns:
1731            A tuple of (event ID, stream ID).
1732        """
1733        # filter ourselves out of remote_room_hosts
1734        remote_room_hosts = [
1735            host for host in remote_room_hosts if host != self.hs.hostname
1736        ]
1737
1738        if len(remote_room_hosts) == 0:
1739            raise SynapseError(404, "No known servers")
1740
1741        return await self.federation_handler.do_knock(
1742            remote_room_hosts, room_id, user.to_string(), content=content
1743        )
1744
1745    async def _user_left_room(self, target: UserID, room_id: str) -> None:
1746        """Implements RoomMemberHandler._user_left_room"""
1747        user_left_room(self.distributor, target, room_id)
1748
1749    async def forget(self, user: UserID, room_id: str) -> None:
1750        user_id = user.to_string()
1751
1752        member = await self.state_handler.get_current_state(
1753            room_id=room_id, event_type=EventTypes.Member, state_key=user_id
1754        )
1755        membership = member.membership if member else None
1756
1757        if membership is not None and membership not in [
1758            Membership.LEAVE,
1759            Membership.BAN,
1760        ]:
1761            raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
1762
1763        if membership:
1764            await self.store.forget(user_id, room_id)
1765
1766
1767def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]:
1768    """
1769    Return the list of users which can issue invites.
1770
1771    This is done by exploring the joined users and comparing their power levels
1772    to the necessyar power level to issue an invite.
1773
1774    Args:
1775        auth_events: state in force at this point in the room
1776
1777    Returns:
1778        The users which can issue invites.
1779    """
1780    invite_level = get_named_level(auth_events, "invite", 0)
1781    users_default_level = get_named_level(auth_events, "users_default", 0)
1782    power_level_event = get_power_level_event(auth_events)
1783
1784    # Custom power-levels for users.
1785    if power_level_event:
1786        users = power_level_event.content.get("users", {})
1787    else:
1788        users = {}
1789
1790    result = []
1791
1792    # Check which members are able to invite by ensuring they're joined and have
1793    # the necessary power level.
1794    for (event_type, state_key), event in auth_events.items():
1795        if event_type != EventTypes.Member:
1796            continue
1797
1798        if event.membership != Membership.JOIN:
1799            continue
1800
1801        # Check if the user has a custom power level.
1802        if users.get(state_key, users_default_level) >= invite_level:
1803            result.append(state_key)
1804
1805    return result
1806
1807
1808def get_servers_from_users(users: List[str]) -> Set[str]:
1809    """
1810    Resolve a list of users into their servers.
1811
1812    Args:
1813        users: A list of users.
1814
1815    Returns:
1816        A set of servers.
1817    """
1818    servers = set()
1819    for user in users:
1820        try:
1821            servers.add(get_domain_from_id(user))
1822        except SynapseError:
1823            pass
1824    return servers
1825