1# Copyright 2018 New Vector
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import email.message
15import os
16from typing import Dict, List, Sequence, Tuple
17
18import attr
19import pkg_resources
20
21from twisted.internet.defer import Deferred
22
23import synapse.rest.admin
24from synapse.api.errors import Codes, SynapseError
25from synapse.rest.client import login, room
26
27from tests.unittest import HomeserverTestCase
28
29
30@attr.s
31class _User:
32    "Helper wrapper for user ID and access token"
33    id = attr.ib()
34    token = attr.ib()
35
36
37class EmailPusherTests(HomeserverTestCase):
38
39    servlets = [
40        synapse.rest.admin.register_servlets_for_client_rest_resource,
41        room.register_servlets,
42        login.register_servlets,
43    ]
44    user_id = True
45    hijack_auth = False
46
47    def make_homeserver(self, reactor, clock):
48
49        config = self.default_config()
50        config["email"] = {
51            "enable_notifs": True,
52            "template_dir": os.path.abspath(
53                pkg_resources.resource_filename("synapse", "res/templates")
54            ),
55            "expiry_template_html": "notice_expiry.html",
56            "expiry_template_text": "notice_expiry.txt",
57            "notif_template_html": "notif_mail.html",
58            "notif_template_text": "notif_mail.txt",
59            "smtp_host": "127.0.0.1",
60            "smtp_port": 20,
61            "require_transport_security": False,
62            "smtp_user": None,
63            "smtp_pass": None,
64            "app_name": "Matrix",
65            "notif_from": "test@example.com",
66            "riot_base_url": None,
67        }
68        config["public_baseurl"] = "http://aaa"
69        config["start_pushers"] = True
70
71        hs = self.setup_test_homeserver(config=config)
72
73        # List[Tuple[Deferred, args, kwargs]]
74        self.email_attempts: List[Tuple[Deferred, Sequence, Dict]] = []
75
76        def sendmail(*args, **kwargs):
77            # This mocks out synapse.reactor.send_email._sendmail.
78            d = Deferred()
79            self.email_attempts.append((d, args, kwargs))
80            return d
81
82        hs.get_send_email_handler()._sendmail = sendmail
83
84        return hs
85
86    def prepare(self, reactor, clock, hs):
87        # Register the user who gets notified
88        self.user_id = self.register_user("user", "pass")
89        self.access_token = self.login("user", "pass")
90
91        # Register other users
92        self.others = [
93            _User(
94                id=self.register_user("otheruser1", "pass"),
95                token=self.login("otheruser1", "pass"),
96            ),
97            _User(
98                id=self.register_user("otheruser2", "pass"),
99                token=self.login("otheruser2", "pass"),
100            ),
101        ]
102
103        # Register the pusher
104        user_tuple = self.get_success(
105            self.hs.get_datastore().get_user_by_access_token(self.access_token)
106        )
107        self.token_id = user_tuple.token_id
108
109        # We need to add email to account before we can create a pusher.
110        self.get_success(
111            hs.get_datastore().user_add_threepid(
112                self.user_id, "email", "a@example.com", 0, 0
113            )
114        )
115
116        self.pusher = self.get_success(
117            self.hs.get_pusherpool().add_pusher(
118                user_id=self.user_id,
119                access_token=self.token_id,
120                kind="email",
121                app_id="m.email",
122                app_display_name="Email Notifications",
123                device_display_name="a@example.com",
124                pushkey="a@example.com",
125                lang=None,
126                data={},
127            )
128        )
129
130        self.auth_handler = hs.get_auth_handler()
131        self.store = hs.get_datastore()
132
133    def test_need_validated_email(self):
134        """Test that we can only add an email pusher if the user has validated
135        their email.
136        """
137        with self.assertRaises(SynapseError) as cm:
138            self.get_success_or_raise(
139                self.hs.get_pusherpool().add_pusher(
140                    user_id=self.user_id,
141                    access_token=self.token_id,
142                    kind="email",
143                    app_id="m.email",
144                    app_display_name="Email Notifications",
145                    device_display_name="b@example.com",
146                    pushkey="b@example.com",
147                    lang=None,
148                    data={},
149                )
150            )
151
152        self.assertEqual(400, cm.exception.code)
153        self.assertEqual(Codes.THREEPID_NOT_FOUND, cm.exception.errcode)
154
155    def test_simple_sends_email(self):
156        # Create a simple room with two users
157        room = self.helper.create_room_as(self.user_id, tok=self.access_token)
158        self.helper.invite(
159            room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id
160        )
161        self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token)
162
163        # The other user sends a single message.
164        self.helper.send(room, body="Hi!", tok=self.others[0].token)
165
166        # We should get emailed about that message
167        self._check_for_mail()
168
169        # The other user sends multiple messages.
170        self.helper.send(room, body="Hi!", tok=self.others[0].token)
171        self.helper.send(room, body="There!", tok=self.others[0].token)
172
173        self._check_for_mail()
174
175    def test_invite_sends_email(self):
176        # Create a room and invite the user to it
177        room = self.helper.create_room_as(self.others[0].id, tok=self.others[0].token)
178        self.helper.invite(
179            room=room,
180            src=self.others[0].id,
181            tok=self.others[0].token,
182            targ=self.user_id,
183        )
184
185        # We should get emailed about the invite
186        self._check_for_mail()
187
188    def test_invite_to_empty_room_sends_email(self):
189        # Create a room and invite the user to it
190        room = self.helper.create_room_as(self.others[0].id, tok=self.others[0].token)
191        self.helper.invite(
192            room=room,
193            src=self.others[0].id,
194            tok=self.others[0].token,
195            targ=self.user_id,
196        )
197
198        # Then have the original user leave
199        self.helper.leave(room, self.others[0].id, tok=self.others[0].token)
200
201        # We should get emailed about the invite
202        self._check_for_mail()
203
204    def test_multiple_members_email(self):
205        # We want to test multiple notifications, so we pause processing of push
206        # while we send messages.
207        self.pusher._pause_processing()
208
209        # Create a simple room with multiple other users
210        room = self.helper.create_room_as(self.user_id, tok=self.access_token)
211
212        for other in self.others:
213            self.helper.invite(
214                room=room, src=self.user_id, tok=self.access_token, targ=other.id
215            )
216            self.helper.join(room=room, user=other.id, tok=other.token)
217
218        # The other users send some messages
219        self.helper.send(room, body="Hi!", tok=self.others[0].token)
220        self.helper.send(room, body="There!", tok=self.others[1].token)
221        self.helper.send(room, body="There!", tok=self.others[1].token)
222
223        # Nothing should have happened yet, as we're paused.
224        assert not self.email_attempts
225
226        self.pusher._resume_processing()
227
228        # We should get emailed about those messages
229        self._check_for_mail()
230
231    def test_multiple_rooms(self):
232        # We want to test multiple notifications from multiple rooms, so we pause
233        # processing of push while we send messages.
234        self.pusher._pause_processing()
235
236        # Create a simple room with multiple other users
237        rooms = [
238            self.helper.create_room_as(self.user_id, tok=self.access_token),
239            self.helper.create_room_as(self.user_id, tok=self.access_token),
240        ]
241
242        for r, other in zip(rooms, self.others):
243            self.helper.invite(
244                room=r, src=self.user_id, tok=self.access_token, targ=other.id
245            )
246            self.helper.join(room=r, user=other.id, tok=other.token)
247
248        # The other users send some messages
249        self.helper.send(rooms[0], body="Hi!", tok=self.others[0].token)
250        self.helper.send(rooms[1], body="There!", tok=self.others[1].token)
251        self.helper.send(rooms[1], body="There!", tok=self.others[1].token)
252
253        # Nothing should have happened yet, as we're paused.
254        assert not self.email_attempts
255
256        self.pusher._resume_processing()
257
258        # We should get emailed about those messages
259        self._check_for_mail()
260
261    def test_room_notifications_include_avatar(self):
262        # Create a room and set its avatar.
263        room = self.helper.create_room_as(self.user_id, tok=self.access_token)
264        self.helper.send_state(
265            room, "m.room.avatar", {"url": "mxc://DUMMY_MEDIA_ID"}, self.access_token
266        )
267
268        # Invite two other uses.
269        for other in self.others:
270            self.helper.invite(
271                room=room, src=self.user_id, tok=self.access_token, targ=other.id
272            )
273            self.helper.join(room=room, user=other.id, tok=other.token)
274
275        # The other users send some messages.
276        # TODO It seems that two messages are required to trigger an email?
277        self.helper.send(room, body="Alpha", tok=self.others[0].token)
278        self.helper.send(room, body="Beta", tok=self.others[1].token)
279
280        # We should get emailed about those messages
281        args, kwargs = self._check_for_mail()
282
283        # That email should contain the room's avatar
284        msg: bytes = args[5]
285        # Multipart: plain text, base 64 encoded; html, base 64 encoded
286        html = (
287            email.message_from_bytes(msg)
288            .get_payload()[1]
289            .get_payload(decode=True)
290            .decode()
291        )
292        self.assertIn("_matrix/media/v1/thumbnail/DUMMY_MEDIA_ID", html)
293
294    def test_empty_room(self):
295        """All users leaving a room shouldn't cause the pusher to break."""
296        # Create a simple room with two users
297        room = self.helper.create_room_as(self.user_id, tok=self.access_token)
298        self.helper.invite(
299            room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id
300        )
301        self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token)
302
303        # The other user sends a single message.
304        self.helper.send(room, body="Hi!", tok=self.others[0].token)
305
306        # Leave the room before the message is processed.
307        self.helper.leave(room, self.user_id, tok=self.access_token)
308        self.helper.leave(room, self.others[0].id, tok=self.others[0].token)
309
310        # We should get emailed about that message
311        self._check_for_mail()
312
313    def test_empty_room_multiple_messages(self):
314        """All users leaving a room shouldn't cause the pusher to break."""
315        # Create a simple room with two users
316        room = self.helper.create_room_as(self.user_id, tok=self.access_token)
317        self.helper.invite(
318            room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id
319        )
320        self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token)
321
322        # The other user sends a single message.
323        self.helper.send(room, body="Hi!", tok=self.others[0].token)
324        self.helper.send(room, body="There!", tok=self.others[0].token)
325
326        # Leave the room before the message is processed.
327        self.helper.leave(room, self.user_id, tok=self.access_token)
328        self.helper.leave(room, self.others[0].id, tok=self.others[0].token)
329
330        # We should get emailed about that message
331        self._check_for_mail()
332
333    def test_encrypted_message(self):
334        room = self.helper.create_room_as(self.user_id, tok=self.access_token)
335        self.helper.invite(
336            room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id
337        )
338        self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token)
339
340        # The other user sends some messages
341        self.helper.send_event(room, "m.room.encrypted", {}, tok=self.others[0].token)
342
343        # We should get emailed about that message
344        self._check_for_mail()
345
346    def test_no_email_sent_after_removed(self):
347        # Create a simple room with two users
348        room = self.helper.create_room_as(self.user_id, tok=self.access_token)
349        self.helper.invite(
350            room=room,
351            src=self.user_id,
352            tok=self.access_token,
353            targ=self.others[0].id,
354        )
355        self.helper.join(
356            room=room,
357            user=self.others[0].id,
358            tok=self.others[0].token,
359        )
360
361        # The other user sends a single message.
362        self.helper.send(room, body="Hi!", tok=self.others[0].token)
363
364        # We should get emailed about that message
365        self._check_for_mail()
366
367        # disassociate the user's email address
368        self.get_success(
369            self.auth_handler.delete_threepid(
370                user_id=self.user_id,
371                medium="email",
372                address="a@example.com",
373            )
374        )
375
376        # check that the pusher for that email address has been deleted
377        pushers = self.get_success(
378            self.hs.get_datastore().get_pushers_by({"user_name": self.user_id})
379        )
380        pushers = list(pushers)
381        self.assertEqual(len(pushers), 0)
382
383    def test_remove_unlinked_pushers_background_job(self):
384        """Checks that all existing pushers associated with unlinked email addresses are removed
385        upon running the remove_deleted_email_pushers background update.
386        """
387        # disassociate the user's email address manually (without deleting the pusher).
388        # This resembles the old behaviour, which the background update below is intended
389        # to clean up.
390        self.get_success(
391            self.hs.get_datastore().user_delete_threepid(
392                self.user_id, "email", "a@example.com"
393            )
394        )
395
396        # Run the "remove_deleted_email_pushers" background job
397        self.get_success(
398            self.hs.get_datastore().db_pool.simple_insert(
399                table="background_updates",
400                values={
401                    "update_name": "remove_deleted_email_pushers",
402                    "progress_json": "{}",
403                    "depends_on": None,
404                },
405            )
406        )
407
408        # ... and tell the DataStore that it hasn't finished all updates yet
409        self.hs.get_datastore().db_pool.updates._all_done = False
410
411        # Now let's actually drive the updates to completion
412        self.wait_for_background_updates()
413
414        # Check that all pushers with unlinked addresses were deleted
415        pushers = self.get_success(
416            self.hs.get_datastore().get_pushers_by({"user_name": self.user_id})
417        )
418        pushers = list(pushers)
419        self.assertEqual(len(pushers), 0)
420
421    def _check_for_mail(self) -> Tuple[Sequence, Dict]:
422        """
423        Assert that synapse sent off exactly one email notification.
424
425        Returns:
426            args and kwargs passed to synapse.reactor.send_email._sendmail for
427            that notification.
428        """
429        # Get the stream ordering before it gets sent
430        pushers = self.get_success(
431            self.hs.get_datastore().get_pushers_by({"user_name": self.user_id})
432        )
433        pushers = list(pushers)
434        self.assertEqual(len(pushers), 1)
435        last_stream_ordering = pushers[0].last_stream_ordering
436
437        # Advance time a bit, so the pusher will register something has happened
438        self.pump(10)
439
440        # It hasn't succeeded yet, so the stream ordering shouldn't have moved
441        pushers = self.get_success(
442            self.hs.get_datastore().get_pushers_by({"user_name": self.user_id})
443        )
444        pushers = list(pushers)
445        self.assertEqual(len(pushers), 1)
446        self.assertEqual(last_stream_ordering, pushers[0].last_stream_ordering)
447
448        # One email was attempted to be sent
449        self.assertEqual(len(self.email_attempts), 1)
450
451        deferred, sendmail_args, sendmail_kwargs = self.email_attempts[0]
452        # Make the email succeed
453        deferred.callback(True)
454        self.pump()
455
456        # One email was attempted to be sent
457        self.assertEqual(len(self.email_attempts), 1)
458
459        # The stream ordering has increased
460        pushers = self.get_success(
461            self.hs.get_datastore().get_pushers_by({"user_name": self.user_id})
462        )
463        pushers = list(pushers)
464        self.assertEqual(len(pushers), 1)
465        self.assertTrue(pushers[0].last_stream_ordering > last_stream_ordering)
466
467        # Reset the attempts.
468        self.email_attempts = []
469        return sendmail_args, sendmail_kwargs
470