1# Copyright 2014-2016 OpenMarket Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16import random
17from typing import TYPE_CHECKING, Iterable, List, Optional
18
19from synapse.api.constants import EduTypes, EventTypes, Membership
20from synapse.api.errors import AuthError, SynapseError
21from synapse.events import EventBase
22from synapse.handlers.presence import format_user_presence_state
23from synapse.logging.utils import log_function
24from synapse.streams.config import PaginationConfig
25from synapse.types import JsonDict, UserID
26from synapse.visibility import filter_events_for_client
27
28if TYPE_CHECKING:
29    from synapse.server import HomeServer
30
31
32logger = logging.getLogger(__name__)
33
34
35class EventStreamHandler:
36    def __init__(self, hs: "HomeServer"):
37        self.store = hs.get_datastore()
38        self.clock = hs.get_clock()
39        self.hs = hs
40
41        self.notifier = hs.get_notifier()
42        self.state = hs.get_state_handler()
43        self._server_notices_sender = hs.get_server_notices_sender()
44        self._event_serializer = hs.get_event_client_serializer()
45
46    @log_function
47    async def get_stream(
48        self,
49        auth_user_id: str,
50        pagin_config: PaginationConfig,
51        timeout: int = 0,
52        as_client_event: bool = True,
53        affect_presence: bool = True,
54        room_id: Optional[str] = None,
55        is_guest: bool = False,
56    ) -> JsonDict:
57        """Fetches the events stream for a given user."""
58
59        if room_id:
60            blocked = await self.store.is_room_blocked(room_id)
61            if blocked:
62                raise SynapseError(403, "This room has been blocked on this server")
63
64        # send any outstanding server notices to the user.
65        await self._server_notices_sender.on_user_syncing(auth_user_id)
66
67        auth_user = UserID.from_string(auth_user_id)
68        presence_handler = self.hs.get_presence_handler()
69
70        context = await presence_handler.user_syncing(
71            auth_user_id, affect_presence=affect_presence
72        )
73        with context:
74            if timeout:
75                # If they've set a timeout set a minimum limit.
76                timeout = max(timeout, 500)
77
78                # Add some randomness to this value to try and mitigate against
79                # thundering herds on restart.
80                timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
81
82            stream_result = await self.notifier.get_events_for(
83                auth_user,
84                pagin_config,
85                timeout,
86                is_guest=is_guest,
87                explicit_room_id=room_id,
88            )
89            events = stream_result.events
90
91            time_now = self.clock.time_msec()
92
93            # When the user joins a new room, or another user joins a currently
94            # joined room, we need to send down presence for those users.
95            to_add: List[JsonDict] = []
96            for event in events:
97                if not isinstance(event, EventBase):
98                    continue
99                if event.type == EventTypes.Member:
100                    if event.membership != Membership.JOIN:
101                        continue
102                    # Send down presence.
103                    if event.state_key == auth_user_id:
104                        # Send down presence for everyone in the room.
105                        users: Iterable[str] = await self.store.get_users_in_room(
106                            event.room_id
107                        )
108                    else:
109                        users = [event.state_key]
110
111                    states = await presence_handler.get_states(users)
112                    to_add.extend(
113                        {
114                            "type": EduTypes.Presence,
115                            "content": format_user_presence_state(state, time_now),
116                        }
117                        for state in states
118                    )
119
120            events.extend(to_add)
121
122            chunks = await self._event_serializer.serialize_events(
123                events,
124                time_now,
125                as_client_event=as_client_event,
126            )
127
128            chunk = {
129                "chunk": chunks,
130                "start": await stream_result.start_token.to_string(self.store),
131                "end": await stream_result.end_token.to_string(self.store),
132            }
133
134            return chunk
135
136
137class EventHandler:
138    def __init__(self, hs: "HomeServer"):
139        self.store = hs.get_datastore()
140        self.storage = hs.get_storage()
141
142    async def get_event(
143        self, user: UserID, room_id: Optional[str], event_id: str
144    ) -> Optional[EventBase]:
145        """Retrieve a single specified event.
146
147        Args:
148            user: The user requesting the event
149            room_id: The expected room id. We'll return None if the
150                event's room does not match.
151            event_id: The event ID to obtain.
152        Returns:
153            An event, or None if there is no event matching this ID.
154        Raises:
155            SynapseError if there was a problem retrieving this event, or
156            AuthError if the user does not have the rights to inspect this
157            event.
158        """
159        event = await self.store.get_event(event_id, check_room_id=room_id)
160
161        if not event:
162            return None
163
164        users = await self.store.get_users_in_room(event.room_id)
165        is_peeking = user.to_string() not in users
166
167        filtered = await filter_events_for_client(
168            self.storage, user.to_string(), [event], is_peeking=is_peeking
169        )
170
171        if not filtered:
172            raise AuthError(403, "You don't have permission to access that event.")
173
174        return event
175