# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple

from synapse.api.constants import ReadReceiptEventFields, ReceiptTypes
from synapse.appservice import ApplicationService
from synapse.streams import EventSource
from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)


class ReceiptsHandler:
    """Persists read receipts from local clients and remote homeservers, and
    informs the notifier, the pusher pool and (for local receipts) the
    federation sender about them.
    """

    def __init__(self, hs: "HomeServer"):
        self.notifier = hs.get_notifier()
        self.server_name = hs.config.server.server_name
        self.store = hs.get_datastore()
        self.event_auth_handler = hs.get_event_auth_handler()

        self.hs = hs

        # We only need to poke the federation sender explicitly if its on the
        # same instance. Other federation sender instances will get notified by
        # `synapse.app.generic_worker.FederationSenderHandler` when it sees it
        # in the receipts stream.
        self.federation_sender = None
        if hs.should_send_federation():
            self.federation_sender = hs.get_federation_sender()

        # If we can handle the receipt EDUs we do so, otherwise we route them
        # to the appropriate worker.
        if hs.get_instance_name() in hs.config.worker.writers.receipts:
            hs.get_federation_registry().register_edu_handler(
                "m.receipt", self._received_remote_receipt
            )
        else:
            hs.get_federation_registry().register_instances_for_edu(
                "m.receipt",
                hs.config.worker.writers.receipts,
            )

        self.clock = self.hs.get_clock()
        self.state = hs.get_state_handler()

    async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None:
        """Called when we receive an EDU of type m.receipt from a remote HS.

        Args:
            origin: the server name the EDU was received from.
            content: the EDU content, iterated here as a mapping of
                room ID -> receipt type -> user ID -> receipt values.
        """
        receipts = []
        for room_id, room_values in content.items():
            # If we're not in the room just ditch the event entirely. This is
            # probably an old server that has come back and thinks we're still in
            # the room (or we've been rejoined to the room by a state reset).
            is_in_room = await self.event_auth_handler.check_host_in_room(
                room_id, self.server_name
            )
            if not is_in_room:
                logger.info(
                    "Ignoring receipt for room %r from server %s as we're not in the room",
                    room_id,
                    origin,
                )
                continue

            for receipt_type, users in room_values.items():
                for user_id, user_values in users.items():
                    # A server may only send receipts on behalf of its own users.
                    if get_domain_from_id(user_id) != origin:
                        logger.info(
                            "Received receipt for user %r from server %s, ignoring",
                            user_id,
                            origin,
                        )
                        continue

                    receipts.append(
                        ReadReceipt(
                            room_id=room_id,
                            receipt_type=receipt_type,
                            user_id=user_id,
                            event_ids=user_values["event_ids"],
                            data=user_values.get("data", {}),
                        )
                    )

        await self._handle_new_receipts(receipts)

    async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
        """Takes a list of receipts, stores them and informs the notifier.

        Args:
            receipts: the receipts to persist.

        Returns:
            True if at least one receipt was newly persisted, False if every
            receipt was rejected by the store as old (or the list was empty).
        """
        min_batch_id: Optional[int] = None
        max_batch_id: Optional[int] = None

        # Only receipts that were actually persisted should cause a room to be
        # poked; stale receipts change nothing that listeners could observe.
        persisted_receipts: List[ReadReceipt] = []

        for receipt in receipts:
            res = await self.store.insert_receipt(
                receipt.room_id,
                receipt.receipt_type,
                receipt.user_id,
                receipt.event_ids,
                receipt.data,
            )

            if res is None:
                # res will be None if this read receipt is 'old'
                continue

            persisted_receipts.append(receipt)
            stream_id, max_persisted_id = res

            if min_batch_id is None or stream_id < min_batch_id:
                min_batch_id = stream_id
            if max_batch_id is None or max_persisted_id > max_batch_id:
                max_batch_id = max_persisted_id

        # Either both of these should be None or neither.
        if min_batch_id is None or max_batch_id is None:
            # no new receipts
            return False

        affected_room_ids = list({r.room_id for r in persisted_receipts})

        self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids)
        # Note that the min here shouldn't be relied upon to be accurate.
        await self.hs.get_pusherpool().on_new_receipts(
            min_batch_id, max_batch_id, affected_room_ids
        )

        return True

    async def received_client_receipt(
        self, room_id: str, receipt_type: str, user_id: str, event_id: str, hidden: bool
    ) -> None:
        """Called when a client tells us a local user has read up to the given
        event_id in the room.

        The receipt is stamped with the current time and the `hidden` flag,
        persisted, and then handed to the local federation sender (if any)
        unless it is a hidden receipt and MSC2285 support is enabled.
        """
        receipt = ReadReceipt(
            room_id=room_id,
            receipt_type=receipt_type,
            user_id=user_id,
            event_ids=[event_id],
            data={"ts": int(self.clock.time_msec()), "hidden": hidden},
        )

        is_new = await self._handle_new_receipts([receipt])
        if not is_new:
            return

        # Hidden receipts must not leave this server when MSC2285 is enabled.
        if self.federation_sender and not (
            self.hs.config.experimental.msc2285_enabled and hidden
        ):
            await self.federation_sender.send_read_receipt(receipt)


class ReceiptEventSource(EventSource[int, JsonDict]):
    """Event source that serves read receipt events from the receipts stream."""

    def __init__(self, hs: "HomeServer"):
        self.store = hs.get_datastore()
        self.config = hs.config

    @staticmethod
    def filter_out_hidden(events: List[JsonDict], user_id: str) -> List[JsonDict]:
        """Strip hidden read receipts that `user_id` is not allowed to see.

        The input events are not mutated; shallow copies are built instead.

        Args:
            events: receipt events whose "content" is iterated as a mapping of
                event ID -> receipt type -> user ID -> receipt data.
            user_id: the user the events are being prepared for. Their own
                hidden receipts are kept.

        Returns:
            The events that still have content after filtering. In receipts
            that are kept, the "hidden" key is renamed to
            `ReadReceiptEventFields.MSC2285_HIDDEN`.
        """
        visible_events = []

        # filter out hidden receipts the user shouldn't see
        for event in events:
            content = event.get("content", {})
            new_event = event.copy()
            new_event["content"] = {}

            for event_id, event_content in content.items():
                m_read = event_content.get(ReceiptTypes.READ, {})

                # If m_read is missing copy over the original event_content as
                # there is nothing to process here
                if not m_read:
                    new_event["content"][event_id] = event_content.copy()
                    continue

                new_users = {}
                for rr_user_id, user_rr in m_read.items():
                    try:
                        hidden = user_rr.get("hidden")
                    except AttributeError:
                        # Due to https://github.com/matrix-org/synapse/issues/10376
                        # there are cases where user_rr is a string, in those cases
                        # we just ignore the read receipt
                        continue

                    if hidden is not True or rr_user_id == user_id:
                        new_users[rr_user_id] = user_rr.copy()
                        # If hidden has a value replace hidden with the correct
                        # prefixed key
                        if hidden is not None:
                            new_users[rr_user_id].pop("hidden")
                            new_users[rr_user_id][
                                ReadReceiptEventFields.MSC2285_HIDDEN
                            ] = hidden

                # Set new users unless empty
                if new_users:
                    new_event["content"][event_id] = {ReceiptTypes.READ: new_users}

            # Append new_event to visible_events unless empty
            if new_event["content"]:
                visible_events.append(new_event)

        return visible_events

    async def get_new_events(
        self,
        user: UserID,
        from_key: int,
        limit: Optional[int],
        room_ids: Iterable[str],
        is_guest: bool,
        explicit_room_id: Optional[str] = None,
    ) -> Tuple[List[JsonDict], int]:
        """Fetch receipt events for the given rooms between `from_key` and the
        current stream position.

        `limit`, `is_guest` and `explicit_room_id` are accepted but not used
        here.

        Returns:
            A tuple of the receipt events (with hidden receipts filtered out
            when MSC2285 support is enabled) and the current receipt stream
            position.
        """
        from_key = int(from_key)
        to_key = self.get_current_key()

        # Nothing can have changed if we're already at the current position.
        if from_key == to_key:
            return [], to_key

        events = await self.store.get_linearized_receipts_for_rooms(
            room_ids, from_key=from_key, to_key=to_key
        )

        if self.config.experimental.msc2285_enabled:
            events = ReceiptEventSource.filter_out_hidden(events, user.to_string())

        return events, to_key

    async def get_new_events_as(
        self, from_key: int, service: ApplicationService
    ) -> Tuple[List[JsonDict], int]:
        """Returns a set of new read receipt events that an appservice
        may be interested in.

        Args:
            from_key: the stream position at which events should be fetched from
            service: The appservice which may be interested

        Returns:
            A two-tuple containing the following:
                * A list of json dictionaries derived from read receipts that the
                  appservice may be interested in.
                * The current read receipt stream token.
        """
        from_key = int(from_key)
        to_key = self.get_current_key()

        if from_key == to_key:
            return [], to_key

        # Fetch all read receipts for all rooms, up to a limit of 100. This is ordered
        # by most recent.
        rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
            from_key=from_key, to_key=to_key
        )

        # Then filter down to rooms that the AS can read
        events = []
        for room_id, event in rooms_to_events.items():
            if not await service.matches_user_in_member_list(room_id, self.store):
                continue

            events.append(event)

        return events, to_key

    def get_current_key(self, direction: str = "f") -> int:
        """Return the store's maximum receipt stream ID; `direction` is unused."""
        return self.store.get_max_receipt_stream_id()