1# Copyright 2014 - 2016 OpenMarket Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16from typing import TYPE_CHECKING, Any, Optional, Tuple
17
18import attr
19import msgpack
20from unpaddedbase64 import decode_base64, encode_base64
21
22from synapse.api.constants import (
23    EventContentFields,
24    EventTypes,
25    GuestAccess,
26    HistoryVisibility,
27    JoinRules,
28)
29from synapse.api.errors import (
30    Codes,
31    HttpResponseException,
32    RequestSendFailed,
33    SynapseError,
34)
35from synapse.types import JsonDict, ThirdPartyInstanceID
36from synapse.util.caches.descriptors import _CacheContext, cached
37from synapse.util.caches.response_cache import ResponseCache
38
39if TYPE_CHECKING:
40    from synapse.server import HomeServer
41
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Not referenced anywhere in the visible part of this file. Presumably a
# polling interval in milliseconds (60 * 1000 would be one minute) - TODO confirm.
REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000

# This is used to indicate we should only return rooms published to the main list.
EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
48
49
class RoomListHandler:
    """Serves public room directory listings, both local and remote.

    Local listings are read from the datastore; remote listings are fetched via
    the federation client. Requests without a search filter go through a
    ``ResponseCache`` keyed on the request parameters.
    """

    def __init__(self, hs: "HomeServer"):
        self.store = hs.get_datastore()
        self.hs = hs
        self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search
        # Keyed on (limit, since_token, network_tuple, from_federation).
        self.response_cache: ResponseCache[
            Tuple[Optional[int], Optional[str], Optional[ThirdPartyInstanceID], bool]
        ] = ResponseCache(hs.get_clock(), "room_list")
        # Keyed on (server_name, limit, since_token, include_all_networks,
        # third_party_instance_id). Created with a 30 second `timeout_ms`.
        self.remote_response_cache: ResponseCache[
            Tuple[str, Optional[int], Optional[str], bool, Optional[str]]
        ] = ResponseCache(hs.get_clock(), "remote_room_list", timeout_ms=30 * 1000)

    async def get_local_public_room_list(
        self,
        limit: Optional[int] = None,
        since_token: Optional[str] = None,
        search_filter: Optional[dict] = None,
        network_tuple: Optional[ThirdPartyInstanceID] = EMPTY_THIRD_PARTY_ID,
        from_federation: bool = False,
    ) -> JsonDict:
        """Generate a local public room list.

        There are multiple different lists: the main one plus one per third
        party network. A client can ask for a specific list or to return all.

        Args:
            limit: Maximum number of rooms to return, or None for no limit.
            since_token: Pagination token from a previous response, if any.
            search_filter: Dictionary to filter rooms by. Requests with a
                (non-empty) filter bypass the response cache.
            network_tuple: Which public list to use.
                This can be (None, None) to indicate the main list, or a particular
                appservice and network id to use an appservice specific one.
                Setting to None returns all public rooms across all lists.
            from_federation: true iff the request comes from the federation API

        Returns:
            The room list response. If room list search is disabled in the
            config, an empty chunk with a zero room count estimate.
        """
        if not self.enable_room_list_search:
            return {"chunk": [], "total_room_count_estimate": 0}

        logger.info(
            "Getting public room list: limit=%r, since=%r, search=%r, network=%r",
            limit,
            since_token,
            bool(search_filter),
            network_tuple,
        )

        if search_filter:
            # Searches are never cached.
            logger.info("Bypassing cache as search request.")

            return await self._get_public_room_list(
                limit,
                since_token,
                search_filter,
                network_tuple=network_tuple,
                from_federation=from_federation,
            )

        # `from_federation` changes the result (it is forwarded to the store as
        # `ignore_non_federatable`), so it has to be part of the cache key.
        # Without it a response computed for a client could be served to a
        # federation request, or the other way round.
        key = (limit, since_token, network_tuple, from_federation)
        return await self.response_cache.wrap(
            key,
            self._get_public_room_list,
            limit,
            since_token,
            network_tuple=network_tuple,
            from_federation=from_federation,
        )

    async def _get_public_room_list(
        self,
        limit: Optional[int] = None,
        since_token: Optional[str] = None,
        search_filter: Optional[dict] = None,
        network_tuple: Optional[ThirdPartyInstanceID] = EMPTY_THIRD_PARTY_ID,
        from_federation: bool = False,
    ) -> JsonDict:
        """Generate a public room list.

        Args:
            limit: Maximum amount of rooms to return. None means no limit; 0
                yields an empty chunk without pagination tokens.
            since_token: Pagination token from a previous response, if any.
            search_filter: Dictionary to filter rooms by.
            network_tuple: Which public list to use.
                This can be (None, None) to indicate the main list, or a particular
                appservice and network id to use an appservice specific one.
                Setting to None returns all public rooms across all lists.
            from_federation: Whether this request originated from a
                federating server or a client. Used for room filtering.

        Returns:
            A dict with "chunk" (the room entries) and
            "total_room_count_estimate", plus "next_batch" / "prev_batch"
            tokens when there are further pages in that direction.
        """

        # Pagination tokens work by storing the room ID sent in the last batch,
        # plus the direction (forwards or backwards). Next batch tokens always
        # go forwards, prev batch tokens always go backwards.
        bounds: Optional[Tuple[int, str]] = None
        forwards = True
        has_batch_token = bool(since_token)
        if since_token:
            batch_token = RoomListNextBatch.from_token(since_token)
            bounds = (batch_token.last_joined_members, batch_token.last_room_id)
            forwards = batch_token.direction_is_forward

        # we request one more than wanted to see if there are more pages to come
        probing_limit = limit + 1 if limit is not None else None

        rows = await self.store.get_largest_public_rooms(
            network_tuple,
            search_filter,
            probing_limit,
            bounds=bounds,
            forwards=forwards,
            ignore_non_federatable=from_federation,
        )

        def build_room_entry(room: JsonDict) -> JsonDict:
            entry = {
                "room_id": room["room_id"],
                "name": room["name"],
                "topic": room["topic"],
                "canonical_alias": room["canonical_alias"],
                "num_joined_members": room["joined_members"],
                "avatar_url": room["avatar"],
                "world_readable": room["history_visibility"]
                == HistoryVisibility.WORLD_READABLE,
                "guest_can_join": room["guest_access"] == "can_join",
                "join_rule": room["join_rules"],
            }

            # Filter out Nones - rather omit the field altogether
            return {k: v for k, v in entry.items() if v is not None}

        def make_token(entry: JsonDict, direction_is_forward: bool) -> str:
            # Encode the position of `entry` as an opaque pagination token.
            return RoomListNextBatch(
                last_joined_members=entry["num_joined_members"],
                last_room_id=entry["room_id"],
                direction_is_forward=direction_is_forward,
            ).to_token()

        entries = [build_room_entry(r) for r in rows]

        more_to_come = False
        if limit is not None:
            # Getting the extra "probe" row back means there is another page.
            more_to_come = len(entries) == probing_limit

            # Depending on direction we trim either the front or back.
            if forwards:
                entries = entries[:limit]
            else:
                # An explicit start index rather than `[-limit:]`: with
                # limit == 0 the latter is `[0:]` and would trim nothing.
                entries = entries[max(len(entries) - limit, 0) :]

        response: JsonDict = {}

        # Tokens are anchored on the first/last entry actually returned, so
        # they can only be built when the trimmed chunk is non-empty.
        if entries:
            initial_entry = entries[0]
            final_entry = entries[-1]

            if forwards:
                if has_batch_token:
                    # If there was a token given then we assume that there
                    # must be previous results.
                    response["prev_batch"] = make_token(initial_entry, False)

                if more_to_come:
                    response["next_batch"] = make_token(final_entry, True)
            else:
                if has_batch_token:
                    response["next_batch"] = make_token(final_entry, True)

                if more_to_come:
                    response["prev_batch"] = make_token(initial_entry, False)

        response["chunk"] = entries

        response["total_room_count_estimate"] = await self.store.count_public_rooms(
            network_tuple, ignore_non_federatable=from_federation
        )

        return response

    @cached(num_args=1, cache_context=True)
    async def generate_room_entry(
        self,
        room_id: str,
        num_joined_users: int,
        cache_context: _CacheContext,
        with_alias: bool = True,
        allow_private: bool = False,
    ) -> Optional[JsonDict]:
        """Returns the entry for a room

        NOTE(review): the decorator is given ``num_args=1``, which presumably
        keys the cache on ``room_id`` alone even though the result also depends
        on the other arguments - confirm this is intended.

        Args:
            room_id: The room's ID.
            num_joined_users: Number of users in the room.
            cache_context: Information for cached responses.
            with_alias: Whether to return the room's aliases in the result.
            allow_private: Whether invite-only rooms should be shown.

        Returns:
            Returns a room entry as a dictionary, or None if this
            room was determined not to be shown publicly. If the room has no
            current state, the entry only holds the room ID, the member count
            and (if requested and present) the aliases.
        """
        result: JsonDict = {
            "room_id": room_id,
            "num_joined_members": num_joined_users,
        }

        if with_alias:
            aliases = await self.store.get_aliases_for_room(
                room_id, on_invalidate=cache_context.invalidate
            )
            if aliases:
                result["aliases"] = aliases

        current_state_ids = await self.store.get_current_state_ids(
            room_id, on_invalidate=cache_context.invalidate
        )

        if not current_state_ids:
            # We're not in the room, so may as well bail out here.
            return result

        # Only these state event types contribute to the entry.
        wanted_types = (
            EventTypes.Create,
            EventTypes.JoinRules,
            EventTypes.Name,
            EventTypes.Topic,
            EventTypes.CanonicalAlias,
            EventTypes.RoomHistoryVisibility,
            EventTypes.GuestAccess,
            "m.room.avatar",
        )
        event_map = await self.store.get_events(
            [
                event_id
                for key, event_id in current_state_ids.items()
                if key[0] in wanted_types
            ]
        )

        current_state = {(ev.type, ev.state_key): ev for ev in event_map.values()}

        # Double check that this is actually a public room.
        join_rules_event = current_state.get((EventTypes.JoinRules, ""))
        if join_rules_event:
            join_rule = join_rules_event.content.get("join_rule", None)
            if not allow_private and join_rule and join_rule != JoinRules.PUBLIC:
                return None

        # Return whether this room is open to federation users or not.
        # This lookup raises KeyError if the create event was not fetched.
        create_event = current_state[EventTypes.Create, ""]
        result["m.federate"] = create_event.content.get(
            EventContentFields.FEDERATE, True
        )

        # Copy across simple content fields, skipping any that are unset or empty.
        for event_type, content_key, result_key in (
            (EventTypes.Name, "name", "name"),
            (EventTypes.Topic, "topic", "topic"),
            (EventTypes.CanonicalAlias, "alias", "canonical_alias"),
        ):
            event = current_state.get((event_type, ""))
            if event:
                value = event.content.get(content_key, None)
                if value:
                    result[result_key] = value

        visibility_event = current_state.get((EventTypes.RoomHistoryVisibility, ""))
        visibility = None
        if visibility_event:
            visibility = visibility_event.content.get("history_visibility", None)
        result["world_readable"] = visibility == HistoryVisibility.WORLD_READABLE

        guest_event = current_state.get((EventTypes.GuestAccess, ""))
        guest = None
        if guest_event:
            guest = guest_event.content.get(EventContentFields.GUEST_ACCESS)
        result["guest_can_join"] = guest == GuestAccess.CAN_JOIN

        avatar_event = current_state.get(("m.room.avatar", ""))
        if avatar_event:
            avatar_url = avatar_event.content.get("url", None)
            if avatar_url:
                result["avatar_url"] = avatar_url

        return result

    async def get_remote_public_room_list(
        self,
        server_name: str,
        limit: Optional[int] = None,
        since_token: Optional[str] = None,
        search_filter: Optional[dict] = None,
        include_all_networks: bool = False,
        third_party_instance_id: Optional[str] = None,
    ) -> JsonDict:
        """Get the public room list from remote server

        With a search filter, the remote server is first asked to do the
        filtering. If it does not appear to support that, the unfiltered,
        unpaginated list is fetched instead and filtered locally; the response
        then contains only "chunk".

        Args:
            server_name: The remote server to query.
            limit: Maximum number of rooms to return. Ignored in the
                locally-filtered fallback.
            since_token: Pagination token. Ignored in the locally-filtered
                fallback.
            search_filter: Dictionary to filter rooms by.
            include_all_networks: Passed through to the federation client.
            third_party_instance_id: Passed through to the federation client.

        Returns:
            The room list response. If room list search is disabled in the
            config, an empty chunk with a zero room count estimate.

        Raises:
            SynapseError: (502) if the remote list could not be fetched.
        """

        if not self.enable_room_list_search:
            return {"chunk": [], "total_room_count_estimate": 0}

        if search_filter:
            # Searching across federation is defined in MSC2197.
            # However, the remote homeserver may or may not actually support it.
            # So we first try an MSC2197 remote-filtered search, then fall back
            # to a locally-filtered search if we must.

            try:
                return await self._get_remote_list_cached(
                    server_name,
                    limit=limit,
                    since_token=since_token,
                    include_all_networks=include_all_networks,
                    third_party_instance_id=third_party_instance_id,
                    search_filter=search_filter,
                )
            except HttpResponseException as hre:
                syn_err = hre.to_synapse_error()
                search_unsupported = hre.code in (404, 405) or syn_err.errcode in (
                    Codes.UNRECOGNIZED,
                    Codes.NOT_FOUND,
                )
                if not search_unsupported:
                    # Not an error that should trigger a fallback.
                    raise SynapseError(502, "Failed to fetch room list") from hre
                logger.debug("Falling back to locally-filtered /publicRooms")
            except RequestSendFailed as e:
                # Not an error that should trigger a fallback.
                raise SynapseError(502, "Failed to fetch room list") from e

            # if we reach this point, then we fall back to the situation where
            # we currently don't support searching across federation, so we have
            # to do it manually without pagination
            limit = None
            since_token = None

        try:
            res = await self._get_remote_list_cached(
                server_name,
                limit=limit,
                since_token=since_token,
                include_all_networks=include_all_networks,
                third_party_instance_id=third_party_instance_id,
            )
        except (RequestSendFailed, HttpResponseException) as e:
            raise SynapseError(502, "Failed to fetch room list") from e

        if search_filter:
            # Fallback path: apply the search term to the unfiltered list.
            res = {
                "chunk": [
                    entry
                    for entry in res.get("chunk", [])
                    if _matches_room_entry(entry, search_filter)
                ]
            }

        return res

    async def _get_remote_list_cached(
        self,
        server_name: str,
        limit: Optional[int] = None,
        since_token: Optional[str] = None,
        search_filter: Optional[dict] = None,
        include_all_networks: bool = False,
        third_party_instance_id: Optional[str] = None,
    ) -> JsonDict:
        """Wrapper around FederationClient.get_public_rooms that caches the
        result.

        Requests with a (non-empty) search filter are sent straight to the
        federation client and are not cached. Exceptions raised by the
        federation client are not handled here.
        """

        repl_layer = self.hs.get_federation_client()
        if search_filter:
            # We can't cache when asking for search
            return await repl_layer.get_public_rooms(
                server_name,
                limit=limit,
                since_token=since_token,
                search_filter=search_filter,
                include_all_networks=include_all_networks,
                third_party_instance_id=third_party_instance_id,
            )

        key = (
            server_name,
            limit,
            since_token,
            include_all_networks,
            third_party_instance_id,
        )
        return await self.remote_response_cache.wrap(
            key,
            repl_layer.get_public_rooms,
            server_name,
            limit=limit,
            since_token=since_token,
            search_filter=search_filter,
            include_all_networks=include_all_networks,
            third_party_instance_id=third_party_instance_id,
        )
475
476
@attr.s(slots=True, frozen=True, auto_attribs=True)
class RoomListNextBatch:
    """A position in a public room list, used as a pagination token.

    `to_token` serialises the attributes (under single-letter keys) with
    msgpack and encodes the bytes as base64; `from_token` reverses that.
    """

    last_joined_members: int  # The count to get rooms after/before
    last_room_id: str  # The room_id to get rooms after/before
    direction_is_forward: bool  # True if this is a next_batch, false if prev_batch

    # Attribute name -> short key used inside the serialised token.
    KEY_DICT = {
        "last_joined_members": "m",
        "last_room_id": "r",
        "direction_is_forward": "d",
    }

    # Short key -> attribute name, for parsing tokens.
    REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()}

    @classmethod
    def from_token(cls, token: str) -> "RoomListNextBatch":
        """Parse a token produced by `to_token`.

        Raises:
            KeyError: if the decoded token contains a key that is not in
                `REVERSE_KEY_DICT`. Errors from base64/msgpack decoding and
                from the constructor propagate unchanged.
        """
        decoded = msgpack.loads(decode_base64(token), raw=False)
        # Construct via `cls` so the alternate constructor respects subclasses.
        return cls(
            **{cls.REVERSE_KEY_DICT[key]: val for key, val in decoded.items()}
        )

    def to_token(self) -> str:
        """Serialise this position to a token string."""
        return encode_base64(
            msgpack.dumps(
                {self.KEY_DICT[key]: val for key, val in attr.asdict(self).items()}
            )
        )

    def copy_and_replace(self, **kwds: Any) -> "RoomListNextBatch":
        """Return a copy of this token with the given attributes replaced."""
        return attr.evolve(self, **kwds)
507
508
def _matches_room_entry(room_entry: JsonDict, search_filter: dict) -> bool:
    """Check whether a room directory entry matches a search filter.

    Args:
        room_entry: A room entry as found in a /publicRooms "chunk".
        search_filter: The search filter. Only "generic_search_term" is
            consulted.

    Returns:
        True if there is no (non-empty) search term, or if the term occurs,
        compared after upper-casing both sides, in the entry's name, topic or
        canonical alias. False otherwise.
    """
    search_term = search_filter.get("generic_search_term") if search_filter else None
    if not search_term:
        # Nothing to filter on: everything matches.
        return True

    needle = search_term.upper()
    for field in ("name", "topic", "canonical_alias"):
        value = room_entry.get(field)
        # Entries can come from remote servers, so a field may be present but
        # null (or otherwise not a string); such a field cannot match.
        if isinstance(value, str) and needle in value.upper():
            return True

    return False
522