1# Copyright 2018 New Vector 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14import email.message 15import os 16from typing import Dict, List, Sequence, Tuple 17 18import attr 19import pkg_resources 20 21from twisted.internet.defer import Deferred 22 23import synapse.rest.admin 24from synapse.api.errors import Codes, SynapseError 25from synapse.rest.client import login, room 26 27from tests.unittest import HomeserverTestCase 28 29 30@attr.s 31class _User: 32 "Helper wrapper for user ID and access token" 33 id = attr.ib() 34 token = attr.ib() 35 36 37class EmailPusherTests(HomeserverTestCase): 38 39 servlets = [ 40 synapse.rest.admin.register_servlets_for_client_rest_resource, 41 room.register_servlets, 42 login.register_servlets, 43 ] 44 user_id = True 45 hijack_auth = False 46 47 def make_homeserver(self, reactor, clock): 48 49 config = self.default_config() 50 config["email"] = { 51 "enable_notifs": True, 52 "template_dir": os.path.abspath( 53 pkg_resources.resource_filename("synapse", "res/templates") 54 ), 55 "expiry_template_html": "notice_expiry.html", 56 "expiry_template_text": "notice_expiry.txt", 57 "notif_template_html": "notif_mail.html", 58 "notif_template_text": "notif_mail.txt", 59 "smtp_host": "127.0.0.1", 60 "smtp_port": 20, 61 "require_transport_security": False, 62 "smtp_user": None, 63 "smtp_pass": None, 64 "app_name": "Matrix", 65 "notif_from": "test@example.com", 66 "riot_base_url": None, 67 } 68 config["public_baseurl"] = "http://aaa" 69 config["start_pushers"] = True 70 71 hs = self.setup_test_homeserver(config=config) 72 73 # List[Tuple[Deferred, args, kwargs]] 74 self.email_attempts: List[Tuple[Deferred, Sequence, Dict]] = [] 75 76 def sendmail(*args, **kwargs): 77 # This mocks out synapse.reactor.send_email._sendmail. 78 d = Deferred() 79 self.email_attempts.append((d, args, kwargs)) 80 return d 81 82 hs.get_send_email_handler()._sendmail = sendmail 83 84 return hs 85 86 def prepare(self, reactor, clock, hs): 87 # Register the user who gets notified 88 self.user_id = self.register_user("user", "pass") 89 self.access_token = self.login("user", "pass") 90 91 # Register other users 92 self.others = [ 93 _User( 94 id=self.register_user("otheruser1", "pass"), 95 token=self.login("otheruser1", "pass"), 96 ), 97 _User( 98 id=self.register_user("otheruser2", "pass"), 99 token=self.login("otheruser2", "pass"), 100 ), 101 ] 102 103 # Register the pusher 104 user_tuple = self.get_success( 105 self.hs.get_datastore().get_user_by_access_token(self.access_token) 106 ) 107 self.token_id = user_tuple.token_id 108 109 # We need to add email to account before we can create a pusher. 110 self.get_success( 111 hs.get_datastore().user_add_threepid( 112 self.user_id, "email", "a@example.com", 0, 0 113 ) 114 ) 115 116 self.pusher = self.get_success( 117 self.hs.get_pusherpool().add_pusher( 118 user_id=self.user_id, 119 access_token=self.token_id, 120 kind="email", 121 app_id="m.email", 122 app_display_name="Email Notifications", 123 device_display_name="a@example.com", 124 pushkey="a@example.com", 125 lang=None, 126 data={}, 127 ) 128 ) 129 130 self.auth_handler = hs.get_auth_handler() 131 self.store = hs.get_datastore() 132 133 def test_need_validated_email(self): 134 """Test that we can only add an email pusher if the user has validated 135 their email. 136 """ 137 with self.assertRaises(SynapseError) as cm: 138 self.get_success_or_raise( 139 self.hs.get_pusherpool().add_pusher( 140 user_id=self.user_id, 141 access_token=self.token_id, 142 kind="email", 143 app_id="m.email", 144 app_display_name="Email Notifications", 145 device_display_name="b@example.com", 146 pushkey="b@example.com", 147 lang=None, 148 data={}, 149 ) 150 ) 151 152 self.assertEqual(400, cm.exception.code) 153 self.assertEqual(Codes.THREEPID_NOT_FOUND, cm.exception.errcode) 154 155 def test_simple_sends_email(self): 156 # Create a simple room with two users 157 room = self.helper.create_room_as(self.user_id, tok=self.access_token) 158 self.helper.invite( 159 room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id 160 ) 161 self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token) 162 163 # The other user sends a single message. 164 self.helper.send(room, body="Hi!", tok=self.others[0].token) 165 166 # We should get emailed about that message 167 self._check_for_mail() 168 169 # The other user sends multiple messages. 170 self.helper.send(room, body="Hi!", tok=self.others[0].token) 171 self.helper.send(room, body="There!", tok=self.others[0].token) 172 173 self._check_for_mail() 174 175 def test_invite_sends_email(self): 176 # Create a room and invite the user to it 177 room = self.helper.create_room_as(self.others[0].id, tok=self.others[0].token) 178 self.helper.invite( 179 room=room, 180 src=self.others[0].id, 181 tok=self.others[0].token, 182 targ=self.user_id, 183 ) 184 185 # We should get emailed about the invite 186 self._check_for_mail() 187 188 def test_invite_to_empty_room_sends_email(self): 189 # Create a room and invite the user to it 190 room = self.helper.create_room_as(self.others[0].id, tok=self.others[0].token) 191 self.helper.invite( 192 room=room, 193 src=self.others[0].id, 194 tok=self.others[0].token, 195 targ=self.user_id, 196 ) 197 198 # Then have the original user leave 199 self.helper.leave(room, self.others[0].id, tok=self.others[0].token) 200 201 # We should get emailed about the invite 202 self._check_for_mail() 203 204 def test_multiple_members_email(self): 205 # We want to test multiple notifications, so we pause processing of push 206 # while we send messages. 207 self.pusher._pause_processing() 208 209 # Create a simple room with multiple other users 210 room = self.helper.create_room_as(self.user_id, tok=self.access_token) 211 212 for other in self.others: 213 self.helper.invite( 214 room=room, src=self.user_id, tok=self.access_token, targ=other.id 215 ) 216 self.helper.join(room=room, user=other.id, tok=other.token) 217 218 # The other users send some messages 219 self.helper.send(room, body="Hi!", tok=self.others[0].token) 220 self.helper.send(room, body="There!", tok=self.others[1].token) 221 self.helper.send(room, body="There!", tok=self.others[1].token) 222 223 # Nothing should have happened yet, as we're paused. 224 assert not self.email_attempts 225 226 self.pusher._resume_processing() 227 228 # We should get emailed about those messages 229 self._check_for_mail() 230 231 def test_multiple_rooms(self): 232 # We want to test multiple notifications from multiple rooms, so we pause 233 # processing of push while we send messages. 234 self.pusher._pause_processing() 235 236 # Create a simple room with multiple other users 237 rooms = [ 238 self.helper.create_room_as(self.user_id, tok=self.access_token), 239 self.helper.create_room_as(self.user_id, tok=self.access_token), 240 ] 241 242 for r, other in zip(rooms, self.others): 243 self.helper.invite( 244 room=r, src=self.user_id, tok=self.access_token, targ=other.id 245 ) 246 self.helper.join(room=r, user=other.id, tok=other.token) 247 248 # The other users send some messages 249 self.helper.send(rooms[0], body="Hi!", tok=self.others[0].token) 250 self.helper.send(rooms[1], body="There!", tok=self.others[1].token) 251 self.helper.send(rooms[1], body="There!", tok=self.others[1].token) 252 253 # Nothing should have happened yet, as we're paused. 254 assert not self.email_attempts 255 256 self.pusher._resume_processing() 257 258 # We should get emailed about those messages 259 self._check_for_mail() 260 261 def test_room_notifications_include_avatar(self): 262 # Create a room and set its avatar. 263 room = self.helper.create_room_as(self.user_id, tok=self.access_token) 264 self.helper.send_state( 265 room, "m.room.avatar", {"url": "mxc://DUMMY_MEDIA_ID"}, self.access_token 266 ) 267 268 # Invite two other uses. 269 for other in self.others: 270 self.helper.invite( 271 room=room, src=self.user_id, tok=self.access_token, targ=other.id 272 ) 273 self.helper.join(room=room, user=other.id, tok=other.token) 274 275 # The other users send some messages. 276 # TODO It seems that two messages are required to trigger an email? 277 self.helper.send(room, body="Alpha", tok=self.others[0].token) 278 self.helper.send(room, body="Beta", tok=self.others[1].token) 279 280 # We should get emailed about those messages 281 args, kwargs = self._check_for_mail() 282 283 # That email should contain the room's avatar 284 msg: bytes = args[5] 285 # Multipart: plain text, base 64 encoded; html, base 64 encoded 286 html = ( 287 email.message_from_bytes(msg) 288 .get_payload()[1] 289 .get_payload(decode=True) 290 .decode() 291 ) 292 self.assertIn("_matrix/media/v1/thumbnail/DUMMY_MEDIA_ID", html) 293 294 def test_empty_room(self): 295 """All users leaving a room shouldn't cause the pusher to break.""" 296 # Create a simple room with two users 297 room = self.helper.create_room_as(self.user_id, tok=self.access_token) 298 self.helper.invite( 299 room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id 300 ) 301 self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token) 302 303 # The other user sends a single message. 304 self.helper.send(room, body="Hi!", tok=self.others[0].token) 305 306 # Leave the room before the message is processed. 307 self.helper.leave(room, self.user_id, tok=self.access_token) 308 self.helper.leave(room, self.others[0].id, tok=self.others[0].token) 309 310 # We should get emailed about that message 311 self._check_for_mail() 312 313 def test_empty_room_multiple_messages(self): 314 """All users leaving a room shouldn't cause the pusher to break.""" 315 # Create a simple room with two users 316 room = self.helper.create_room_as(self.user_id, tok=self.access_token) 317 self.helper.invite( 318 room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id 319 ) 320 self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token) 321 322 # The other user sends a single message. 323 self.helper.send(room, body="Hi!", tok=self.others[0].token) 324 self.helper.send(room, body="There!", tok=self.others[0].token) 325 326 # Leave the room before the message is processed. 327 self.helper.leave(room, self.user_id, tok=self.access_token) 328 self.helper.leave(room, self.others[0].id, tok=self.others[0].token) 329 330 # We should get emailed about that message 331 self._check_for_mail() 332 333 def test_encrypted_message(self): 334 room = self.helper.create_room_as(self.user_id, tok=self.access_token) 335 self.helper.invite( 336 room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id 337 ) 338 self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token) 339 340 # The other user sends some messages 341 self.helper.send_event(room, "m.room.encrypted", {}, tok=self.others[0].token) 342 343 # We should get emailed about that message 344 self._check_for_mail() 345 346 def test_no_email_sent_after_removed(self): 347 # Create a simple room with two users 348 room = self.helper.create_room_as(self.user_id, tok=self.access_token) 349 self.helper.invite( 350 room=room, 351 src=self.user_id, 352 tok=self.access_token, 353 targ=self.others[0].id, 354 ) 355 self.helper.join( 356 room=room, 357 user=self.others[0].id, 358 tok=self.others[0].token, 359 ) 360 361 # The other user sends a single message. 362 self.helper.send(room, body="Hi!", tok=self.others[0].token) 363 364 # We should get emailed about that message 365 self._check_for_mail() 366 367 # disassociate the user's email address 368 self.get_success( 369 self.auth_handler.delete_threepid( 370 user_id=self.user_id, 371 medium="email", 372 address="a@example.com", 373 ) 374 ) 375 376 # check that the pusher for that email address has been deleted 377 pushers = self.get_success( 378 self.hs.get_datastore().get_pushers_by({"user_name": self.user_id}) 379 ) 380 pushers = list(pushers) 381 self.assertEqual(len(pushers), 0) 382 383 def test_remove_unlinked_pushers_background_job(self): 384 """Checks that all existing pushers associated with unlinked email addresses are removed 385 upon running the remove_deleted_email_pushers background update. 386 """ 387 # disassociate the user's email address manually (without deleting the pusher). 388 # This resembles the old behaviour, which the background update below is intended 389 # to clean up. 390 self.get_success( 391 self.hs.get_datastore().user_delete_threepid( 392 self.user_id, "email", "a@example.com" 393 ) 394 ) 395 396 # Run the "remove_deleted_email_pushers" background job 397 self.get_success( 398 self.hs.get_datastore().db_pool.simple_insert( 399 table="background_updates", 400 values={ 401 "update_name": "remove_deleted_email_pushers", 402 "progress_json": "{}", 403 "depends_on": None, 404 }, 405 ) 406 ) 407 408 # ... and tell the DataStore that it hasn't finished all updates yet 409 self.hs.get_datastore().db_pool.updates._all_done = False 410 411 # Now let's actually drive the updates to completion 412 self.wait_for_background_updates() 413 414 # Check that all pushers with unlinked addresses were deleted 415 pushers = self.get_success( 416 self.hs.get_datastore().get_pushers_by({"user_name": self.user_id}) 417 ) 418 pushers = list(pushers) 419 self.assertEqual(len(pushers), 0) 420 421 def _check_for_mail(self) -> Tuple[Sequence, Dict]: 422 """ 423 Assert that synapse sent off exactly one email notification. 424 425 Returns: 426 args and kwargs passed to synapse.reactor.send_email._sendmail for 427 that notification. 428 """ 429 # Get the stream ordering before it gets sent 430 pushers = self.get_success( 431 self.hs.get_datastore().get_pushers_by({"user_name": self.user_id}) 432 ) 433 pushers = list(pushers) 434 self.assertEqual(len(pushers), 1) 435 last_stream_ordering = pushers[0].last_stream_ordering 436 437 # Advance time a bit, so the pusher will register something has happened 438 self.pump(10) 439 440 # It hasn't succeeded yet, so the stream ordering shouldn't have moved 441 pushers = self.get_success( 442 self.hs.get_datastore().get_pushers_by({"user_name": self.user_id}) 443 ) 444 pushers = list(pushers) 445 self.assertEqual(len(pushers), 1) 446 self.assertEqual(last_stream_ordering, pushers[0].last_stream_ordering) 447 448 # One email was attempted to be sent 449 self.assertEqual(len(self.email_attempts), 1) 450 451 deferred, sendmail_args, sendmail_kwargs = self.email_attempts[0] 452 # Make the email succeed 453 deferred.callback(True) 454 self.pump() 455 456 # One email was attempted to be sent 457 self.assertEqual(len(self.email_attempts), 1) 458 459 # The stream ordering has increased 460 pushers = self.get_success( 461 self.hs.get_datastore().get_pushers_by({"user_name": self.user_id}) 462 ) 463 pushers = list(pushers) 464 self.assertEqual(len(pushers), 1) 465 self.assertTrue(pushers[0].last_stream_ordering > last_stream_ordering) 466 467 # Reset the attempts. 468 self.email_attempts = [] 469 return sendmail_args, sendmail_kwargs 470