# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import random
from typing import TYPE_CHECKING, Iterable, List, Optional

from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.utils import log_function
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, UserID
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
    from synapse.server import HomeServer


logger = logging.getLogger(__name__)


class EventStreamHandler:
    """Builds the event stream response for a user, adding presence for joins."""

    def __init__(self, hs: "HomeServer"):
        # Collaborators are all looked up from the homeserver object.
        self.store = hs.get_datastore()
        self.clock = hs.get_clock()
        self.hs = hs

        self.notifier = hs.get_notifier()
        self.state = hs.get_state_handler()
        self._server_notices_sender = hs.get_server_notices_sender()
        self._event_serializer = hs.get_event_client_serializer()

    @log_function
    async def get_stream(
        self,
        auth_user_id: str,
        pagin_config: PaginationConfig,
        timeout: int = 0,
        as_client_event: bool = True,
        affect_presence: bool = True,
        room_id: Optional[str] = None,
        is_guest: bool = False,
    ) -> JsonDict:
        """Fetch the event stream for the given user.

        Args:
            auth_user_id: The user whose stream is being fetched.
            pagin_config: Pagination settings, handed to the notifier.
            timeout: When non-zero, raised to at least 500 and then jittered
                by roughly +/-10% before being handed to the notifier.
            as_client_event: Forwarded to the event serializer.
            affect_presence: Forwarded to the presence handler's
                ``user_syncing`` call.
            room_id: If set, the room is checked against the blocked-room
                list and passed to the notifier as ``explicit_room_id``.
            is_guest: Forwarded to the notifier.

        Returns:
            A dict with the serialized events under ``chunk`` and the
            stringified stream tokens under ``start`` and ``end``.

        Raises:
            SynapseError: 403 if ``room_id`` is set and the room is blocked.
        """
        if room_id and await self.store.is_room_blocked(room_id):
            raise SynapseError(403, "This room has been blocked on this server")

        # Send any outstanding server notices to the user.
        await self._server_notices_sender.on_user_syncing(auth_user_id)

        auth_user = UserID.from_string(auth_user_id)
        presence_handler = self.hs.get_presence_handler()

        syncing_ctx = await presence_handler.user_syncing(
            auth_user_id, affect_presence=affect_presence
        )
        with syncing_ctx:
            if timeout:
                # Enforce a minimum for any timeout the caller supplied.
                floor = max(timeout, 500)

                # Jitter the value to try and mitigate against thundering
                # herds on restart.
                timeout = random.randint(int(floor * 0.9), int(floor * 1.1))

            stream_result = await self.notifier.get_events_for(
                auth_user,
                pagin_config,
                timeout,
                is_guest=is_guest,
                explicit_room_id=room_id,
            )
            events = stream_result.events

            time_now = self.clock.time_msec()

            # When the user joins a new room, or another user joins a
            # currently joined room, presence for those users is sent down
            # alongside the events.
            presence_events: List[JsonDict] = []
            for ev in events:
                is_join = (
                    isinstance(ev, EventBase)
                    and ev.type == EventTypes.Member
                    and ev.membership == Membership.JOIN
                )
                if not is_join:
                    continue

                if ev.state_key != auth_user_id:
                    # Somebody else joined: only their presence is needed.
                    targets: Iterable[str] = [ev.state_key]
                else:
                    # The requester joined: presence for everyone in the room.
                    targets = await self.store.get_users_in_room(ev.room_id)

                states = await presence_handler.get_states(targets)
                presence_events.extend(
                    [
                        {
                            "type": EduTypes.Presence,
                            "content": format_user_presence_state(state, time_now),
                        }
                        for state in states
                    ]
                )

            events.extend(presence_events)

            serialized = await self._event_serializer.serialize_events(
                events,
                time_now,
                as_client_event=as_client_event,
            )

            start = await stream_result.start_token.to_string(self.store)
            end = await stream_result.end_token.to_string(self.store)

            return {
                "chunk": serialized,
                "start": start,
                "end": end,
            }


class EventHandler:
    """Looks up a single event on behalf of a user, applying visibility rules."""

    def __init__(self, hs: "HomeServer"):
        self.store = hs.get_datastore()
        self.storage = hs.get_storage()

    async def get_event(
        self, user: UserID, room_id: Optional[str], event_id: str
    ) -> Optional[EventBase]:
        """Retrieve a single specified event.

        Args:
            user: The user requesting the event.
            room_id: The expected room id, handed to the store lookup as
                ``check_room_id``.
            event_id: The event ID to obtain.

        Returns:
            The event, or None if the store lookup yields nothing.

        Raises:
            AuthError: 403 if the visibility filter removes the event for
                this user.

        Errors raised by the store lookup itself are not caught here.
        """
        event = await self.store.get_event(event_id, check_room_id=room_id)
        if not event:
            return None

        room_members = await self.store.get_users_in_room(event.room_id)
        requester = user.to_string()

        # A requester who is not in the room's user list is treated as peeking.
        visible = await filter_events_for_client(
            self.storage,
            requester,
            [event],
            is_peeking=requester not in room_members,
        )
        if visible:
            return event

        raise AuthError(403, "You don't have permission to access that event.")