# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest.mock import Mock

from synapse.api.constants import EventTypes
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.visibility import filter_events_for_client

from tests import unittest

# Durations in milliseconds. The reactor is advanced in seconds, hence the
# `/ 1000` at every `self.reactor.advance(...)` call site below.
one_hour_ms = 3600000
one_day_ms = one_hour_ms * 24


class RetentionTestCase(unittest.HomeserverTestCase):
    """Tests message retention on a homeserver configured with a default policy."""

    servlets = [
        admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
    ]

    def make_homeserver(self, reactor, clock):
        """Build a homeserver with retention enabled.

        The default policy has a `min_lifetime` of 1 day and a `max_lifetime` of
        3 days, and the allowed lifetime range is configured as 1 to 3 days.
        """
        config = self.default_config()
        config["retention"] = {
            "enabled": True,
            "default_policy": {
                "min_lifetime": one_day_ms,
                "max_lifetime": one_day_ms * 3,
            },
            "allowed_lifetime_min": one_day_ms,
            "allowed_lifetime_max": one_day_ms * 3,
        }

        self.hs = self.setup_test_homeserver(config=config)

        return self.hs

    def prepare(self, reactor, clock, homeserver):
        """Register and log in a test user, and cache the handles the tests use."""
        self.user_id = self.register_user("user", "password")
        self.token = self.login("user", "password")

        self.store = self.hs.get_datastore()
        self.serializer = self.hs.get_event_client_serializer()
        self.clock = self.hs.get_clock()

    def test_retention_event_purged_with_state_event(self):
        """Tests that expired events are correctly purged when the room's retention policy
        is defined by a state event.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        # Set the room's retention period to 2 days.
        lifetime = one_day_ms * 2
        self.helper.send_state(
            room_id=room_id,
            event_type=EventTypes.Retention,
            body={"max_lifetime": lifetime},
            tok=self.token,
        )

        self._test_retention_event_purged(room_id, one_day_ms * 1.5)

    def test_retention_event_purged_with_state_event_outside_allowed(self):
        """Tests that the server configuration can override the policy for a room when
        running the purge jobs.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        # Set a max_lifetime higher than the maximum allowed value.
        self.helper.send_state(
            room_id=room_id,
            event_type=EventTypes.Retention,
            body={"max_lifetime": one_day_ms * 4},
            tok=self.token,
        )

        # Check that the event is purged after waiting for the maximum allowed duration
        # instead of the one specified in the room's policy.
        self._test_retention_event_purged(room_id, one_day_ms * 1.5)

        # Set a max_lifetime lower than the minimum allowed value.
        self.helper.send_state(
            room_id=room_id,
            event_type=EventTypes.Retention,
            body={"max_lifetime": one_hour_ms},
            tok=self.token,
        )

        # Check that the event is purged after waiting for the minimum allowed duration
        # instead of the one specified in the room's policy.
        self._test_retention_event_purged(room_id, one_day_ms * 0.5)

    def test_retention_event_purged_without_state_event(self):
        """Tests that expired events are correctly purged when the room's retention policy
        is defined by the server's configuration's default retention policy.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        self._test_retention_event_purged(room_id, one_day_ms * 2)

    def test_visibility(self):
        """Tests that synapse.visibility.filter_events_for_client correctly filters out
        outdated events
        """
        store = self.hs.get_datastore()
        storage = self.hs.get_storage()
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)
        events = []

        # Send a first event, which should be filtered out at the end of the test.
        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)

        # Get the event from the store so that we end up with a FrozenEvent that we can
        # give to filter_events_for_client. We need to do this now because the event won't
        # be in the database anymore after it has expired.
        events.append(self.get_success(store.get_event(resp.get("event_id"))))

        # Advance the time by 2 days. We're using the default retention policy, therefore
        # after this the first event will still be valid.
        self.reactor.advance(one_day_ms * 2 / 1000)

        # Send another event, which shouldn't get filtered out.
        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)

        valid_event_id = resp.get("event_id")

        events.append(self.get_success(store.get_event(valid_event_id)))

        # Advance the time by another 2 days. After this, the first event should be
        # outdated but not the second one.
        self.reactor.advance(one_day_ms * 2 / 1000)

        # Run filter_events_for_client with our list of FrozenEvents.
        filtered_events = self.get_success(
            filter_events_for_client(storage, self.user_id, events)
        )

        # We should only get one event back.
        self.assertEqual(len(filtered_events), 1, filtered_events)
        # That event should be the second, not outdated event.
        self.assertEqual(filtered_events[0].event_id, valid_event_id, filtered_events)

    def _test_retention_event_purged(self, room_id: str, increment: float):
        """Run the following test scenario to test the message retention policy support:

        1. Send event 1
        2. Increment time by `increment`
        3. Send event 2
        4. Increment time by `increment`
        5. Check that event 1 has been purged
        6. Check that event 2 has not been purged
        7. Check that state events that were sent before event 1 aren't purged.

        The main reason for sending a second event is because currently Synapse won't
        purge the latest message in a room because it would otherwise result in a lack of
        forward extremities for this room. It's also a good thing to ensure the purge jobs
        aren't too greedy and purge messages they shouldn't.

        Args:
            room_id: The ID of the room to test retention in.
            increment: The number of milliseconds to advance the clock each time. Must be
                defined so that events in the room aren't purged if they are `increment`
                old but are purged if they are `increment * 2` old.
        """
        # Get the create event to, later, check that we can still access it.
        message_handler = self.hs.get_message_handler()
        create_event = self.get_success(
            message_handler.get_room_data(
                self.user_id, room_id, EventTypes.Create, state_key=""
            )
        )
        # Fail with a clear message here rather than with an AttributeError on
        # `.event_id` at the end of the scenario.
        self.assertIsNotNone(create_event, "create event not found for %s" % room_id)

        # Send a first event to the room. This is the event we'll want to be purged at the
        # end of the test.
        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)

        expired_event_id = resp.get("event_id")

        # Check that we can retrieve the event.
        expired_event = self.get_event(expired_event_id)
        self.assertEqual(
            expired_event.get("content", {}).get("body"), "1", expired_event
        )

        # Advance the time.
        self.reactor.advance(increment / 1000)

        # Send another event. We need this because the purge job won't purge the most
        # recent event in the room.
        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)

        valid_event_id = resp.get("event_id")

        # Advance the time again. Now our first event should have expired but our second
        # one should still be kept.
        self.reactor.advance(increment / 1000)

        # Check that the first event has been purged from the database, i.e. that we
        # can't retrieve it anymore, because it has expired.
        self.get_event(expired_event_id, expect_none=True)

        # Check that the event that hasn't expired can still be retrieved.
        valid_event = self.get_event(valid_event_id)
        self.assertEqual(valid_event.get("content", {}).get("body"), "2", valid_event)

        # Check that we can still access state events that were sent before the event that
        # has been purged. This class's get_event() takes only an event ID (plus
        # `expect_none`); passing the room ID first would look the room ID up as an
        # event and treat the create event's ID as a truthy `expect_none`, making the
        # check vacuous.
        self.get_event(create_event.event_id)

    def get_event(self, event_id, expect_none=False):
        """Fetch an event from the datastore and serialize it for clients.

        Args:
            event_id: The ID of the event to look up.
            expect_none: If True, assert that the event is NOT in the store.

        Returns:
            The serialized event, or an empty dict when `expect_none` is True.
        """
        event = self.get_success(self.store.get_event(event_id, allow_none=True))

        if expect_none:
            self.assertIsNone(event)
            return {}

        self.assertIsNotNone(event)

        time_now = self.clock.time_msec()
        serialized = self.get_success(self.serializer.serialize_event(event, time_now))

        return serialized


class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
    """Tests message retention on a homeserver configured without a default policy."""

    servlets = [
        admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
    ]

    def make_homeserver(self, reactor, clock):
        """Build a homeserver with retention enabled but no default policy, using a
        mocked federation client.
        """
        config = self.default_config()
        config["retention"] = {
            "enabled": True,
        }

        mock_federation_client = Mock(spec=["backfill"])

        self.hs = self.setup_test_homeserver(
            config=config,
            federation_client=mock_federation_client,
        )
        return self.hs

    def prepare(self, reactor, clock, homeserver):
        """Register and log in the test user."""
        self.user_id = self.register_user("user", "password")
        self.token = self.login("user", "password")

    def test_no_default_policy(self):
        """Tests that an event doesn't get expired if there is neither a default retention
        policy nor a policy specific to the room.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        self._test_retention(room_id)

    def test_state_policy(self):
        """Tests that an event gets correctly expired if there is no default retention
        policy but there's a policy specific to the room.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        # Set the maximum lifetime to 35 days so that the first event gets expired but not
        # the second one.
        self.helper.send_state(
            room_id=room_id,
            event_type=EventTypes.Retention,
            body={"max_lifetime": one_day_ms * 35},
            tok=self.token,
        )

        self._test_retention(room_id, expected_code_for_first_event=404)

    def _test_retention(self, room_id, expected_code_for_first_event=200):
        """Send two events 30 days apart, wait another 30 days, then check both.

        Args:
            room_id: The ID of the room to send the events to.
            expected_code_for_first_event: The HTTP status code expected when fetching
                the first event at the end of the scenario (200 if it should have
                been kept, 404 if it should have been purged).
        """
        # Send a first event to the room. This is the event we'll want to be purged at the
        # end of the test.
        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)

        first_event_id = resp.get("event_id")

        # Check that we can retrieve the event.
        expired_event = self.get_event(room_id, first_event_id)
        self.assertEqual(
            expired_event.get("content", {}).get("body"), "1", expired_event
        )

        # Advance the time by a month.
        self.reactor.advance(one_day_ms * 30 / 1000)

        # Send another event. We need this because the purge job won't purge the most
        # recent event in the room.
        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)

        second_event_id = resp.get("event_id")

        # Advance the time by another month.
        self.reactor.advance(one_day_ms * 30 / 1000)

        # Check if the event has been purged from the database.
        first_event = self.get_event(
            room_id, first_event_id, expected_code=expected_code_for_first_event
        )

        if expected_code_for_first_event == 200:
            self.assertEqual(
                first_event.get("content", {}).get("body"), "1", first_event
            )

        # Check that the event that hasn't been purged can still be retrieved.
        second_event = self.get_event(room_id, second_event_id)
        self.assertEqual(second_event.get("content", {}).get("body"), "2", second_event)

    def get_event(self, room_id, event_id, expected_code=200):
        """Fetch an event through the client REST API.

        Args:
            room_id: The ID of the room the event belongs to.
            event_id: The ID of the event to fetch.
            expected_code: The HTTP status code the request is asserted to return.

        Returns:
            The JSON body of the response.
        """
        url = "/_matrix/client/r0/rooms/%s/event/%s" % (room_id, event_id)

        channel = self.make_request("GET", url, access_token=self.token)

        self.assertEqual(channel.code, expected_code, channel.result)

        return channel.json_body