1# Copyright 2019 New Vector Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14from unittest.mock import Mock
15
16from synapse.api.constants import EventTypes
17from synapse.rest import admin
18from synapse.rest.client import login, room
19from synapse.visibility import filter_events_for_client
20
21from tests import unittest
22
# Durations expressed in milliseconds; the tests divide by 1000 before handing them
# to the reactor when advancing time.
one_hour_ms = 60 * 60 * 1000
one_day_ms = 24 * one_hour_ms
25
26
class RetentionTestCase(unittest.HomeserverTestCase):
    """Tests message retention on a homeserver whose configuration defines a default
    retention policy as well as bounds on the lifetimes allowed in room policies.
    """

    servlets = [
        admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
    ]

    def make_homeserver(self, reactor, clock):
        """Set up a homeserver with retention enabled, a default policy of 1 to 3 days,
        and allowed lifetimes bounded to the same 1 to 3 days range.
        """
        config = self.default_config()
        config["retention"] = {
            "enabled": True,
            "default_policy": {
                "min_lifetime": one_day_ms,
                "max_lifetime": one_day_ms * 3,
            },
            "allowed_lifetime_min": one_day_ms,
            "allowed_lifetime_max": one_day_ms * 3,
        }

        self.hs = self.setup_test_homeserver(config=config)

        return self.hs

    def prepare(self, reactor, clock, homeserver):
        """Register and log in the test user, and grab the homeserver components the
        tests need.
        """
        self.user_id = self.register_user("user", "password")
        self.token = self.login("user", "password")

        self.store = self.hs.get_datastore()
        self.serializer = self.hs.get_event_client_serializer()
        self.clock = self.hs.get_clock()

    def test_retention_event_purged_with_state_event(self) -> None:
        """Tests that expired events are correctly purged when the room's retention policy
        is defined by a state event.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        # Set the room's retention period to 2 days.
        lifetime = one_day_ms * 2
        self.helper.send_state(
            room_id=room_id,
            event_type=EventTypes.Retention,
            body={"max_lifetime": lifetime},
            tok=self.token,
        )

        self._test_retention_event_purged(room_id, one_day_ms * 1.5)

    def test_retention_event_purged_with_state_event_outside_allowed(self) -> None:
        """Tests that the server configuration can override the policy for a room when
        running the purge jobs.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        # Set a max_lifetime higher than the maximum allowed value.
        self.helper.send_state(
            room_id=room_id,
            event_type=EventTypes.Retention,
            body={"max_lifetime": one_day_ms * 4},
            tok=self.token,
        )

        # Check that the event is purged after waiting for the maximum allowed duration
        # instead of the one specified in the room's policy.
        self._test_retention_event_purged(room_id, one_day_ms * 1.5)

        # Set a max_lifetime lower than the minimum allowed value.
        self.helper.send_state(
            room_id=room_id,
            event_type=EventTypes.Retention,
            body={"max_lifetime": one_hour_ms},
            tok=self.token,
        )

        # Check that the event is purged after waiting for the minimum allowed duration
        # instead of the one specified in the room's policy.
        self._test_retention_event_purged(room_id, one_day_ms * 0.5)

    def test_retention_event_purged_without_state_event(self) -> None:
        """Tests that expired events are correctly purged when the room's retention policy
        is defined by the server's configuration's default retention policy.
        """
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        self._test_retention_event_purged(room_id, one_day_ms * 2)

    def test_visibility(self) -> None:
        """Tests that synapse.visibility.filter_events_for_client correctly filters out
        outdated events
        """
        store = self.hs.get_datastore()
        storage = self.hs.get_storage()
        room_id = self.helper.create_room_as(self.user_id, tok=self.token)
        events = []

        # Send a first event, which should be filtered out at the end of the test.
        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)

        # Get the event from the store so that we end up with a FrozenEvent that we can
        # give to filter_events_for_client. We need to do this now because the event won't
        # be in the database anymore after it has expired.
        events.append(self.get_success(store.get_event(resp.get("event_id"))))

        # Advance the time by 2 days. We're using the default retention policy, therefore
        # after this the first event will still be valid.
        self.reactor.advance(one_day_ms * 2 / 1000)

        # Send another event, which shouldn't get filtered out.
        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)

        valid_event_id = resp.get("event_id")

        events.append(self.get_success(store.get_event(valid_event_id)))

        # Advance the time by another 2 days. After this, the first event should be
        # outdated but not the second one.
        self.reactor.advance(one_day_ms * 2 / 1000)

        # Run filter_events_for_client with our list of FrozenEvents.
        filtered_events = self.get_success(
            filter_events_for_client(storage, self.user_id, events)
        )

        # We should only get one event back.
        self.assertEqual(len(filtered_events), 1, filtered_events)
        # That event should be the second, not outdated event.
        self.assertEqual(filtered_events[0].event_id, valid_event_id, filtered_events)

    def _test_retention_event_purged(self, room_id: str, increment: float) -> None:
        """Run the following test scenario to test the message retention policy support:

        1. Send event 1
        2. Increment time by `increment`
        3. Send event 2
        4. Increment time by `increment`
        5. Check that event 1 has been purged
        6. Check that event 2 has not been purged
        7. Check that state events that were sent before event 1 aren't purged.

        The main reason for sending a second event is because currently Synapse won't
        purge the latest message in a room because it would otherwise result in a lack of
        forward extremities for this room. It's also a good thing to ensure the purge jobs
        aren't too greedy and purge messages they shouldn't.

        Args:
            room_id: The ID of the room to test retention in.
            increment: The number of milliseconds to advance the clock each time. Must be
                defined so that events in the room aren't purged if they are `increment`
                old but are purged if they are `increment * 2` old.
        """
        # Get the create event to, later, check that we can still access it.
        message_handler = self.hs.get_message_handler()
        create_event = self.get_success(
            message_handler.get_room_data(
                self.user_id, room_id, EventTypes.Create, state_key=""
            )
        )
        # Fail with a clear message here rather than with an AttributeError below if
        # the create event couldn't be found.
        self.assertIsNotNone(create_event)

        # Send a first event to the room. This is the event we'll want to be purged at the
        # end of the test.
        resp = self.helper.send(room_id=room_id, body="1", tok=self.token)

        expired_event_id = resp.get("event_id")

        # Check that we can retrieve the event.
        expired_event = self.get_event(expired_event_id)
        self.assertEqual(
            expired_event.get("content", {}).get("body"), "1", expired_event
        )

        # Advance the time.
        self.reactor.advance(increment / 1000)

        # Send another event. We need this because the purge job won't purge the most
        # recent event in the room.
        resp = self.helper.send(room_id=room_id, body="2", tok=self.token)

        valid_event_id = resp.get("event_id")

        # Advance the time again. Now our first event should have expired but our second
        # one should still be kept.
        self.reactor.advance(increment / 1000)

        # Check that the first event has been purged from the database, i.e. that we
        # can't retrieve it anymore, because it has expired.
        self.get_event(expired_event_id, expect_none=True)

        # Check that the event that hasn't expired can still be retrieved.
        valid_event = self.get_event(valid_event_id)
        self.assertEqual(valid_event.get("content", {}).get("body"), "2", valid_event)

        # Check that we can still access state events that were sent before the event that
        # has been purged. Note that get_event takes an event ID only (no room ID), and
        # asserts that the event exists unless told otherwise.
        self.get_event(create_event.event_id)

    def get_event(self, event_id, expect_none=False):
        """Fetch an event from the store and serialize it with the client serializer.

        Args:
            event_id: The ID of the event to look up in the store.
            expect_none: If true, assert that the event is NOT in the store instead of
                asserting that it is.

        Returns:
            The serialized event, or an empty dict if `expect_none` is true.
        """
        event = self.get_success(self.store.get_event(event_id, allow_none=True))

        if expect_none:
            self.assertIsNone(event)
            return {}

        self.assertIsNotNone(event)

        time_now = self.clock.time_msec()
        serialized = self.get_success(self.serializer.serialize_event(event, time_now))

        return serialized
234
235
class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
    """Tests message retention on a homeserver that enables retention without
    configuring any default policy.
    """

    servlets = [
        admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
    ]

    def make_homeserver(self, reactor, clock):
        """Set up a homeserver with retention enabled (and nothing else configured for
        it), using a mocked federation client.
        """
        retention_config = {
            "enabled": True,
        }
        config = self.default_config()
        config["retention"] = retention_config

        # The mock only exposes a `backfill` attribute.
        federation_client = Mock(spec=["backfill"])

        self.hs = self.setup_test_homeserver(
            config=config,
            federation_client=federation_client,
        )
        return self.hs

    def prepare(self, reactor, clock, homeserver):
        """Register the test user and log them in."""
        username, password = "user", "password"
        self.user_id = self.register_user(username, password)
        self.token = self.login(username, password)

    def test_no_default_policy(self):
        """Tests that an event doesn't get expired if there is neither a default retention
        policy nor a policy specific to the room.
        """
        new_room_id = self.helper.create_room_as(self.user_id, tok=self.token)
        self._test_retention(new_room_id)

    def test_state_policy(self):
        """Tests that an event gets correctly expired if there is no default retention
        policy but there's a policy specific to the room.
        """
        new_room_id = self.helper.create_room_as(self.user_id, tok=self.token)

        # With a maximum lifetime of 35 days, the first event (60 days old at the end of
        # the scenario) gets expired but not the second one (30 days old).
        room_policy = {"max_lifetime": one_day_ms * 35}
        self.helper.send_state(
            room_id=new_room_id,
            event_type=EventTypes.Retention,
            body=room_policy,
            tok=self.token,
        )

        self._test_retention(new_room_id, expected_code_for_first_event=404)

    def _test_retention(self, room_id, expected_code_for_first_event=200):
        """Send two events a month apart, wait another month, then check which of them
        can still be retrieved.

        Args:
            room_id: The ID of the room to send the events into.
            expected_code_for_first_event: The HTTP status code expected when fetching
                the first event at the end of the scenario.
        """
        # Number of seconds in a month (30 days), which is what the reactor expects.
        one_month_s = one_day_ms * 30 / 1000

        # The first event is the one we may expect to be gone at the end of the test.
        first_event_id = self.helper.send(
            room_id=room_id, body="1", tok=self.token
        ).get("event_id")

        # Right after sending it, the event must be retrievable.
        fresh_event = self.get_event(room_id, first_event_id)
        self.assertEqual(fresh_event.get("content", {}).get("body"), "1", fresh_event)

        # Let a month go by.
        self.reactor.advance(one_month_s)

        # A second event is needed because the purge job won't purge the most recent
        # event in the room.
        second_event_id = self.helper.send(
            room_id=room_id, body="2", tok=self.token
        ).get("event_id")

        # Let another month go by.
        self.reactor.advance(one_month_s)

        # Fetch the first event again, expecting whichever status code the caller asked
        # for.
        first_event = self.get_event(
            room_id, first_event_id, expected_code=expected_code_for_first_event
        )

        # Only look at the content if the event was supposed to survive.
        if expected_code_for_first_event == 200:
            self.assertEqual(
                first_event.get("content", {}).get("body"), "1", first_event
            )

        # The second event must still be retrievable in every case.
        second_event = self.get_event(room_id, second_event_id)
        self.assertEqual(second_event.get("content", {}).get("body"), "2", second_event)

    def get_event(self, room_id, event_id, expected_code=200):
        """Fetch an event over the client API and check the response's status code.

        Args:
            room_id: The ID of the room the event is in.
            event_id: The ID of the event to fetch.
            expected_code: The HTTP status code the request is expected to return.

        Returns:
            The JSON body of the response.
        """
        channel = self.make_request(
            "GET",
            "/_matrix/client/r0/rooms/%s/event/%s" % (room_id, event_id),
            access_token=self.token,
        )

        self.assertEqual(channel.code, expected_code, channel.result)

        return channel.json_body
333