# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2021 Edgewall Software
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at https://trac.edgewall.org/log/.

import unittest
from email import message_from_bytes

from trac.core import Component, implements
from trac.notification.api import (
    IEmailAddressResolver, IEmailSender, INotificationFormatter,
    INotificationSubscriber, NotificationEvent, NotificationSystem,
)
from trac.notification.mail import RecipientMatcher
from trac.notification.model import Subscription
from trac.test import EnvironmentStub
from trac.ticket.model import _fixup_cc_list
from trac.util.datefmt import datetime_now, utc
from trac.util.html import escape
from trac.web.session import DetachedSession


class TestEmailSender(Component):
    """Email sender stub that records messages instead of sending them.

    Each `send()` call appends a `(from_addr, recipients, message)` tuple
    to `history`, where `message` is the parsed form of the raw bytes.
    """

    implements(IEmailSender)

    def __init__(self):
        self.history = []

    def send(self, from_addr, recipients, message):
        self.history.append((from_addr, recipients,
                             message_from_bytes(message)))


class TestFormatter(Component):
    """Formatter stub for the 'email' transport and the 'test' realm.

    The text of the event target is rendered as-is for `text/plain` and
    wrapped in an escaped `<p>` element for `text/html`. A `ValueError`
    is raised when the text contains the marker `raise-text-plain` or
    `raise-text-html` for the respective style, which lets the tests
    simulate a broken formatter.
    """

    implements(INotificationFormatter)

    def get_supported_styles(self, transport):
        if transport == 'email':
            yield 'text/plain', 'test'
            yield 'text/html', 'test'

    def format(self, transport, style, event):
        if transport != 'email':
            return
        text = event.target.text
        if style == 'text/plain':
            if 'raise-text-plain' in text:
                raise ValueError()
            return str(text)
        if style == 'text/html':
            if 'raise-text-html' in text:
                raise ValueError()
            return '<p>%s</p>' % escape(text)


class TestSubscriber(Component):
    """Subscriber stub matching every event of the 'test' realm.

    For such events it yields the subscription tuple of every stored
    subscription registered under this class name.
    """

    implements(INotificationSubscriber)

    def _find_subscriptions(self):
        klass = self.__class__.__name__
        return Subscription.find_by_class(self.env, klass)

    def matches(self, event):
        if event.realm == 'test':
            for model in self._find_subscriptions():
                yield model.subscription_tuple()

    def description(self):
        return self.__class__.__name__

    def requires_authentication(self):
        return False

    def default_subscriptions(self):
        return ()


class TestEmailAddressResolver(Component):
    """Resolver stub mapping authenticated sids to `<sid>@example.net`.

    Returns `None` (implicitly) for sessions that are not authenticated.
    """

    implements(IEmailAddressResolver)

    def get_address_for_session(self, sid, authenticated):
        if authenticated == 1:
            return '%s@example.net' % sid


class TestNotificationEvent(NotificationEvent):
    """Notification event type used by the tests in this module."""


class TestModel:
    """Minimal event target: a realm name plus the text to be formatted."""

    realm = 'test'

    def __init__(self, text):
        self.text = text


class EmailDistributorTestCase(unittest.TestCase):
    """Tests for email distribution using the stub components above."""

    def setUp(self):
        self.env = EnvironmentStub(enable=['trac.*', TestEmailSender,
                                           TestFormatter, TestSubscriber,
                                           TestEmailAddressResolver])
        self.config = config = self.env.config
        config.set('notification', 'smtp_from', 'trac@example.org')
        config.set('notification', 'smtp_enabled', 'enabled')
        config.set('notification', 'smtp_always_cc', 'cc@example.org')
        config.set('notification', 'smtp_always_bcc', 'bcc@example.org')
        config.set('notification', 'email_sender', 'TestEmailSender')
        config.set('notification', 'email_address_resolvers',
                   'SessionEmailResolver,TestEmailAddressResolver')
        self.sender = TestEmailSender(self.env)
        self.notsys = NotificationSystem(self.env)
        # Sessions used across the tests: `foo` and `bar` have an email
        # address, `baz` and `qux` have none, and `corge` has an address
        # without a domain part.
        with self.env.db_transaction:
            self._add_session('foo', email='foo@example.org')
            self._add_session('bar', email='bar@example.org',
                              name="Bäŕ's name")
            self._add_session('baz', name='Baz')
            self._add_session('qux', tz='UTC')
            self._add_session('corge', email='corge-mail')

    def tearDown(self):
        self.env.reset_db()

    def _notify_event(self, text, category='created', time=None, author=None):
        """Clear the sender history and dispatch a 'test' realm event.

        The event target is a `TestModel` carrying `text`; `time`
        defaults to the current UTC time.
        """
        self.sender.history[:] = ()
        event = TestNotificationEvent('test', category, TestModel(text),
                                      time or datetime_now(utc), author=author)
        self.notsys.notify(event)

    def _add_session(self, sid, values=None, **attrs):
        """Save a detached session for `sid` with the given attributes.

        Attributes may be passed as keyword arguments and/or in the
        `values` dict, which is needed for names that are not valid
        identifiers (e.g. `notification.format.email`).
        """
        session = DetachedSession(self.env, sid)
        if values is not None:
            attrs.update(values)
        for name, value in attrs.items():
            session[name] = value
        session.save()

    def _add_subscription(self, **kwargs):
        """Store a `TestSubscriber` subscription.

        Defaults to an authenticated, 'always', `text/plain` email
        subscription; any field can be overridden through `kwargs`.
        """
        subscription = {'sid': None, 'authenticated': 1, 'distributor': 'email',
                        'format': 'text/plain', 'adverb': 'always',
                        'class': 'TestSubscriber'}
        subscription.update(kwargs)
        Subscription.add(self.env, subscription)

    def test_smtp_disabled(self):
        self.env.config.set('notification', 'smtp_enabled', 'disabled')
        with self.env.db_transaction:
            self._add_subscription(sid='foo')
            self._add_subscription(sid='bar')
        self._notify_event('blah')
        self.assertEqual([], self.sender.history)

    def _assert_mail(self, message, content_type, body):
        """Assert `message` is multipart/related with one part.

        That part must have the given `content_type` and its payload,
        ignoring trailing newlines, must equal `body`. The message must
        not carry a `Bcc` header.
        """
        self.assertNotIn('Bcc', message)
        self.assertEqual('multipart/related', message.get_content_type())
        payload = list(message.get_payload())
        self.assertEqual([content_type],
                         [p.get_content_type() for p in payload])
        self.assertEqual([body], [p.get_payload().rstrip('\n')
                                  for p in payload])

    def _assert_alternative_mail(self, message, body_plain, body_html):
        """Assert `message` wraps a plain + html multipart/alternative.

        The alternative parts must be `text/plain` then `text/html`, with
        payloads (ignoring trailing newlines) equal to `body_plain` and
        `body_html`. The message must not carry a `Bcc` header.
        """
        self.assertNotIn('Bcc', message)
        self.assertEqual('multipart/related', message.get_content_type())
        payload = list(message.get_payload())
        self.assertEqual(['multipart/alternative'],
                         [p.get_content_type() for p in payload])
        alternative = list(payload[0].get_payload())
        self.assertEqual(['text/plain', 'text/html'],
                         [p.get_content_type() for p in alternative])
        self.assertEqual([body_plain, body_html],
                         [p.get_payload().rstrip('\n') for p in alternative])

    def test_plain(self):
        with self.env.db_transaction:
            self._add_subscription(sid='foo')
            self._add_subscription(sid='bar')
            self._add_subscription(sid='baz')
            self._add_subscription(sid='qux')
        self._notify_event('blah')

        history = self.sender.history
        self.assertNotEqual([], history)
        self.assertEqual(1, len(history))
        from_addr, recipients, message = history[0]
        self.assertEqual('trac@example.org', from_addr)
        self.assertEqual({'foo@example.org', 'bar@example.org',
                          'baz@example.net', 'qux@example.net',
                          'cc@example.org', 'bcc@example.org'},
                         set(recipients))
        self._assert_mail(message, 'text/plain', 'blah')

    def test_html(self):
        with self.env.db_transaction:
            self._add_subscription(sid='foo', format='text/html')
        self._notify_event('blah')

        history = self.sender.history
        self.assertNotEqual([], history)
        self.assertEqual(2, len(history))
        for from_addr, recipients, message in history:
            if 'foo@example.org' in recipients:
                self.assertEqual('trac@example.org', from_addr)
                self.assertEqual(['foo@example.org'], recipients)
                self._assert_alternative_mail(message, 'blah',
                                              '<p>blah</p>')
            if 'cc@example.org' in recipients:
                self.assertEqual('trac@example.org', from_addr)
                self.assertEqual({'cc@example.org', 'bcc@example.org'},
                                 set(recipients))
                self._assert_mail(message, 'text/plain', 'blah')

    def test_plain_and_html(self):
        with self.env.db_transaction:
            self._add_subscription(sid='foo', format='text/plain')
            self._add_subscription(sid='bar', format='text/html')
        self._notify_event('blah')

        history = self.sender.history
        self.assertNotEqual([], history)
        self.assertEqual(2, len(history))
        for from_addr, recipients, message in history:
            if 'foo@example.org' in recipients:
                self.assertEqual(
                    {'foo@example.org', 'cc@example.org', 'bcc@example.org'},
                    set(recipients))
                self._assert_mail(message, 'text/plain', 'blah')
            if 'bar@example.org' in recipients:
                self.assertEqual(['bar@example.org'], recipients)
                self._assert_alternative_mail(message, 'blah',
                                              '<p>blah</p>')

    def test_formats_in_session_and_tracini(self):
        self.config.set('notification', 'smtp_always_cc', 'bar,quux')
        self.config.set('notification', 'smtp_always_bcc', '')
        self.config.set('notification', 'default_format.email', 'text/html')
        with self.env.db_transaction:
            for user in ('foo', 'bar', 'baz', 'qux', 'quux'):
                self._add_session(user, email='%s@example.org' % user)
            self._add_subscription(sid='foo', format='text/plain')
            # bar - no subscriptions
            self._add_session('bar',
                              {'notification.format.email': 'text/plain'})
            self._add_subscription(sid='baz', format='text/plain')
            self._add_session('baz',
                              {'notification.format.email': 'text/html'})
            self._add_subscription(sid='qux', format='text/html')
            self._add_session('qux',
                              {'notification.format.email': 'text/plain'})
            # quux - no subscriptions and no preferred format in session
        self._notify_event('blah')

        history = self.sender.history
        self.assertNotEqual([], history)
        self.assertEqual(2, len(history))
        for from_addr, recipients, message in history:
            self.assertEqual('trac@example.org', from_addr)
            recipients = sorted(recipients)
            if 'bar@example.org' in recipients:
                self.assertEqual(['bar@example.org', 'baz@example.org',
                                  'foo@example.org'], recipients)
                self._assert_mail(message, 'text/plain', 'blah')
            if 'qux@example.org' in recipients:
                self.assertEqual(['quux@example.org', 'qux@example.org'],
                                 recipients)
                self._assert_alternative_mail(message, 'blah',
                                              '<p>blah</p>')

    def test_broken_plain_formatter(self):
        with self.env.db_transaction:
            self._add_subscription(sid='foo', format='text/plain')
            self._add_subscription(sid='bar', format='text/html')
        self._notify_event('raise-text-plain')

        history = self.sender.history
        self.assertNotEqual([], history)
        self.assertEqual(1, len(history))
        from_addr, recipients, message = history[0]
        self.assertEqual('trac@example.org', from_addr)
        self.assertEqual(['bar@example.org'], recipients)
        self._assert_mail(message, 'text/html', '<p>raise-text-plain</p>')

    def test_broken_html_formatter(self):
        with self.env.db_transaction:
            self._add_subscription(sid='foo', format='text/html')
            self._add_subscription(sid='bar', format='text/plain')
        self._notify_event('raise-text-html')

        # fallback to text/plain
        history = self.sender.history
        self.assertNotEqual([], history)
        self.assertEqual(1, len(history))
        from_addr, recipients, message = history[0]
        self.assertEqual('trac@example.org', from_addr)
        self.assertEqual({'foo@example.org', 'bar@example.org',
                          'cc@example.org', 'bcc@example.org'},
                         set(recipients))
        self._assert_mail(message, 'text/plain', 'raise-text-html')

    def test_broken_plain_and_html_formatter(self):
        with self.env.db_transaction:
            self._add_subscription(sid='foo', format='text/plain')
            self._add_subscription(sid='bar', format='text/html')
        self._notify_event('raise-text-plain raise-text-html')

        history = self.sender.history
        self.assertEqual([], history)

    def test_username_in_always_cc(self):
        self.env.config.set('notification', 'smtp_always_cc',
                            'foo, cc@example.org')
        self.env.config.set('notification', 'smtp_always_bcc',
                            'bar, foo, bcc@example.org')
        self._notify_event('blah')

        history = self.sender.history
        self.assertNotEqual([], history)
        self.assertEqual(1, len(history))
        from_addr, recipients, message = history[0]
        self.assertEqual('trac@example.org', from_addr)
        self.assertEqual({'foo@example.org', 'bar@example.org',
                          'cc@example.org', 'bcc@example.org'},
                         set(recipients))
        self.assertEqual('cc@example.org, foo@example.org', message['Cc'])
        self.assertIsNone(message['Bcc'])
        self._assert_mail(message, 'text/plain', 'blah')

    def test_from_author_disabled(self):
        self.env.config.set('notification', 'smtp_from_author', 'disabled')
        with self.env.db_transaction:
            self._add_subscription(sid='bar')

        self._notify_event('blah', author='bar')
        history = self.sender.history
        self.assertNotEqual([], history)
        from_addr, recipients, message = history[0]
        self.assertEqual('trac@example.org', from_addr)
        self.assertEqual('My Project <trac@example.org>', message['From'])
        self.assertEqual(1, len(history))

        self._notify_event('blah', author=None)
        history = self.sender.history
        self.assertNotEqual([], history)
        from_addr, recipients, message = history[0]
        self.assertEqual('trac@example.org', from_addr)
        self.assertEqual('My Project <trac@example.org>', message['From'])
        self.assertEqual(1, len(history))

        self.env.config.set('notification', 'smtp_from_name', 'Trac')
        self._notify_event('blah', author=None)
        history = self.sender.history
        self.assertNotEqual([], history)
        from_addr, recipients, message = history[0]
        self.assertEqual('trac@example.org', from_addr)
        self.assertEqual('Trac <trac@example.org>', message['From'])
        self.assertEqual(1, len(history))

    def test_from_author_enabled(self):
        self.env.config.set('notification', 'smtp_from_author', 'enabled')
        with self.env.db_transaction:
            self._add_subscription(sid='foo')
            self._add_subscription(sid='bar')

        self._notify_event('blah', author='bar')
        history = self.sender.history
        self.assertNotEqual([], history)
        from_addr, recipients, message = history[0]
        self.assertEqual('bar@example.org', from_addr)
        self.assertEqual('=?utf-8?b?QsOkxZUncw==?= name <bar@example.org>',
                         message['From'])
        self.assertEqual(1, len(history))

        self._notify_event('blah', author='foo')
        history = self.sender.history
        self.assertNotEqual([], history)
        from_addr, recipients, message = history[0]
        self.assertEqual('foo@example.org', from_addr)
        self.assertEqual('foo@example.org', message['From'])
        self.assertEqual(1, len(history))

        self._notify_event('blah', author=None)
        history = self.sender.history
        self.assertNotEqual([], history)
        from_addr, recipients, message = history[0]
        self.assertEqual('trac@example.org', from_addr)
        self.assertEqual('My Project <trac@example.org>', message['From'])
        self.assertEqual(1, len(history))

        self.env.config.set('notification', 'smtp_from_name', 'Trac')
        self._notify_event('blah', author=None)
        history = self.sender.history
        self.assertNotEqual([], history)
        from_addr, recipients, message = history[0]
        self.assertEqual('trac@example.org', from_addr)
        self.assertEqual('Trac <trac@example.org>', message['From'])
        self.assertEqual(1, len(history))

    def test_ignore_domains(self):
        config = self.env.config
        config.set('notification', 'smtp_always_cc',
                   'cc@example.org, cc@example.net')
        config.set('notification', 'smtp_always_bcc',
                   'bcc@example.org, bcc@example.net')
        config.set('notification', 'ignore_domains',
                   'example.org, example.com')

        with self.env.db_transaction:
            self._add_subscription(sid='foo')
            self._add_subscription(sid='bar')
            self._add_subscription(sid='baz')
            self._add_subscription(sid='qux')
        self._notify_event('blah')

        history = self.sender.history
        self.assertNotEqual([], history)
        self.assertEqual(1, len(history))
        from_addr, recipients, message = history[0]
        self.assertEqual('trac@example.org', from_addr)
        self.assertEqual({'baz@example.net', 'qux@example.net',
                          'cc@example.net', 'bcc@example.net'},
                         set(recipients))

    def _test_without_domain(self, use_short_addr='disabled',
                             smtp_default_domain=''):
        """Notify with domain-less addresses and return the sender history.

        Configures `use_short_addr` and `smtp_default_domain` as given,
        uses domain-less values for the from/cc/bcc options, restricts
        address resolution to `SessionEmailResolver`, and asserts that
        exactly one message was sent.
        """
        config = self.env.config
        config.set('notification', 'use_short_addr', use_short_addr)
        config.set('notification', 'smtp_default_domain', smtp_default_domain)
        config.set('notification', 'smtp_from', 'from-trac')
        config.set('notification', 'smtp_always_cc', 'qux, cc@example.org')
        config.set('notification', 'smtp_always_bcc', 'bcc1@example.org, bcc2')
        config.set('notification', 'email_address_resolvers',
                   'SessionEmailResolver')
        with self.env.db_transaction:
            self._add_subscription(sid='foo')
            self._add_subscription(sid='baz')
            self._add_subscription(sid='corge')
        self._notify_event('blah')
        history = self.sender.history
        self.assertNotEqual([], history)
        self.assertEqual(1, len(history))
        return history

    def _assert_equal_sets(self, expected, actual):
        """Compare two iterables as sets, reporting only the differences.

        On mismatch the failure message shows the items missing from
        `actual` followed by the unexpected items found in `actual`.
        """
        expected = set(expected)
        actual = set(actual)
        if expected != actual:
            self.fail('%r != %r' % ((expected - actual, actual - expected)))

    def _cclist(self, cc):
        """Split a Cc header value into a list via `_fixup_cc_list`."""
        return _fixup_cc_list(cc).split(', ')

    def test_use_short_addr(self):
        history = self._test_without_domain(use_short_addr='enabled')
        from_addr, recipients, message = history[0]
        self.assertEqual('from-trac', from_addr)
        self.assertEqual('My Project <from-trac>', message['From'])
        self._assert_equal_sets(['qux', 'cc@example.org', 'bcc1@example.org',
                                 'bcc2', 'foo@example.org', 'baz',
                                 'corge-mail'], recipients)
        self._assert_equal_sets(['qux', 'cc@example.org'],
                                self._cclist(message['Cc']))

    def test_smtp_default_domain(self):
        history = self._test_without_domain(smtp_default_domain='example.com')
        from_addr, recipients, message = history[0]
        self.assertEqual('from-trac@example.com', from_addr)
        self.assertEqual('My Project <from-trac@example.com>',
                         message['From'])
        self._assert_equal_sets(['qux@example.com', 'cc@example.org',
                                 'bcc1@example.org', 'bcc2@example.com',
                                 'foo@example.org', 'baz@example.com',
                                 'corge-mail@example.com'], recipients)
        self._assert_equal_sets(['qux@example.com', 'cc@example.org'],
                                self._cclist(message['Cc']))

    def test_username_is_email(self):
        config = self.env.config
        config.set('notification', 'email_address_resolvers',
                   'SessionEmailResolver')
        with self.env.db_transaction:
            self._add_session(sid='foo@example.com')
            self._add_session(sid='bar@example.com',
                              email='foo@bar.example.org')
            self._add_subscription(sid='foo@example.com')
            self._add_subscription(sid='bar@example.com')
            self._add_subscription(sid='baz@example.com')  # no session
        self._notify_event('blah')
        history = self.sender.history
        self.assertNotEqual([], history)
        self.assertEqual(1, len(history))
        from_addr, recipients, message = history[0]
        self.assertEqual('trac@example.org', from_addr)
        self.assertEqual('My Project <trac@example.org>', message['From'])
        self.assertEqual({'foo@example.com', 'foo@bar.example.org',
                          'baz@example.com', 'cc@example.org',
                          'bcc@example.org'}, set(recipients))
        self._assert_equal_sets(['cc@example.org'],
                                self._cclist(message['Cc']))


class RecipientMatcherTestCase(unittest.TestCase):
    """Tests for `RecipientMatcher.match_recipient`."""

    def setUp(self):
        self.env = EnvironmentStub()
        self.config = self.env.config

    def tearDown(self):
        self.env.reset_db()

    def _add_session(self, sid, values=None, **attrs):
        """Save a detached session for `sid` with the given attributes.

        A `(dummy)` attribute is always set.
        """
        session = DetachedSession(self.env, sid)
        # NOTE(review): presumably this guarantees that a session created
        # without any other attribute is still stored -- confirm against
        # DetachedSession.save().
        session['(dummy)'] = 'x'
        if values is not None:
            attrs.update(values)
        for name, value in attrs.items():
            session[name] = value
        session.save()

    def test_match_recipient_empty(self):
        matcher = RecipientMatcher(self.env)
        self.assertIsNone(matcher.match_recipient(None))
        self.assertIsNone(matcher.match_recipient(''))

    def test_match_recipient_anonymous(self):
        matcher = RecipientMatcher(self.env)
        self.assertIsNone(matcher.match_recipient('anonymous'))

    def test_match_recipient_address(self):
        matcher = RecipientMatcher(self.env)
        expected = (None, 0, 'user@example.org')
        self.assertEqual(expected, matcher.match_recipient('user@example.org'))
        self.assertEqual(expected,
                         matcher.match_recipient('<user@example.org>'))
        self.assertEqual(expected, matcher.match_recipient(
            'Name name <user@example.org>'))
        self.assertEqual(expected, matcher.match_recipient(
            'Námë ńämé <user@example.org>'))

    def test_match_recipient_admit_domains(self):
        self.config.set('notification', 'admit_domains', 'LOCALDOMAIN')
        with self.env.db_transaction:
            self._add_session('user1', email='user1@localhost')
            self._add_session('user2', email='user2@localdomain')
            self._add_session('user3', email='user3@example.org')
            self._add_session('user4@localhost')
            self._add_session('user5@localdomain')
            self._add_session('user6@example.org')
            self._add_session('user7@localhost', email='user7@example.org')
            self._add_session('user8@localdomain', email='user8@localhost')
            self._add_session('user9@example.org', email='user9@localdomain')
        matcher = RecipientMatcher(self.env)

        # authenticated users
        self.assertIsNone(matcher.match_recipient('user1'))
        self.assertEqual(('user2', 1, 'user2@localdomain'),
                         matcher.match_recipient('user2'))
        self.assertEqual(('user3', 1, 'user3@example.org'),
                         matcher.match_recipient('user3'))
        self.assertIsNone(matcher.match_recipient('user4@localhost'))
        self.assertEqual(('user5@localdomain', 1, 'user5@localdomain'),
                         matcher.match_recipient('user5@localdomain'))
        self.assertEqual(('user6@example.org', 1, 'user6@example.org'),
                         matcher.match_recipient('user6@example.org'))
        self.assertEqual(('user7@localhost', 1, 'user7@example.org'),
                         matcher.match_recipient('user7@localhost'))
        self.assertIsNone(matcher.match_recipient('user8@localdomain'))
        self.assertEqual(('user9@example.org', 1, 'user9@localdomain'),
                         matcher.match_recipient('user9@example.org'))
        # anonymous users
        self.assertIsNone(matcher.match_recipient('foobar'))
        self.assertIsNone(matcher.match_recipient('anon@localhost'))
        self.assertEqual((None, 0, 'anon@localdomain'),
                         matcher.match_recipient('anon@localdomain'))
        self.assertEqual((None, 0, 'anon@example.org'),
                         matcher.match_recipient('anon@example.org'))

    def test_match_recipient_use_short_addr(self):
        self.config.set('notification', 'use_short_addr', 'enabled')
        with self.env.db_transaction:
            self._add_session('user1')
            self._add_session('user2', email='user2-email')
            self._add_session('user3', email='user3@example.org')
            self._add_session('user4@example.org', email='user4')
        matcher = RecipientMatcher(self.env)

        self.assertEqual(('user1', 1, 'user1'),
                         matcher.match_recipient('user1'))
        self.assertEqual(('user2', 1, 'user2-email'),
                         matcher.match_recipient('user2'))
        self.assertEqual(('user3', 1, 'user3@example.org'),
                         matcher.match_recipient('user3'))
        self.assertEqual(('user4@example.org', 1, 'user4'),
                         matcher.match_recipient('user4@example.org'))
        self.assertEqual((None, 0, 'user9'), matcher.match_recipient('user9'))

    def test_match_recipient_smtp_default_domain(self):
        self.config.set('notification', 'smtp_default_domain',
                        'default.example.net')
        with self.env.db_transaction:
            self._add_session('user1')
            self._add_session('user2', email='user2-email')
            self._add_session('user3', email='user3@example.org')
            self._add_session('user4@example.org', email='user4')
            self._add_session('user5@example.org')
        matcher = RecipientMatcher(self.env)

        self.assertEqual(('user1', 1, 'user1@default.example.net'),
                         matcher.match_recipient('user1'))
        self.assertEqual(('user2', 1, 'user2-email@default.example.net'),
                         matcher.match_recipient('user2'))
        self.assertEqual(('user3', 1, 'user3@example.org'),
                         matcher.match_recipient('user3'))
        self.assertEqual(('user4@example.org', 1, 'user4@default.example.net'),
                         matcher.match_recipient('user4@example.org'))
        self.assertEqual(('user5@example.org', 1, 'user5@example.org'),
                         matcher.match_recipient('user5@example.org'))
        self.assertEqual((None, 0, 'user9@default.example.net'),
                         matcher.match_recipient('user9'))

    def test_match_recipient_ignore_domains(self):
        self.config.set('notification', 'ignore_domains',
                        'example.net,example.com')
        with self.env.db_transaction:
            self._add_session('user1', email='user1@example.org')
            self._add_session('user2', email='user2@example.com')
            self._add_session('user3', email='user3@EXAMPLE.COM')
            self._add_session('user4@example.org')
            self._add_session('user5@example.com')
            self._add_session('user6@EXAMPLE.COM')
        matcher = RecipientMatcher(self.env)

        # authenticated users
        self.assertEqual(('user1', 1, 'user1@example.org'),
                         matcher.match_recipient('user1'))
        self.assertIsNone(matcher.match_recipient('user2'))
        self.assertIsNone(matcher.match_recipient('user3'))
        self.assertEqual(('user4@example.org', 1, 'user4@example.org'),
                         matcher.match_recipient('user4@example.org'))
        self.assertIsNone(matcher.match_recipient('user5@example.com'))
        self.assertIsNone(matcher.match_recipient('user6@EXAMPLE.COM'))
        # anonymous users
        self.assertEqual((None, 0, 'anon@example.org'),
                         matcher.match_recipient('anon@example.org'))
        self.assertIsNone(matcher.match_recipient('anon@example.com'))
        self.assertIsNone(matcher.match_recipient('anon@EXAMPLE.COM'))


def test_suite():
    """Return a suite containing every test case defined in this module."""
    # `unittest.makeSuite` is deprecated since Python 3.11 and removed in
    # 3.13; the default loader gives the same result on all versions.
    load = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite()
    suite.addTest(load(EmailDistributorTestCase))
    suite.addTest(load(RecipientMatcherTestCase))
    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')