1# Copyright 2016 OpenMarket Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16import re
17from typing import TYPE_CHECKING, Dict, Iterable, Optional
18
19from synapse.api.constants import EventTypes, Membership
20from synapse.events import EventBase
21from synapse.types import StateMap
22
23if TYPE_CHECKING:
24    from synapse.storage.databases.main import DataStore
25
26logger = logging.getLogger(__name__)
27
# Pattern used to decide whether a string looks like a room alias: a "#",
# then anything, then a ":" followed by at least one more character.
# It is intentionally looser than the rules for aliases we allow to be
# registered, since other homeservers may allow aliases that we would not.
ALIAS_RE = re.compile(r"^#.*:.+$")

# Name that calculate_room_name gives to a room with nobody else in it.
ALL_ALONE = "Empty Room"
33
34
async def calculate_room_name(
    store: "DataStore",
    room_state_ids: StateMap[str],
    user_id: str,
    fallback_to_members: bool = True,
    fallback_to_single_member: bool = True,
) -> Optional[str]:
    """
    Works out a user-facing name for the given room as per Matrix
    spec recommendations.

    Does not yet support internationalisation.

    The first of the following that applies is used:

      1. the non-empty "name" from the room's name event;
      2. the room's canonical alias, if it looks like an alias;
      3. if `user_id` has a pending invite, a description of that invite;
      4. a description built from the joined and invited members.

    Args:
        store: The data store to query.
        room_state_ids: Dictionary of the room's state IDs.
        user_id: The ID of the user to whom the room name is being presented
        fallback_to_members: If False, return None instead of generating a name
            based on the room's members if the room has no title or aliases.
        fallback_to_single_member: If False, return None instead of generating a
            name based on a single other user (the user who invited this user
            to the room, or the only other member of the room) if the room
            has no title or aliases.

    Returns:
        A human readable name for the room, if possible.
    """
    # does it have a name?
    if (EventTypes.Name, "") in room_state_ids:
        m_room_name = await store.get_event(
            room_state_ids[(EventTypes.Name, "")], allow_none=True
        )
        if m_room_name and m_room_name.content and m_room_name.content.get("name"):
            return m_room_name.content["name"]

    # does it have a canonical alias?
    if (EventTypes.CanonicalAlias, "") in room_state_ids:
        canon_alias = await store.get_event(
            room_state_ids[(EventTypes.CanonicalAlias, "")], allow_none=True
        )
        if (
            canon_alias
            and canon_alias.content
            and canon_alias.content.get("alias")
            and _looks_like_an_alias(canon_alias.content["alias"])
        ):
            return canon_alias.content["alias"]

    if not fallback_to_members:
        return None

    my_member_event = None
    if (EventTypes.Member, user_id) in room_state_ids:
        my_member_event = await store.get_event(
            room_state_ids[(EventTypes.Member, user_id)], allow_none=True
        )

    # is the user being invited to the room?
    if (
        my_member_event is not None
        and my_member_event.content.get("membership") == Membership.INVITE
    ):
        inviter_key = (EventTypes.Member, my_member_event.sender)
        if inviter_key not in room_state_ids:
            # the state tells us nothing about the inviter
            return "Room Invite"

        inviter_member_event = await store.get_event(
            room_state_ids[inviter_key], allow_none=True
        )
        if inviter_member_event:
            if fallback_to_single_member:
                return "Invite from %s" % (
                    name_from_member_event(inviter_member_event),
                )
            return None
        # The inviter's member event could not be loaded: carry on and name
        # the room after its members instead.

    # at this point we're going to need to search the state by all state keys
    # for an event type, so rearrange the data structure
    room_state_bytype_ids = _state_as_two_level_dict(room_state_ids)

    # we're going to have to generate a name based on who's in the room,
    # so find out who is in the room that isn't the user.
    if EventTypes.Member in room_state_bytype_ids:
        member_events = await store.get_events(
            list(room_state_bytype_ids[EventTypes.Member].values())
        )
        all_members = [
            ev
            for ev in member_events.values()
            if ev.content.get("membership") in (Membership.JOIN, Membership.INVITE)
        ]
        # Sort the member events oldest-first so that we name people in the
        # order they joined (it should at least be deterministic rather than
        # dictionary iteration order)
        all_members.sort(key=lambda e: e.origin_server_ts)
        other_members = [m for m in all_members if m.state_key != user_id]
    else:
        other_members = []
        all_members = []

    if not other_members:
        if len(all_members) == 1:
            # The only member is the user themselves: a self-chat, an inbound
            # invite, or an outbound 3PID invite.
            if all_members[0].sender != user_id:
                return name_from_member_event(all_members[0])

            if room_state_bytype_ids.get(EventTypes.ThirdPartyInvite):
                # Third party invite events look nothing like member events
                # (they have a great big encrypted thing as their name to
                # prevent leaking the 3PID name), so they cannot be described
                # the way members are; use a generic description instead.
                return "Inviting email address"

        # Nobody else is here and nobody else has been invited. This includes
        # the user's own self-made room with no third party invites, which
        # must not fall through to the member-based description ("nobody").
        return ALL_ALONE

    if len(other_members) == 1 and not fallback_to_single_member:
        return None

    return descriptor_from_member_events(other_members)
165
166
def descriptor_from_member_events(member_events: Iterable[EventBase]) -> str:
    """Describe a room in terms of the given member events.

    Args:
        member_events: The member events to build the description from. The
            first events in iteration order are the ones named explicitly.

    Returns:
        "nobody" when there are no events, the single member's name when
        there is one, "<first> and <second>" when there are two, and
        "<first> and <N> others" otherwise.
    """
    # The input may be a one-shot iterable; materialise it so that it can be
    # both counted and indexed.
    events = list(member_events)
    count = len(events)

    if not events:
        return "nobody"

    first_name = name_from_member_event(events[0])
    if count == 1:
        return first_name

    if count == 2:
        return "%s and %s" % (first_name, name_from_member_event(events[1]))

    return "%s and %d others" % (first_name, count - 1)
193
194
def name_from_member_event(member_event: EventBase) -> str:
    """Pick the name to show for the user that a member event is about.

    Args:
        member_event: The member event to take the name from.

    Returns:
        The "displayname" from the event's content if there is a non-empty
        one, otherwise the event's state key.
    """
    content = member_event.content
    display_name = content.get("displayname") if content else None
    return display_name if display_name else member_event.state_key
199
200
def _state_as_two_level_dict(state: StateMap[str]) -> Dict[str, Dict[str, str]]:
    """Regroup a flat state map into a dict of dicts.

    Args:
        state: A map whose keys are indexable pairs (first element, second
            element) and whose values are stored unchanged.

    Returns:
        A dict from the first key element to a dict from the second key
        element to the value. An inner dict is only created once there is an
        entry to put in it, so inner dicts are never empty.
    """
    grouped: Dict[str, Dict[str, str]] = {}
    for state_tuple, value in state.items():
        outer_key, inner_key = state_tuple[0], state_tuple[1]
        if outer_key not in grouped:
            grouped[outer_key] = {}
        grouped[outer_key][inner_key] = value
    return grouped
206
207
def _looks_like_an_alias(string: object) -> bool:
    """Check whether a value loosely looks like a room alias.

    The value typically comes straight out of event content, which is not
    validated, so anything that is not a string is rejected rather than being
    handed to the regular expression (which would raise a TypeError).

    Args:
        string: The candidate alias.

    Returns:
        True if `string` is a str matching ALIAS_RE, False otherwise.
    """
    return isinstance(string, str) and ALIAS_RE.match(string) is not None
210