1# Copyright (C) 2001-2018 by the Free Software Foundation, Inc. 2# 3# This program is free software; you can redistribute it and/or 4# modify it under the terms of the GNU General Public License 5# as published by the Free Software Foundation; either version 2 6# of the License, or (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program; if not, write to the Free Software 15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, 16# USA. 17 18"""Unit tests for the various Mailman/Handlers/*.py modules. 19""" 20 21import os 22import time 23import email 24import errno 25import cPickle 26import unittest 27from types import ListType 28from email.Generator import Generator 29try: 30 from Mailman import __init__ 31except ImportError: 32 import paths 33 34from Mailman import mm_cfg 35from Mailman.MailList import MailList 36from Mailman import Message 37from Mailman import Errors 38from Mailman import Pending 39from Mailman.Queue.Switchboard import Switchboard 40 41from Mailman.Handlers import Acknowledge 42from Mailman.Handlers import AfterDelivery 43from Mailman.Handlers import Approve 44from Mailman.Handlers import CalcRecips 45from Mailman.Handlers import Cleanse 46from Mailman.Handlers import CookHeaders 47from Mailman.Handlers import Decorate 48from Mailman.Handlers import FileRecips 49from Mailman.Handlers import Hold 50from Mailman.Handlers import MimeDel 51from Mailman.Handlers import Moderate 52from Mailman.Handlers import Replybot 53# Don't test handlers such as SMTPDirect and Sendmail here 54from Mailman.Handlers import SpamDetect 55from Mailman.Handlers import Tagger 56from Mailman.Handlers import ToArchive 57from Mailman.Handlers import ToDigest 58from Mailman.Handlers import ToOutgoing 59from Mailman.Handlers import ToUsenet 60from Mailman.Utils import sha_new 61 62from TestBase import TestBase 63 64 65 66def password(plaintext): 67 return sha_new(plaintext).hexdigest() 68 69 70 71class TestAcknowledge(TestBase): 72 def setUp(self): 73 TestBase.setUp(self) 74 # We're going to want to inspect this queue directory 75 self._sb = Switchboard(mm_cfg.VIRGINQUEUE_DIR) 76 # Add a member 77 self._mlist.addNewMember('aperson@dom.ain') 78 self._mlist.personalize = False 79 self._mlist.dmarc_moderation_action = 0 80 81 def tearDown(self): 82 for f in os.listdir(mm_cfg.VIRGINQUEUE_DIR): 83 os.unlink(os.path.join(mm_cfg.VIRGINQUEUE_DIR, f)) 84 TestBase.tearDown(self) 85 86 def test_no_ack_msgdata(self): 87 eq = self.assertEqual 88 # Make sure there are no files in the virgin queue already 89 eq(len(self._sb.files()), 0) 90 msg = email.message_from_string("""\ 91From: aperson@dom.ain 92 93""", Message.Message) 94 Acknowledge.process(self._mlist, msg, 95 {'original_sender': 'aperson@dom.ain'}) 96 eq(len(self._sb.files()), 0) 97 98 def test_no_ack_not_a_member(self): 99 eq = self.assertEqual 100 # Make sure there are no files in the virgin queue already 101 eq(len(self._sb.files()), 0) 102 msg = email.message_from_string("""\ 103From: bperson@dom.ain 104 105""", Message.Message) 106 Acknowledge.process(self._mlist, msg, 107 {'original_sender': 'bperson@dom.ain'}) 108 eq(len(self._sb.files()), 0) 109 110 def test_no_ack_sender(self): 111 eq = self.assertEqual 112 eq(len(self._sb.files()), 0) 113 msg = email.message_from_string("""\ 114From: aperson@dom.ain 115 116""", Message.Message) 117 Acknowledge.process(self._mlist, msg, {}) 118 eq(len(self._sb.files()), 0) 119 120 def test_ack_no_subject(self): 121 eq = self.assertEqual 122 self._mlist.setMemberOption( 123 'aperson@dom.ain', mm_cfg.AcknowledgePosts, 1) 124 eq(len(self._sb.files()), 0) 125 msg = email.message_from_string("""\ 126From: aperson@dom.ain 127 128""", Message.Message) 129 Acknowledge.process(self._mlist, msg, {}) 130 files = self._sb.files() 131 eq(len(files), 1) 132 qmsg, qdata = self._sb.dequeue(files[0]) 133 # Check the .db file 134 eq(qdata.get('listname'), '_xtest') 135 eq(qdata.get('recips'), ['aperson@dom.ain']) 136 eq(qdata.get('version'), 3) 137 # Check the .pck 138 eq(str(str(qmsg['subject'])), '_xtest post acknowledgement') 139 eq(qmsg['to'], 'aperson@dom.ain') 140 eq(qmsg['from'], '_xtest-bounces@dom.ain') 141 eq(qmsg.get_content_type(), 'text/plain') 142 eq(qmsg.get_param('charset'), 'us-ascii') 143 msgid = qmsg['message-id'] 144 self.failUnless(msgid.startswith('<mailman.')) 145 self.failUnless(msgid.endswith('._xtest@dom.ain>')) 146 eq(qmsg.get_payload(), """\ 147Your message entitled 148 149 (no subject) 150 151was successfully received by the _xtest mailing list. 152 153List info page: http://www.dom.ain/mailman/listinfo/_xtest 154Your preferences: http://www.dom.ain/mailman/options/_xtest/aperson%40dom.ain 155""") 156 # Make sure we dequeued the only message 157 eq(len(self._sb.files()), 0) 158 159 def test_ack_with_subject(self): 160 eq = self.assertEqual 161 self._mlist.setMemberOption( 162 'aperson@dom.ain', mm_cfg.AcknowledgePosts, 1) 163 eq(len(self._sb.files()), 0) 164 msg = email.message_from_string("""\ 165From: aperson@dom.ain 166Subject: Wish you were here 167 168""", Message.Message) 169 Acknowledge.process(self._mlist, msg, {}) 170 files = self._sb.files() 171 eq(len(files), 1) 172 qmsg, qdata = self._sb.dequeue(files[0]) 173 # Check the .db file 174 eq(qdata.get('listname'), '_xtest') 175 eq(qdata.get('recips'), ['aperson@dom.ain']) 176 eq(qdata.get('version'), 3) 177 # Check the .pck 178 eq(str(qmsg['subject']), '_xtest post acknowledgement') 179 eq(qmsg['to'], 'aperson@dom.ain') 180 eq(qmsg['from'], '_xtest-bounces@dom.ain') 181 eq(qmsg.get_content_type(), 'text/plain') 182 eq(qmsg.get_param('charset'), 'us-ascii') 183 msgid = qmsg['message-id'] 184 self.failUnless(msgid.startswith('<mailman.')) 185 self.failUnless(msgid.endswith('._xtest@dom.ain>')) 186 eq(qmsg.get_payload(), """\ 187Your message entitled 188 189 Wish you were here 190 191was successfully received by the _xtest mailing list. 192 193List info page: http://www.dom.ain/mailman/listinfo/_xtest 194Your preferences: http://www.dom.ain/mailman/options/_xtest/aperson%40dom.ain 195""") 196 # Make sure we dequeued the only message 197 eq(len(self._sb.files()), 0) 198 199 200 201class TestAfterDelivery(TestBase): 202 # Both msg and msgdata are ignored 203 def test_process(self): 204 mlist = self._mlist 205 last_post_time = mlist.last_post_time 206 post_id = mlist.post_id 207 AfterDelivery.process(mlist, None, None) 208 self.failUnless(mlist.last_post_time > last_post_time) 209 self.assertEqual(mlist.post_id, post_id + 1) 210 211 212 213class TestApprove(TestBase): 214 def test_short_circuit(self): 215 msgdata = {'approved': 1} 216 rtn = Approve.process(self._mlist, Message.Message(), msgdata) 217 # Not really a great test, but there's little else to assert 218 self.assertEqual(rtn, None) 219 220 def test_approved_moderator(self): 221 mlist = self._mlist 222 mlist.mod_password = password('wazoo') 223 msg = email.message_from_string("""\ 224Approved: wazoo 225 226""") 227 msgdata = {} 228 Approve.process(mlist, msg, msgdata) 229 self.failUnless(msgdata.has_key('approved')) 230 self.assertEqual(msgdata['approved'], 1) 231 232 def test_approve_moderator(self): 233 mlist = self._mlist 234 mlist.mod_password = password('wazoo') 235 msg = email.message_from_string("""\ 236Approve: wazoo 237 238""") 239 msgdata = {} 240 Approve.process(mlist, msg, msgdata) 241 self.failUnless(msgdata.has_key('approved')) 242 self.assertEqual(msgdata['approved'], 1) 243 244 def test_approved_admin(self): 245 mlist = self._mlist 246 mlist.password = password('wazoo') 247 msg = email.message_from_string("""\ 248Approved: wazoo 249 250""") 251 msgdata = {} 252 Approve.process(mlist, msg, msgdata) 253 self.failUnless(msgdata.has_key('approved')) 254 self.assertEqual(msgdata['approved'], 1) 255 256 def test_approve_admin(self): 257 mlist = self._mlist 258 mlist.password = password('wazoo') 259 msg = email.message_from_string("""\ 260Approve: wazoo 261 262""") 263 msgdata = {} 264 Approve.process(mlist, msg, msgdata) 265 self.failUnless(msgdata.has_key('approved')) 266 self.assertEqual(msgdata['approved'], 1) 267 268 def test_unapproved(self): 269 mlist = self._mlist 270 mlist.password = password('zoowa') 271 msg = email.message_from_string("""\ 272Approve: wazoo 273 274""") 275 msgdata = {} 276 Approve.process(mlist, msg, msgdata) 277 self.assertEqual(msgdata.get('approved'), None) 278 279 def test_trip_beentheres(self): 280 mlist = self._mlist 281 msg = email.message_from_string("""\ 282X-BeenThere: %s 283 284""" % mlist.GetListEmail()) 285 self.assertRaises(Errors.LoopError, Approve.process, mlist, msg, {}) 286 287 288 289class TestCalcRecips(TestBase): 290 def setUp(self): 291 TestBase.setUp(self) 292 # Add a bunch of regular members 293 mlist = self._mlist 294 mlist.addNewMember('aperson@dom.ain') 295 mlist.addNewMember('bperson@dom.ain') 296 mlist.addNewMember('cperson@dom.ain') 297 # And a bunch of digest members 298 mlist.addNewMember('dperson@dom.ain', digest=1) 299 mlist.addNewMember('eperson@dom.ain', digest=1) 300 mlist.addNewMember('fperson@dom.ain', digest=1) 301 302 def test_short_circuit(self): 303 msgdata = {'recips': 1} 304 rtn = CalcRecips.process(self._mlist, None, msgdata) 305 # Not really a great test, but there's little else to assert 306 self.assertEqual(rtn, None) 307 308 def test_simple_path(self): 309 msgdata = {} 310 msg = email.message_from_string("""\ 311From: dperson@dom.ain 312 313""", Message.Message) 314 CalcRecips.process(self._mlist, msg, msgdata) 315 self.failUnless(msgdata.has_key('recips')) 316 recips = msgdata['recips'] 317 recips.sort() 318 self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain', 319 'cperson@dom.ain']) 320 321 def test_exclude_sender(self): 322 msgdata = {} 323 msg = email.message_from_string("""\ 324From: cperson@dom.ain 325 326""", Message.Message) 327 self._mlist.setMemberOption('cperson@dom.ain', 328 mm_cfg.DontReceiveOwnPosts, 1) 329 CalcRecips.process(self._mlist, msg, msgdata) 330 self.failUnless(msgdata.has_key('recips')) 331 recips = msgdata['recips'] 332 recips.sort() 333 self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain']) 334 335 def test_urgent_moderator(self): 336 self._mlist.mod_password = password('xxXXxx') 337 msgdata = {} 338 msg = email.message_from_string("""\ 339From: dperson@dom.ain 340Urgent: xxXXxx 341 342""", Message.Message) 343 CalcRecips.process(self._mlist, msg, msgdata) 344 self.failUnless(msgdata.has_key('recips')) 345 recips = msgdata['recips'] 346 recips.sort() 347 self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain', 348 'cperson@dom.ain', 'dperson@dom.ain', 349 'eperson@dom.ain', 'fperson@dom.ain']) 350 351 def test_urgent_admin(self): 352 self._mlist.mod_password = password('yyYYyy') 353 self._mlist.password = password('xxXXxx') 354 msgdata = {} 355 msg = email.message_from_string("""\ 356From: dperson@dom.ain 357Urgent: xxXXxx 358 359""", Message.Message) 360 CalcRecips.process(self._mlist, msg, msgdata) 361 self.failUnless(msgdata.has_key('recips')) 362 recips = msgdata['recips'] 363 recips.sort() 364 self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain', 365 'cperson@dom.ain', 'dperson@dom.ain', 366 'eperson@dom.ain', 'fperson@dom.ain']) 367 368 def test_urgent_reject(self): 369 self._mlist.mod_password = password('yyYYyy') 370 self._mlist.password = password('xxXXxx') 371 msgdata = {} 372 msg = email.message_from_string("""\ 373From: dperson@dom.ain 374Urgent: zzZZzz 375 376""", Message.Message) 377 self.assertRaises(Errors.RejectMessage, 378 CalcRecips.process, 379 self._mlist, msg, msgdata) 380 381 # BAW: must test the do_topic_filters() path... 382 383 384 385class TestCleanse(TestBase): 386 def setUp(self): 387 TestBase.setUp(self) 388 389 def test_simple_cleanse(self): 390 eq = self.assertEqual 391 msg = email.message_from_string("""\ 392From: aperson@dom.ain 393Approved: yes 394Urgent: indeed 395Reply-To: bperson@dom.ain 396Sender: asystem@dom.ain 397Return-Receipt-To: another@dom.ain 398Disposition-Notification-To: athird@dom.ain 399X-Confirm-Reading-To: afourth@dom.ain 400X-PMRQC: afifth@dom.ain 401Subject: a message to you 402 403""", Message.Message) 404 Cleanse.process(self._mlist, msg, {}) 405 eq(msg['approved'], None) 406 eq(msg['urgent'], None) 407 eq(msg['return-receipt-to'], None) 408 eq(msg['disposition-notification-to'], None) 409 eq(msg['x-confirm-reading-to'], None) 410 eq(msg['x-pmrqc'], None) 411 eq(msg['from'], 'aperson@dom.ain') 412 eq(msg['reply-to'], 'bperson@dom.ain') 413 eq(msg['sender'], 'asystem@dom.ain') 414 eq(msg['subject'], 'a message to you') 415 416 def test_anon_cleanse(self): 417 eq = self.assertEqual 418 msg = email.message_from_string("""\ 419From: aperson@dom.ain 420Approved: yes 421Urgent: indeed 422Reply-To: bperson@dom.ain 423Sender: asystem@dom.ain 424Return-Receipt-To: another@dom.ain 425Disposition-Notification-To: athird@dom.ain 426X-Confirm-Reading-To: afourth@dom.ain 427X-PMRQC: afifth@dom.ain 428Subject: a message to you 429 430""", Message.Message) 431 self._mlist.anonymous_list = 1 432 Cleanse.process(self._mlist, msg, {}) 433 eq(msg['approved'], None) 434 eq(msg['urgent'], None) 435 eq(msg['return-receipt-to'], None) 436 eq(msg['disposition-notification-to'], None) 437 eq(msg['x-confirm-reading-to'], None) 438 eq(msg['x-pmrqc'], None) 439 eq(len(msg.get_all('from')), 1) 440 eq(len(msg.get_all('reply-to')), 1) 441 eq(msg['from'], '_xtest@dom.ain') 442 eq(msg['reply-to'], '_xtest@dom.ain') 443 eq(msg['sender'], None) 444 eq(msg['subject'], 'a message to you') 445 446 447 448class TestCookHeaders(TestBase): 449 def test_transform_noack_to_xack(self): 450 eq = self.assertEqual 451 msg = email.message_from_string("""\ 452X-Ack: yes 453 454""", Message.Message) 455 CookHeaders.process(self._mlist, msg, {'noack': 1}) 456 eq(len(msg.get_all('x-ack')), 1) 457 eq(msg['x-ack'], 'no') 458 459 def test_original_sender(self): 460 msg = email.message_from_string("""\ 461From: aperson@dom.ain 462 463""", Message.Message) 464 msgdata = {} 465 CookHeaders.process(self._mlist, msg, msgdata) 466 self.assertEqual(msgdata.get('original_sender'), 'aperson@dom.ain') 467 468 def test_no_original_sender(self): 469 msg = email.message_from_string("""\ 470Subject: about this message 471 472""", Message.Message) 473 msgdata = {} 474 CookHeaders.process(self._mlist, msg, msgdata) 475 self.assertEqual(msgdata.get('original_sender'), '') 476 477 def test_xbeenthere(self): 478 msg = email.message_from_string("""\ 479From: aperson@dom.ain 480 481""", Message.Message) 482 CookHeaders.process(self._mlist, msg, {}) 483 self.assertEqual(msg['x-beenthere'], '_xtest@dom.ain') 484 485 def test_multiple_xbeentheres(self): 486 eq = self.assertEqual 487 msg = email.message_from_string("""\ 488From: aperson@dom.ain 489X-BeenThere: alist@another.dom.ain 490 491""", Message.Message) 492 CookHeaders.process(self._mlist, msg, {}) 493 eq(len(msg.get_all('x-beenthere')), 2) 494 beentheres = msg.get_all('x-beenthere') 495 beentheres.sort() 496 eq(beentheres, ['_xtest@dom.ain', 'alist@another.dom.ain']) 497 498 def test_nonexisting_mmversion(self): 499 eq = self.assertEqual 500 msg = email.message_from_string("""\ 501From: aperson@dom.ain 502 503""", Message.Message) 504 CookHeaders.process(self._mlist, msg, {}) 505 eq(msg['x-mailman-version'], mm_cfg.VERSION) 506 507 def test_existing_mmversion(self): 508 eq = self.assertEqual 509 msg = email.message_from_string("""\ 510From: aperson@dom.ain 511X-Mailman-Version: 3000 512 513""", Message.Message) 514 CookHeaders.process(self._mlist, msg, {}) 515 eq(len(msg.get_all('x-mailman-version')), 1) 516 eq(msg['x-mailman-version'], '3000') 517 518 def test_nonexisting_precedence(self): 519 eq = self.assertEqual 520 msg = email.message_from_string("""\ 521From: aperson@dom.ain 522 523""", Message.Message) 524 CookHeaders.process(self._mlist, msg, {}) 525 eq(msg['precedence'], 'list') 526 527 def test_existing_precedence(self): 528 eq = self.assertEqual 529 msg = email.message_from_string("""\ 530From: aperson@dom.ain 531Precedence: junk 532 533""", Message.Message) 534 CookHeaders.process(self._mlist, msg, {}) 535 eq(len(msg.get_all('precedence')), 1) 536 eq(msg['precedence'], 'junk') 537 538 def test_subject_munging_no_subject(self): 539 self._mlist.subject_prefix = '[XTEST] ' 540 msg = email.message_from_string("""\ 541From: aperson@dom.ain 542 543""", Message.Message) 544 msgdata = {} 545 CookHeaders.process(self._mlist, msg, msgdata) 546 self.assertEqual(msgdata.get('origsubj'), '') 547 self.assertEqual(str(msg['subject']), '[XTEST] (no subject)') 548 549 def test_subject_munging(self): 550 self._mlist.subject_prefix = '[XTEST] ' 551 msg = email.message_from_string("""\ 552From: aperson@dom.ain 553Subject: About Mailman... 554 555""", Message.Message) 556 CookHeaders.process(self._mlist, msg, {}) 557 self.assertEqual(str(msg['subject']), '[XTEST] About Mailman...') 558 559 def test_no_subject_munging_for_digests(self): 560 self._mlist.subject_prefix = '[XTEST] ' 561 msg = email.message_from_string("""\ 562From: aperson@dom.ain 563Subject: About Mailman... 564 565""", Message.Message) 566 CookHeaders.process(self._mlist, msg, {'isdigest': 1}) 567 self.assertEqual(msg['subject'], 'About Mailman...') 568 569 def test_no_subject_munging_for_fasttrack(self): 570 self._mlist.subject_prefix = '[XTEST] ' 571 msg = email.message_from_string("""\ 572From: aperson@dom.ain 573Subject: About Mailman... 574 575""", Message.Message) 576 CookHeaders.process(self._mlist, msg, {'_fasttrack': 1}) 577 self.assertEqual(msg['subject'], 'About Mailman...') 578 579 def test_no_subject_munging_has_prefix(self): 580 self._mlist.subject_prefix = '[XTEST] ' 581 msg = email.message_from_string("""\ 582From: aperson@dom.ain 583Subject: Re: [XTEST] About Mailman... 584 585""", Message.Message) 586 CookHeaders.process(self._mlist, msg, {}) 587 # prefixing depends on mm_cfg.py 588 self.failUnless(str(msg['subject']) == 'Re: [XTEST] About Mailman...' or 589 str(msg['subject']) == '[XTEST] Re: About Mailman...') 590 591 def test_reply_to_list(self): 592 eq = self.assertEqual 593 mlist = self._mlist 594 mlist.reply_goes_to_list = 1 595 mlist.from_is_list = 0 596 msg = email.message_from_string("""\ 597From: aperson@dom.ain 598 599""", Message.Message) 600 msgdata = {} 601 CookHeaders.process(mlist, msg, msgdata) 602 eq(msgdata['add_header']['Reply-To'], '_xtest@dom.ain') 603 eq(msg.get_all('reply-to'), None) 604 605 def test_reply_to_list_fil(self): 606 eq = self.assertEqual 607 mlist = self._mlist 608 mlist.reply_goes_to_list = 1 609 mlist.from_is_list = 1 610 msg = email.message_from_string("""\ 611From: aperson@dom.ain 612 613""", Message.Message) 614 msgdata = {} 615 CookHeaders.process(mlist, msg, msgdata) 616 eq(msgdata['add_header']['Reply-To'], 617 '_xtest@dom.ain') 618 eq(msgdata['add_header']['Cc'], 619 'aperson@dom.ain') 620 eq(msg.get_all('reply-to'), None) 621 eq(msg.get_all('cc'), None) 622 623 def test_reply_to_list_with_strip(self): 624 eq = self.assertEqual 625 mlist = self._mlist 626 mlist.reply_goes_to_list = 1 627 mlist.first_strip_reply_to = 1 628 mlist.from_is_list = 0 629 msg = email.message_from_string("""\ 630From: aperson@dom.ain 631Reply-To: bperson@dom.ain 632 633""", Message.Message) 634 msgdata = {} 635 CookHeaders.process(mlist, msg, msgdata) 636 eq(msgdata['add_header']['Reply-To'], '_xtest@dom.ain') 637 eq(msg.get_all('reply-to'), ['bperson@dom.ain']) 638 639 def test_reply_to_list_with_strip_fil(self): 640 eq = self.assertEqual 641 mlist = self._mlist 642 mlist.reply_goes_to_list = 1 643 mlist.first_strip_reply_to = 1 644 mlist.from_is_list = 1 645 msg = email.message_from_string("""\ 646From: aperson@dom.ain 647Reply-To: bperson@dom.ain 648 649""", Message.Message) 650 msgdata = {} 651 CookHeaders.process(mlist, msg, msgdata) 652 eq(msgdata['add_header']['Reply-To'], 653 '_xtest@dom.ain') 654 eq(msgdata['add_header']['Cc'], 655 'aperson@dom.ain') 656 eq(msg.get_all('reply-to'), ['bperson@dom.ain']) 657 eq(msg.get_all('cc'), None) 658 659 660 def test_reply_to_explicit(self): 661 eq = self.assertEqual 662 mlist = self._mlist 663 mlist.reply_goes_to_list = 2 664 mlist.from_is_list = 0 665 mlist.reply_to_address = 'mlist@dom.ain' 666 msg = email.message_from_string("""\ 667From: aperson@dom.ain 668 669""", Message.Message) 670 msgdata = {} 671 CookHeaders.process(mlist, msg, msgdata) 672 eq(msgdata['add_header']['Reply-To'], 'mlist@dom.ain') 673 eq(msg.get_all('reply-to'), None) 674 675 def test_reply_to_explicit_fil(self): 676 eq = self.assertEqual 677 mlist = self._mlist 678 mlist.reply_goes_to_list = 2 679 mlist.from_is_list = 1 680 mlist.reply_to_address = 'mlist@dom.ain' 681 msg = email.message_from_string("""\ 682From: aperson@dom.ain 683 684""", Message.Message) 685 msgdata = {} 686 CookHeaders.process(mlist, msg, msgdata) 687 eq(msgdata['add_header']['Reply-To'], 688 'mlist@dom.ain') 689 eq(msgdata['add_header']['Cc'], 690 'aperson@dom.ain') 691 eq(msg.get_all('reply-to'), None) 692 eq(msg.get_all('cc'), None) 693 694 def test_reply_to_explicit_with_strip(self): 695 eq = self.assertEqual 696 mlist = self._mlist 697 mlist.reply_goes_to_list = 2 698 mlist.first_strip_reply_to = 1 699 mlist.from_is_list = 0 700 mlist.reply_to_address = 'mlist@dom.ain' 701 msg = email.message_from_string("""\ 702From: aperson@dom.ain 703Reply-To: bperson@dom.ain 704 705""", Message.Message) 706 msgdata = {} 707 708 CookHeaders.process(self._mlist, msg, msgdata) 709 eq(msgdata['add_header']['Reply-To'], 'mlist@dom.ain') 710 eq(msg.get_all('reply-to'), ['bperson@dom.ain']) 711 712 def test_reply_to_explicit_with_strip_fil(self): 713 eq = self.assertEqual 714 mlist = self._mlist 715 mlist.reply_goes_to_list = 2 716 mlist.first_strip_reply_to = 1 717 mlist.from_is_list = 1 718 mlist.reply_to_address = 'mlist@dom.ain' 719 msg = email.message_from_string("""\ 720From: aperson@dom.ain 721Reply-To: bperson@dom.ain 722 723""", Message.Message) 724 msgdata = {} 725 726 CookHeaders.process(self._mlist, msg, msgdata) 727 eq(msgdata['add_header']['Reply-To'], 728 'mlist@dom.ain') 729 eq(msgdata['add_header']['Cc'], 730 'aperson@dom.ain') 731 eq(msg.get_all('reply-to'), ['bperson@dom.ain']) 732 eq(msg.get_all('cc'), None) 733 734 def test_reply_to_extends_to_list(self): 735 eq = self.assertEqual 736 mlist = self._mlist 737 mlist.reply_goes_to_list = 1 738 mlist.first_strip_reply_to = 0 739 mlist.from_is_list = 0 740 msg = email.message_from_string("""\ 741From: aperson@dom.ain 742Reply-To: bperson@dom.ain 743 744""", Message.Message) 745 msgdata = {} 746 747 CookHeaders.process(mlist, msg, msgdata) 748 eq(msgdata['add_header']['Reply-To'], 749 'bperson@dom.ain, _xtest@dom.ain') 750 751 def test_reply_to_extends_to_list_fil(self): 752 eq = self.assertEqual 753 mlist = self._mlist 754 mlist.reply_goes_to_list = 1 755 mlist.first_strip_reply_to = 0 756 mlist.from_is_list = 1 757 msg = email.message_from_string("""\ 758From: aperson@dom.ain 759Reply-To: bperson@dom.ain 760 761""", Message.Message) 762 msgdata = {} 763 764 CookHeaders.process(mlist, msg, msgdata) 765 eq(msgdata['add_header']['Reply-To'], 766 'bperson@dom.ain, _xtest@dom.ain') 767 eq(msgdata['add_header']['Cc'], 768 'aperson@dom.ain') 769 eq(msg.get_all('reply-to'), ['bperson@dom.ain']) 770 eq(msg.get_all('cc'), None) 771 772 def test_reply_to_extends_to_explicit(self): 773 eq = self.assertEqual 774 mlist = self._mlist 775 mlist.reply_goes_to_list = 2 776 mlist.first_strip_reply_to = 0 777 mlist.from_is_list = 0 778 mlist.reply_to_address = 'mlist@dom.ain' 779 msg = email.message_from_string("""\ 780From: aperson@dom.ain 781Reply-To: bperson@dom.ain 782 783""", Message.Message) 784 msgdata = {} 785 CookHeaders.process(mlist, msg, msgdata) 786 eq(msgdata['add_header']['Reply-To'], 787 'mlist@dom.ain, bperson@dom.ain') 788 789 def test_reply_to_extends_to_explicit_fil(self): 790 eq = self.assertEqual 791 mlist = self._mlist 792 mlist.reply_goes_to_list = 2 793 mlist.first_strip_reply_to = 0 794 mlist.from_is_list = 1 795 mlist.reply_to_address = 'mlist@dom.ain' 796 msg = email.message_from_string("""\ 797From: aperson@dom.ain 798Reply-To: bperson@dom.ain 799 800""", Message.Message) 801 msgdata = {} 802 CookHeaders.process(mlist, msg, msgdata) 803 eq(msgdata['add_header']['Reply-To'], 804 'mlist@dom.ain, bperson@dom.ain') 805 eq(msgdata['add_header']['Cc'], 806 'aperson@dom.ain') 807 eq(msg.get_all('reply-to'), ['bperson@dom.ain']) 808 eq(msg.get_all('cc'), None) 809 810 def test_list_headers_nolist(self): 811 eq = self.assertEqual 812 msg = email.message_from_string("""\ 813From: aperson@dom.ain 814 815""", Message.Message) 816 CookHeaders.process(self._mlist, msg, {'_nolist': 1}) 817 eq(msg['list-id'], None) 818 eq(msg['list-help'], None) 819 eq(msg['list-unsubscribe'], None) 820 eq(msg['list-subscribe'], None) 821 eq(msg['list-post'], None) 822 eq(msg['list-archive'], None) 823 824 def test_list_headers(self): 825 eq = self.assertEqual 826 self._mlist.archive = 1 827 msg = email.message_from_string("""\ 828From: aperson@dom.ain 829 830""", Message.Message) 831 oldval = mm_cfg.DEFAULT_URL_HOST 832 mm_cfg.DEFAULT_URL_HOST = 'www.dom.ain' 833 try: 834 CookHeaders.process(self._mlist, msg, {}) 835 finally: 836 mm_cfg.DEFAULT_URL_HOST = oldval 837 eq(msg['list-id'], '<_xtest.dom.ain>') 838 eq(msg['list-help'], '<mailto:_xtest-request@dom.ain?subject=help>') 839 eq(msg['list-unsubscribe'], 840 '<http://www.dom.ain/mailman/options/_xtest>,' 841 '\n <mailto:_xtest-request@dom.ain?subject=unsubscribe>') 842 eq(msg['list-subscribe'], 843 '<http://www.dom.ain/mailman/listinfo/_xtest>,' 844 '\n <mailto:_xtest-request@dom.ain?subject=subscribe>') 845 eq(msg['list-post'], '<mailto:_xtest@dom.ain>') 846 eq(msg['list-archive'], '<http://www.dom.ain/pipermail/_xtest/>') 847 848 def test_list_headers_with_description(self): 849 eq = self.assertEqual 850 self._mlist.archive = 1 851 self._mlist.description = 'A Test List' 852 msg = email.message_from_string("""\ 853From: aperson@dom.ain 854 855""", Message.Message) 856 CookHeaders.process(self._mlist, msg, {}) 857 eq(unicode(msg['list-id']), 'A Test List <_xtest.dom.ain>') 858 eq(msg['list-help'], '<mailto:_xtest-request@dom.ain?subject=help>') 859 eq(msg['list-unsubscribe'], 860 '<http://www.dom.ain/mailman/options/_xtest>,' 861 '\n <mailto:_xtest-request@dom.ain?subject=unsubscribe>') 862 eq(msg['list-subscribe'], 863 '<http://www.dom.ain/mailman/listinfo/_xtest>,' 864 '\n <mailto:_xtest-request@dom.ain?subject=subscribe>') 865 eq(msg['list-post'], '<mailto:_xtest@dom.ain>') 866 867 868 869class TestDecorate(TestBase): 870 def test_short_circuit(self): 871 msgdata = {'isdigest': 1} 872 rtn = Decorate.process(self._mlist, None, msgdata) 873 # Not really a great test, but there's little else to assert 874 self.assertEqual(rtn, None) 875 876 def test_no_multipart(self): 877 mlist = self._mlist 878 mlist.msg_header = 'header\n' 879 mlist.msg_footer = 'footer' 880 msg = email.message_from_string("""\ 881From: aperson@dom.ain 882 883Here is a message. 884""") 885 Decorate.process(self._mlist, msg, {}) 886 self.assertEqual(msg.get_payload(), """\ 887header 888Here is a message. 889footer 890""") 891 892 def test_no_multipart_template(self): 893 mlist = self._mlist 894 mlist.msg_header = '%(real_name)s header\n' 895 mlist.msg_footer = '%(real_name)s footer' 896 mlist.real_name = 'XTest' 897 msg = email.message_from_string("""\ 898From: aperson@dom.ain 899 900Here is a message. 901""") 902 Decorate.process(self._mlist, msg, {}) 903 self.assertEqual(msg.get_payload(), """\ 904XTest header 905Here is a message. 906XTest footer 907""") 908 909 def test_no_multipart_type_error(self): 910 mlist = self._mlist 911 mlist.msg_header = '%(real_name) header\n' 912 mlist.msg_footer = '%(real_name) footer' 913 mlist.real_name = 'XTest' 914 msg = email.message_from_string("""\ 915From: aperson@dom.ain 916 917Here is a message. 918""") 919 Decorate.process(self._mlist, msg, {}) 920 self.assertEqual(msg.get_payload(), """\ 921%(real_name) header 922Here is a message. 923%(real_name) footer 924""") 925 926 def test_no_multipart_value_error(self): 927 mlist = self._mlist 928 # These will generate warnings in logs/error 929 mlist.msg_header = '%(real_name)p header\n' 930 mlist.msg_footer = '%(real_name)p footer' 931 mlist.real_name = 'XTest' 932 msg = email.message_from_string("""\ 933From: aperson@dom.ain 934 935Here is a message. 936""") 937 Decorate.process(self._mlist, msg, {}) 938 self.assertEqual(msg.get_payload(), """\ 939%(real_name)p header 940Here is a message. 941%(real_name)p footer 942""") 943 944 def test_no_multipart_missing_key(self): 945 mlist = self._mlist 946 mlist.msg_header = '%(spooge)s header\n' 947 mlist.msg_footer = '%(spooge)s footer' 948 msg = email.message_from_string("""\ 949From: aperson@dom.ain 950 951Here is a message. 952""") 953 Decorate.process(self._mlist, msg, {}) 954 self.assertEqual(msg.get_payload(), """\ 955%(spooge)s header 956Here is a message. 957%(spooge)s footer 958""") 959 960 def test_multipart(self): 961 eq = self.ndiffAssertEqual 962 mlist = self._mlist 963 mlist.msg_header = 'header' 964 mlist.msg_footer = 'footer' 965 msg1 = email.message_from_string("""\ 966From: aperson@dom.ain 967 968Here is the first message. 969""") 970 msg2 = email.message_from_string("""\ 971From: bperson@dom.ain 972 973Here is the second message. 974""") 975 msg = Message.Message() 976 msg.set_type('multipart/mixed') 977 msg.set_boundary('BOUNDARY') 978 msg.attach(msg1) 979 msg.attach(msg2) 980 Decorate.process(self._mlist, msg, {}) 981 eq(msg.as_string(unixfrom=0), """\ 982MIME-Version: 1.0 983Content-Type: multipart/mixed; boundary="BOUNDARY" 984 985--BOUNDARY 986Content-Type: text/plain; charset="us-ascii" 987MIME-Version: 1.0 988Content-Transfer-Encoding: 7bit 989Content-Disposition: inline 990 991header 992 993--BOUNDARY 994From: aperson@dom.ain 995 996Here is the first message. 997 998--BOUNDARY 999From: bperson@dom.ain 1000 1001Here is the second message. 1002 1003--BOUNDARY 1004Content-Type: text/plain; charset="us-ascii" 1005MIME-Version: 1.0 1006Content-Transfer-Encoding: 7bit 1007Content-Disposition: inline 1008 1009footer 1010 1011--BOUNDARY-- 1012""") 1013 1014 def test_image(self): 1015 eq = self.assertEqual 1016 mlist = self._mlist 1017 mlist.msg_header = 'header\n' 1018 mlist.msg_footer = 'footer' 1019 msg = email.message_from_string("""\ 1020From: aperson@dom.ain 1021Content-type: image/x-spooge 1022 1023IMAGEDATAIMAGEDATAIMAGEDATA 1024""") 1025 Decorate.process(self._mlist, msg, {}) 1026 eq(len(msg.get_payload()), 3) 1027 self.assertEqual(msg.get_payload(1).get_payload(), """\ 1028IMAGEDATAIMAGEDATAIMAGEDATA 1029""") 1030 1031 def test_personalize_assert(self): 1032 raises = self.assertRaises 1033 raises(AssertionError, Decorate.process, 1034 self._mlist, None, {'personalize': 1}) 1035 raises(AssertionError, Decorate.process, 1036 self._mlist, None, {'personalize': 1, 1037 'recips': [1, 2, 3]}) 1038 1039 1040 1041class TestFileRecips(TestBase): 1042 def test_short_circuit(self): 1043 msgdata = {'recips': 1} 1044 rtn = FileRecips.process(self._mlist, None, msgdata) 1045 # Not really a great test, but there's little else to assert 1046 self.assertEqual(rtn, None) 1047 1048 def test_file_nonexistant(self): 1049 msgdata = {} 1050 FileRecips.process(self._mlist, None, msgdata) 1051 self.assertEqual(msgdata.get('recips'), []) 1052 1053 def test_file_exists_no_sender(self): 1054 msg = email.message_from_string("""\ 1055To: yall@dom.ain 1056 1057""", Message.Message) 1058 msgdata = {} 1059 file = os.path.join(self._mlist.fullpath(), 'members.txt') 1060 addrs = ['aperson@dom.ain', 'bperson@dom.ain', 1061 'cperson@dom.ain', 'dperson@dom.ain'] 1062 fp = open(file, 'w') 1063 try: 1064 for addr in addrs: 1065 print >> fp, addr 1066 fp.close() 1067 FileRecips.process(self._mlist, msg, msgdata) 1068 self.assertEqual(msgdata.get('recips'), addrs) 1069 finally: 1070 try: 1071 os.unlink(file) 1072 except OSError, e: 1073 if e.errno <> e.ENOENT: raise 1074 1075 def test_file_exists_no_member(self): 1076 msg = email.message_from_string("""\ 1077From: eperson@dom.ain 1078To: yall@dom.ain 1079 1080""", Message.Message) 1081 msgdata = {} 1082 file = os.path.join(self._mlist.fullpath(), 'members.txt') 1083 addrs = ['aperson@dom.ain', 'bperson@dom.ain', 1084 'cperson@dom.ain', 'dperson@dom.ain'] 1085 fp = open(file, 'w') 1086 try: 1087 for addr in addrs: 1088 print >> fp, addr 1089 fp.close() 1090 FileRecips.process(self._mlist, msg, msgdata) 1091 self.assertEqual(msgdata.get('recips'), addrs) 1092 finally: 1093 try: 1094 os.unlink(file) 1095 except OSError, e: 1096 if e.errno <> e.ENOENT: raise 1097 1098 def test_file_exists_is_member(self): 1099 msg = email.message_from_string("""\ 1100From: aperson@dom.ain 1101To: yall@dom.ain 1102 1103""", Message.Message) 1104 msgdata = {} 1105 file = os.path.join(self._mlist.fullpath(), 'members.txt') 1106 addrs = ['aperson@dom.ain', 'bperson@dom.ain', 1107 'cperson@dom.ain', 'dperson@dom.ain'] 1108 fp = open(file, 'w') 1109 try: 1110 for addr in addrs: 1111 print >> fp, addr 1112 self._mlist.addNewMember(addr) 1113 fp.close() 1114 FileRecips.process(self._mlist, msg, msgdata) 1115 self.assertEqual(msgdata.get('recips'), addrs[1:]) 1116 finally: 1117 try: 1118 os.unlink(file) 1119 except OSError, e: 1120 if e.errno <> e.ENOENT: raise 1121 1122 1123 1124class TestHold(TestBase): 1125 def setUp(self): 1126 TestBase.setUp(self) 1127 self._mlist.administrivia = 1 1128 self._mlist.respond_to_post_requests = 0 1129 self._mlist.admin_immed_notify = 0 1130 # We're going to want to inspect this queue directory 1131 self._sb = Switchboard(mm_cfg.VIRGINQUEUE_DIR) 1132 1133 def tearDown(self): 1134 for f in os.listdir(mm_cfg.VIRGINQUEUE_DIR): 1135 os.unlink(os.path.join(mm_cfg.VIRGINQUEUE_DIR, f)) 1136 TestBase.tearDown(self) 1137 try: 1138 os.unlink(os.path.join(mm_cfg.DATA_DIR, 'pending.db')) 1139 except OSError, e: 1140 if e.errno <> errno.ENOENT: raise 1141 for f in [holdfile for holdfile in os.listdir(mm_cfg.DATA_DIR) 1142 if holdfile.startswith('heldmsg-')]: 1143 os.unlink(os.path.join(mm_cfg.DATA_DIR, f)) 1144 1145 def test_short_circuit(self): 1146 msgdata = {'approved': 1} 1147 rtn = Hold.process(self._mlist, None, msgdata) 1148 # Not really a great test, but there's little else to assert 1149 self.assertEqual(rtn, None) 1150 1151 def test_administrivia(self): 1152 msg = email.message_from_string("""\ 1153From: aperson@dom.ain 1154Subject: unsubscribe 1155 1156""", Message.Message) 1157 self.assertRaises(Hold.Administrivia, Hold.process, 1158 self._mlist, msg, {}) 1159 1160 def test_max_recips(self): 1161 self._mlist.max_num_recipients = 5 1162 msg = email.message_from_string("""\ 1163From: aperson@dom.ain 1164To: _xtest@dom.ain, bperson@dom.ain 1165Cc: cperson@dom.ain 1166Cc: dperson@dom.ain (Jimmy D. Person) 1167To: Billy E. Person <eperson@dom.ain> 1168 1169Hey folks! 1170""", Message.Message) 1171 self.assertRaises(Hold.TooManyRecipients, Hold.process, 1172 self._mlist, msg, {}) 1173 1174 def test_implicit_destination(self): 1175 self._mlist.require_explicit_destination = 1 1176 msg = email.message_from_string("""\ 1177From: aperson@dom.ain 1178Subject: An implicit message 1179 1180""", Message.Message) 1181 self.assertRaises(Hold.ImplicitDestination, Hold.process, 1182 self._mlist, msg, {}) 1183 1184 def test_implicit_destination_fromusenet(self): 1185 self._mlist.require_explicit_destination = 1 1186 msg = email.message_from_string("""\ 1187From: aperson@dom.ain 1188Subject: An implicit message 1189 1190""", Message.Message) 1191 rtn = Hold.process(self._mlist, msg, {'fromusenet': 1}) 1192 self.assertEqual(rtn, None) 1193 1194 def test_suspicious_header(self): 1195 self._mlist.bounce_matching_headers = 'From: .*person@(blah.)?dom.ain' 1196 msg = email.message_from_string("""\ 1197From: aperson@dom.ain 1198To: _xtest@dom.ain 1199Subject: An implicit message 1200 1201""", Message.Message) 1202 self.assertRaises(Hold.SuspiciousHeaders, Hold.process, 1203 self._mlist, msg, {}) 1204 1205 def test_suspicious_header_ok(self): 1206 self._mlist.bounce_matching_headers = 'From: .*person@blah.dom.ain' 1207 msg = email.message_from_string("""\ 1208From: aperson@dom.ain 1209To: _xtest@dom.ain 1210Subject: An implicit message 1211 1212""", Message.Message) 1213 rtn = Hold.process(self._mlist, msg, {}) 1214 self.assertEqual(rtn, None) 1215 1216 def test_max_message_size(self): 1217 self._mlist.max_message_size = 1 1218 msg = email.message_from_string("""\ 1219From: aperson@dom.ain 1220To: _xtest@dom.ain 1221 1222xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1223xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1224xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1225xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1226xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1227xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1228xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1229xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1230xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1231xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1232xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1233xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1234xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1235xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 1236""", Message.Message) 1237 self.assertRaises(Hold.MessageTooBig, Hold.process, 1238 self._mlist, msg, {}) 1239 1240 def test_hold_notifications(self): 1241 eq = self.assertEqual 1242 self._mlist.respond_to_post_requests = 1 1243 self._mlist.admin_immed_notify = 1 1244 # Now cause an implicit destination hold 1245 msg = email.message_from_string("""\ 1246From: aperson@dom.ain 1247 1248""", Message.Message) 1249 self.assertRaises(Hold.ImplicitDestination, Hold.process, 1250 self._mlist, msg, {}) 1251 # Now we have to make sure there are two messages in the virgin queue, 1252 # one to the sender and one to the list owners. 1253 qfiles = {} 1254 files = self._sb.files() 1255 eq(len(files), 2) 1256 for filebase in files: 1257 qmsg, qdata = self._sb.dequeue(filebase) 1258 to = qmsg['to'] 1259 qfiles[to] = qmsg, qdata 1260 # BAW: We could be testing many other attributes of either the 1261 # messages or the metadata files... 1262 keys = qfiles.keys() 1263 keys.sort() 1264 eq(keys, ['_xtest-owner@dom.ain', 'aperson@dom.ain']) 1265 # Get the pending cookie from the message to the sender 1266 pmsg, pdata = qfiles['aperson@dom.ain'] 1267 confirmlines = pmsg.get_payload().split('\n') 1268 cookie = confirmlines[-3].split('/')[-1] 1269 # We also need to make sure there's an entry in the Pending database 1270 # for the heold message. 1271 data = self._mlist.pend_confirm(cookie) 1272 eq(data, ('H', 1)) 1273 heldmsg = os.path.join(mm_cfg.DATA_DIR, 'heldmsg-_xtest-1.pck') 1274 self.failUnless(os.path.exists(heldmsg)) 1275 os.unlink(heldmsg) 1276 holdfiles = [f for f in os.listdir(mm_cfg.DATA_DIR) 1277 if f.startswith('heldmsg-')] 1278 eq(len(holdfiles), 0) 1279 1280 1281 1282class TestMimeDel(TestBase): 1283 def setUp(self): 1284 TestBase.setUp(self) 1285 self._mlist.filter_content = 1 1286 self._mlist.filter_mime_types = ['image/jpeg'] 1287 self._mlist.pass_mime_types = [] 1288 self._mlist.convert_html_to_plaintext = 1 1289 self._mlist.collapse_alternatives = 1 1290 1291 def test_outer_matches(self): 1292 msg = email.message_from_string("""\ 1293From: aperson@dom.ain 1294Content-Type: image/jpeg 1295MIME-Version: 1.0 1296 1297xxxxx 1298""") 1299 self.assertRaises(Errors.DiscardMessage, MimeDel.process, 1300 self._mlist, msg, {}) 1301 1302 def test_strain_multipart(self): 1303 eq = self.assertEqual 1304 msg = email.message_from_string("""\ 1305From: aperson@dom.ain 1306Content-Type: multipart/mixed; boundary=BOUNDARY 1307MIME-Version: 1.0 1308 1309--BOUNDARY 1310Content-Type: image/jpeg 1311MIME-Version: 1.0 1312 1313xxx 1314 1315--BOUNDARY 1316Content-Type: image/gif 1317MIME-Version: 1.0 1318 1319yyy 1320--BOUNDARY-- 1321""") 1322 MimeDel.process(self._mlist, msg, {}) 1323 self.assertTrue(not msg.is_multipart()) 1324 eq(msg.get_content_type(), 'image/gif') 1325 eq(msg.get_payload(), 'yyy') 1326 1327 def test_collapse_multipart_alternative(self): 1328 eq = self.assertEqual 1329 msg = email.message_from_string("""\ 1330From: aperson@dom.ain 1331Content-Type: multipart/mixed; boundary=BOUNDARY 1332MIME-Version: 1.0 1333 1334--BOUNDARY 1335Content-Type: multipart/alternative; boundary=BOUND2 1336MIME-Version: 1.0 1337 1338--BOUND2 1339Content-Type: image/jpeg 1340MIME-Version: 1.0 1341 1342xxx 1343 1344--BOUND2 1345Content-Type: image/gif 1346MIME-Version: 1.0 1347 1348yyy 1349--BOUND2-- 1350 1351--BOUNDARY-- 1352""") 1353 MimeDel.process(self._mlist, msg, {}) 1354 self.assertTrue(not msg.is_multipart()) 1355 eq(msg.get_content_type(), 'image/gif') 1356 eq(msg.get_payload(), 'yyy') 1357 1358 def test_convert_to_plaintext(self): 1359 # BAW: This test is dependent on your particular lynx version 1360 eq = self.assertEqual 1361 msg = email.message_from_string("""\ 1362From: aperson@dom.ain 1363Content-Type: text/html 1364MIME-Version: 1.0 1365 1366<html><head></head> 1367<body></body></html> 1368""") 1369 MimeDel.process(self._mlist, msg, {}) 1370 eq(msg.get_content_type(), 'text/plain') 1371 #eq(msg.get_payload(), '\n\n\n') 1372 eq(msg.get_payload().strip(), '') 1373 1374 def test_deep_structure(self): 1375 eq = self.assertEqual 1376 self._mlist.filter_mime_types.append('text/html') 1377 msg = email.message_from_string("""\ 1378From: aperson@dom.ain 1379Content-Type: multipart/mixed; boundary=AAA 1380 1381--AAA 1382Content-Type: multipart/mixed; boundary=BBB 1383 1384--BBB 1385Content-Type: image/jpeg 1386 1387xxx 1388--BBB 1389Content-Type: image/jpeg 1390 1391yyy 1392--BBB--- 1393--AAA 1394Content-Type: multipart/alternative; boundary=CCC 1395 1396--CCC 1397Content-Type: text/html 1398 1399<h2>This is a header</h2> 1400 1401--CCC 1402Content-Type: text/plain 1403 1404A different message 1405--CCC-- 1406--AAA 1407Content-Type: image/gif 1408 1409zzz 1410--AAA 1411Content-Type: image/gif 1412 1413aaa 1414--AAA-- 1415""") 1416 MimeDel.process(self._mlist, msg, {}) 1417 payload = msg.get_payload() 1418 eq(len(payload), 3) 1419 part1 = msg.get_payload(0) 1420 eq(part1.get_content_type(), 'text/plain') 1421 eq(part1.get_payload(), 'A different message') 1422 part2 = msg.get_payload(1) 1423 eq(part2.get_content_type(), 'image/gif') 1424 eq(part2.get_payload(), 'zzz') 1425 part3 = msg.get_payload(2) 1426 eq(part3.get_content_type(), 'image/gif') 1427 eq(part3.get_payload(), 'aaa') 1428 1429 def test_top_multipart_alternative(self): 1430 eq = self.assertEqual 1431 self._mlist.filter_mime_types.append('text/html') 1432 msg = email.message_from_string("""\ 1433From: aperson@dom.ain 1434Content-Type: multipart/alternative; boundary=AAA 1435 1436--AAA 1437Content-Type: text/html 1438 1439<b>This is some html</b> 1440--AAA 1441Content-Type: text/plain 1442 1443This is plain text 1444--AAA-- 1445""") 1446 MimeDel.process(self._mlist, msg, {}) 1447 eq(msg.get_content_type(), 'text/plain') 1448 eq(msg.get_payload(), 'This is plain text') 1449 1450 def test_recast_multipart(self): 1451 eq = self.assertEqual 1452 self._mlist.filter_mime_types.append('application/pdf') 1453 msg = email.message_from_string("""\ 1454From: aperson@dom.ain 1455MIME-Version: 1.0 1456Content-type: multipart/mixed; 1457 boundary="Boundary_0" 1458 1459--Boundary_0 1460Content-Type: multipart/mixed; 1461 boundary="Boundary_1" 1462 1463--Boundary_1 1464Content-type: multipart/mixed; 1465 boundary="Boundary_2" 1466 1467--Boundary_2 1468Content-type: multipart/alternative; 1469 boundary="Boundary_3" 1470 1471--Boundary_3 1472Content-type: text/plain; charset=us-ascii 1473Content-transfer-encoding: 7BIT 1474 1475Plain text part 1476--Boundary_3 1477Content-type: text/html; charset=us-ascii 1478Content-transfer-encoding: 7BIT 1479 1480HTML part 1481--Boundary_3-- 1482 1483 1484--Boundary_2 1485Content-type: application/pdf 1486Content-transfer-encoding: 7BIT 1487 1488PDF part inner 2 1489--Boundary_2-- 1490--Boundary_1 1491Content-type: text/plain; charset=us-ascii 1492Content-transfer-encoding: 7BIT 1493 1494second text 1495--Boundary_1-- 1496 1497--Boundary_0 1498Content-Type: application/pdf 1499Content-transfer-encoding: 7BIT 1500 1501PDF part outer 1502--Boundary_0-- 1503""") 1504 MimeDel.process(self._mlist, msg, {}) 1505 payload = msg.get_payload() 1506 eq(len(payload), 2) 1507 part1 = msg.get_payload(0) 1508 eq(part1.get_content_type(), 'text/plain') 1509 eq(part1.get_payload(), 'Plain text part') 1510 part2 = msg.get_payload(1) 1511 eq(part2.get_content_type(), 'text/plain') 1512 eq(part2.get_payload(), 'second text') 1513 1514 def test_message_rfc822(self): 1515 eq = self.assertEqual 1516 msg = email.message_from_string("""\ 1517Message-ID: <4D9E6AEA.1060802@example.net> 1518Date: Thu, 07 Apr 2011 18:54:50 -0700 1519From: User <user@example.com> 1520MIME-Version: 1.0 1521To: Someone <someone@example.net> 1522Subject: Message Subject 1523Content-Type: multipart/mixed; 1524 boundary="------------050603050603060608020908" 1525 1526This is a multi-part message in MIME format. 1527--------------050603050603060608020908 1528Content-Type: text/plain; charset=ISO-8859-1 1529Content-Transfer-Encoding: 7bit 1530 1531Plain body. 1532 1533--------------050603050603060608020908 1534Content-Type: message/rfc822 1535Content-Transfer-Encoding: 7bit 1536Content-Disposition: attachment 1537 1538Message-ID: <4D9E647F.4050308@example.net> 1539Date: Thu, 07 Apr 2011 18:27:27 -0700 1540From: User1 <user1@example.com> 1541MIME-Version: 1.0 1542To: Someone1 <someone1@example.net> 1543Content-Type: multipart/mixed; boundary="------------060107040402070208020705" 1544Subject: Attached Message 1 Subject 1545 1546This is a multi-part message in MIME format. 1547--------------060107040402070208020705 1548Content-Type: text/plain; charset=ISO-8859-1 1549Content-Transfer-Encoding: 7bit 1550 1551Attached Message 1 body. 1552 1553--------------060107040402070208020705 1554Content-Type: message/rfc822 1555Content-Transfer-Encoding: 7bit 1556Content-Disposition: attachment 1557 1558From: User2 <user2@example.com> 1559To: Someone2 <someone2@example.net> 1560Subject: Attached Message 2 Subject 1561Date: Thu, 7 Apr 2011 19:09:35 -0500 1562Message-ID: <DAE689E1FD1D493BACD15180145B4151@example.net> 1563MIME-Version: 1.0 1564Content-Type: multipart/mixed; 1565 boundary="----=_NextPart_000_0066_01CBF557.56C6F370" 1566 1567This is a multi-part message in MIME format. 1568 1569------=_NextPart_000_0066_01CBF557.56C6F370 1570Content-Type: text/plain; 1571 charset="us-ascii" 1572Content-Transfer-Encoding: 7bit 1573 1574Attached Message 2 body. 1575 1576------=_NextPart_000_0066_01CBF557.56C6F370 1577Content-Type: message/rfc822 1578Content-Transfer-Encoding: 7bit 1579Content-Disposition: attachment 1580 1581From: User3 <user3@example.com> 1582To: Someone3 <someone3@example.net> 1583Subject: Attached Message 3 Subject 1584Date: Thu, 7 Apr 2011 17:22:04 -0500 1585Message-ID: <BANLkTi=SzfNJo-V7cvrg3nE3uOi9uxXv3g@example.net> 1586MIME-Version: 1.0 1587Content-Type: multipart/alternative; 1588 boundary="----=_NextPart_000_0058_01CBF557.56C48270" 1589 1590This is a multi-part message in MIME format. 1591 1592------=_NextPart_000_0058_01CBF557.56C48270 1593Content-Type: text/plain; 1594 charset="iso-8859-1" 1595Content-Transfer-Encoding: 7bit 1596 1597Attached Message 3 plain body. 1598 1599------=_NextPart_000_0058_01CBF557.56C48270 1600Content-Type: text/html; 1601 charset="iso-8859-1" 1602Content-Transfer-Encoding: quoted-printable 1603 1604 1605Attached Message 3 html body. 1606 1607------=_NextPart_000_0058_01CBF557.56C48270-- 1608 1609------=_NextPart_000_0066_01CBF557.56C6F370 1610Content-Type: message/rfc822 1611Content-Transfer-Encoding: 7bit 1612Content-Disposition: attachment 1613 1614From: User4 <user4@example.com> 1615To: Someone4 <someone4@example.net> 1616Subject: Attached Message 4 Subject 1617Date: Thu, 7 Apr 2011 17:24:26 -0500 1618Message-ID: <19CC3BDF28CF49AD988FF43B2DBC5F1D@example> 1619MIME-Version: 1.0 1620Content-Type: multipart/mixed; 1621 boundary="----=_NextPart_000_0060_01CBF557.56C6F370" 1622 1623This is a multi-part message in MIME format. 1624 1625------=_NextPart_000_0060_01CBF557.56C6F370 1626Content-Type: multipart/alternative; 1627 boundary="----=_NextPart_001_0061_01CBF557.56C6F370" 1628 1629------=_NextPart_001_0061_01CBF557.56C6F370 1630Content-Type: text/plain; 1631 charset="us-ascii" 1632Content-Transfer-Encoding: 7bit 1633 1634Attached Message 4 plain body. 1635 1636------=_NextPart_001_0061_01CBF557.56C6F370 1637Content-Type: text/html; 1638 charset="us-ascii" 1639Content-Transfer-Encoding: quoted-printable 1640 1641Attached Message 4 html body. 1642 1643------=_NextPart_001_0061_01CBF557.56C6F370-- 1644 1645------=_NextPart_000_0060_01CBF557.56C6F370 1646Content-Type: message/rfc822 1647Content-Transfer-Encoding: 7bit 1648Content-Disposition: attachment 1649 1650From: User5 <user5@example.com> 1651To: Someone5 <someone5@example.net> 1652Subject: Attached Message 5 Subject 1653Date: Thu, 7 Apr 2011 16:24:26 -0500 1654Message-ID: <some_id@example> 1655Content-Type: multipart/alternative; 1656 boundary="----=_NextPart_000_005C_01CBF557.56C6F370" 1657 1658This is a multi-part message in MIME format. 1659 1660------=_NextPart_000_005C_01CBF557.56C6F370 1661Content-Type: text/plain; 1662 charset="iso-8859-1" 1663Content-Transfer-Encoding: 7bit 1664 1665Attached Message 5 plain body. 1666 1667------=_NextPart_000_005C_01CBF557.56C6F370 1668Content-Type: text/html; 1669 charset="iso-8859-1" 1670Content-Transfer-Encoding: quoted-printable 1671 1672Attached Message 5 html body. 1673 1674------=_NextPart_000_005C_01CBF557.56C6F370-- 1675 1676------=_NextPart_000_0060_01CBF557.56C6F370 1677Content-Type: text/plain; 1678 name="ATT00055.txt" 1679Content-Transfer-Encoding: quoted-printable 1680Content-Disposition: attachment; 1681 filename="ATT00055.txt" 1682 1683Another plain part. 1684 1685------=_NextPart_000_0060_01CBF557.56C6F370-- 1686 1687------=_NextPart_000_0066_01CBF557.56C6F370-- 1688 1689--------------060107040402070208020705 1690Content-Type: text/plain; charset="us-ascii" 1691MIME-Version: 1.0 1692Content-Transfer-Encoding: 7bit 1693Content-Disposition: inline 1694 1695Final plain part. 1696 1697--------------060107040402070208020705-- 1698 1699--------------050603050603060608020908-- 1700""") 1701 MimeDel.process(self._mlist, msg, {}) 1702 payload = msg.get_payload() 1703 eq(len(payload), 2) 1704 part1 = msg.get_payload(0) 1705 eq(part1.get_content_type(), 'text/plain') 1706 eq(part1.get_payload(), 'Plain body.\n') 1707 part2 = msg.get_payload(1) 1708 eq(part2.get_content_type(), 'message/rfc822') 1709 payload = part2.get_payload() 1710 eq(len(payload), 1) 1711 part1 = part2.get_payload(0) 1712 eq(part1['subject'], 'Attached Message 1 Subject') 1713 eq(part1.get_content_type(), 'multipart/mixed') 1714 payload = part1.get_payload() 1715 eq(len(payload), 3) 1716 part3 = part1.get_payload(2) 1717 eq(part3.get_content_type(), 'text/plain') 1718 eq(part3.get_payload(), 'Final plain part.\n') 1719 part2 = part1.get_payload(1) 1720 eq(part2.get_content_type(), 'message/rfc822') 1721 part1 = part1.get_payload(0) 1722 eq(part1.get_content_type(), 'text/plain') 1723 eq(part1.get_payload(), 'Attached Message 1 body.\n') 1724 payload = part2.get_payload() 1725 eq(len(payload), 1) 1726 part1 = part2.get_payload(0) 1727 eq(part1['subject'], 'Attached Message 2 Subject') 1728 eq(part1.get_content_type(), 'multipart/mixed') 1729 payload = part1.get_payload() 1730 eq(len(payload), 3) 1731 part3 = part1.get_payload(2) 1732 eq(part3.get_content_type(), 'message/rfc822') 1733 part2 = part1.get_payload(1) 1734 eq(part2.get_content_type(), 'message/rfc822') 1735 part1 = part1.get_payload(0) 1736 eq(part1.get_content_type(), 'text/plain') 1737 eq(part1.get_payload(), 'Attached Message 2 body.\n') 1738 payload = part2.get_payload() 1739 eq(len(payload), 1) 1740 part1 = part2.get_payload(0) 1741 eq(part1['subject'], 'Attached Message 3 Subject') 1742 eq(part1.get_content_type(), 'text/plain') 1743 eq(part1.get_payload(), 'Attached Message 3 plain body.\n') 1744 payload = part3.get_payload() 1745 eq(len(payload), 1) 1746 part1 = part3.get_payload(0) 1747 eq(part1['subject'], 'Attached Message 4 Subject') 1748 eq(part1.get_content_type(), 'multipart/mixed') 1749 payload = part1.get_payload() 1750 eq(len(payload), 3) 1751 part3 = part1.get_payload(2) 1752 eq(part3.get_content_type(), 'text/plain') 1753 eq(part3.get_filename(), 'ATT00055.txt') 1754 eq(part3.get_payload(), 'Another plain part.\n') 1755 part2 = part1.get_payload(1) 1756 eq(part2.get_content_type(), 'message/rfc822') 1757 part1 = part1.get_payload(0) 1758 eq(part1.get_content_type(), 'text/plain') 1759 eq(part1.get_payload(), 'Attached Message 4 plain body.\n') 1760 payload = part2.get_payload() 1761 eq(len(payload), 1) 1762 part1 = part2.get_payload(0) 1763 eq(part1['subject'], 'Attached Message 5 Subject') 1764 eq(part1.get_content_type(), 'text/plain') 1765 eq(part1.get_payload(), 'Attached Message 5 plain body.\n') 1766 1767 1768class TestModerate(TestBase): 1769 pass 1770 1771 1772 1773class TestReplybot(TestBase): 1774 pass 1775 1776 1777 1778class TestSpamDetect(TestBase): 1779 def test_short_circuit(self): 1780 msgdata = {'approved': 1} 1781 msg = email.message_from_string('', Message.Message) 1782 rtn = SpamDetect.process(self._mlist, msg, msgdata) 1783 # Not really a great test, but there's little else to assert 1784 self.assertEqual(rtn, None) 1785 1786 def test_spam_detect(self): 1787 msg1 = email.message_from_string("""\ 1788From: aperson@dom.ain 1789 1790A message. 1791""", Message.Message) 1792 msg2 = email.message_from_string("""\ 1793To: xlist@dom.ain 1794 1795A message. 1796""", Message.Message) 1797 spammers = mm_cfg.KNOWN_SPAMMERS[:] 1798 try: 1799 mm_cfg.KNOWN_SPAMMERS.append(('from', '.?person')) 1800 self.assertRaises(SpamDetect.SpamDetected, 1801 SpamDetect.process, self._mlist, msg1, {}) 1802 rtn = SpamDetect.process(self._mlist, msg2, {}) 1803 self.assertEqual(rtn, None) 1804 finally: 1805 mm_cfg.KNOWN_SPAMMERS = spammers 1806 1807 1808 1809class TestTagger(TestBase): 1810 def setUp(self): 1811 TestBase.setUp(self) 1812 self._mlist.topics = [('bar fight', '.*bar.*', 'catch any bars', 1)] 1813 self._mlist.topics_enabled = 1 1814 1815 def test_short_circuit(self): 1816 self._mlist.topics_enabled = 0 1817 rtn = Tagger.process(self._mlist, None, {}) 1818 # Not really a great test, but there's little else to assert 1819 self.assertEqual(rtn, None) 1820 1821 def test_simple(self): 1822 eq = self.assertEqual 1823 mlist = self._mlist 1824 mlist.topics_bodylines_limit = 0 1825 msg = email.message_from_string("""\ 1826Subject: foobar 1827Keywords: barbaz 1828 1829""") 1830 msgdata = {} 1831 Tagger.process(mlist, msg, msgdata) 1832 eq(msg['x-topics'], 'bar fight') 1833 eq(msgdata.get('topichits'), ['bar fight']) 1834 1835 def test_all_body_lines_plain_text(self): 1836 eq = self.assertEqual 1837 mlist = self._mlist 1838 mlist.topics_bodylines_limit = -1 1839 msg = email.message_from_string("""\ 1840Subject: Was 1841Keywords: Raw 1842 1843Subject: farbaw 1844Keywords: barbaz 1845""") 1846 msgdata = {} 1847 Tagger.process(mlist, msg, msgdata) 1848 eq(msg['x-topics'], 'bar fight') 1849 eq(msgdata.get('topichits'), ['bar fight']) 1850 1851 def test_no_body_lines(self): 1852 eq = self.assertEqual 1853 mlist = self._mlist 1854 mlist.topics_bodylines_limit = 0 1855 msg = email.message_from_string("""\ 1856Subject: Was 1857Keywords: Raw 1858 1859Subject: farbaw 1860Keywords: barbaz 1861""") 1862 msgdata = {} 1863 Tagger.process(mlist, msg, msgdata) 1864 eq(msg['x-topics'], None) 1865 eq(msgdata.get('topichits'), None) 1866 1867 def test_body_lines_in_multipart(self): 1868 eq = self.assertEqual 1869 mlist = self._mlist 1870 mlist.topics_bodylines_limit = -1 1871 msg = email.message_from_string("""\ 1872Subject: Was 1873Keywords: Raw 1874Content-Type: multipart/alternative; boundary="BOUNDARY" 1875 1876--BOUNDARY 1877From: sabo 1878To: obas 1879 1880Subject: farbaw 1881Keywords: barbaz 1882 1883--BOUNDARY-- 1884""") 1885 msgdata = {} 1886 Tagger.process(mlist, msg, msgdata) 1887 eq(msg['x-topics'], 'bar fight') 1888 eq(msgdata.get('topichits'), ['bar fight']) 1889 1890 def test_body_lines_no_part(self): 1891 eq = self.assertEqual 1892 mlist = self._mlist 1893 mlist.topics_bodylines_limit = -1 1894 msg = email.message_from_string("""\ 1895Subject: Was 1896Keywords: Raw 1897Content-Type: multipart/alternative; boundary=BOUNDARY 1898 1899--BOUNDARY 1900From: sabo 1901To: obas 1902Content-Type: message/rfc822 1903 1904Subject: farbaw 1905Keywords: barbaz 1906 1907--BOUNDARY 1908From: sabo 1909To: obas 1910Content-Type: message/rfc822 1911 1912Subject: farbaw 1913Keywords: barbaz 1914 1915--BOUNDARY-- 1916""") 1917 msgdata = {} 1918 Tagger.process(mlist, msg, msgdata) 1919 eq(msg['x-topics'], None) 1920 eq(msgdata.get('topichits'), None) 1921 1922 1923 1924class TestToArchive(TestBase): 1925 def setUp(self): 1926 TestBase.setUp(self) 1927 # We're going to want to inspect this queue directory 1928 self._sb = Switchboard(mm_cfg.ARCHQUEUE_DIR) 1929 1930 def tearDown(self): 1931 for f in os.listdir(mm_cfg.ARCHQUEUE_DIR): 1932 os.unlink(os.path.join(mm_cfg.ARCHQUEUE_DIR, f)) 1933 TestBase.tearDown(self) 1934 1935 def test_short_circuit(self): 1936 eq = self.assertEqual 1937 msgdata = {'isdigest': 1} 1938 ToArchive.process(self._mlist, None, msgdata) 1939 eq(len(self._sb.files()), 0) 1940 # Try the other half of the or... 1941 self._mlist.archive = 0 1942 ToArchive.process(self._mlist, None, msgdata) 1943 eq(len(self._sb.files()), 0) 1944 # Now try the various message header shortcuts 1945 msg = email.message_from_string("""\ 1946X-No-Archive: YES 1947 1948""") 1949 self._mlist.archive = 1 1950 ToArchive.process(self._mlist, msg, {}) 1951 eq(len(self._sb.files()), 0) 1952 # And for backwards compatibility 1953 msg = email.message_from_string("""\ 1954X-Archive: NO 1955 1956""") 1957 ToArchive.process(self._mlist, msg, {}) 1958 eq(len(self._sb.files()), 0) 1959 1960 def test_normal_archiving(self): 1961 eq = self.assertEqual 1962 msg = email.message_from_string("""\ 1963Subject: About Mailman 1964 1965It rocks! 1966""") 1967 ToArchive.process(self._mlist, msg, {}) 1968 files = self._sb.files() 1969 eq(len(files), 1) 1970 msg2, data = self._sb.dequeue(files[0]) 1971 eq(len(data), 3) 1972 eq(data['_parsemsg'], False) 1973 eq(data['version'], 3) 1974 # Clock skew makes this unreliable 1975 #self.failUnless(data['received_time'] <= time.time()) 1976 eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0)) 1977 1978 1979 1980class TestToDigest(TestBase): 1981 def _makemsg(self, i=0): 1982 msg = email.message_from_string("""From: aperson@dom.ain 1983To: _xtest@dom.ain 1984Subject: message number %(i)d 1985 1986Here is message %(i)d 1987""" % {'i' : i}) 1988 return msg 1989 1990 def setUp(self): 1991 TestBase.setUp(self) 1992 self._path = os.path.join(self._mlist.fullpath(), 'digest.mbox') 1993 fp = open(self._path, 'w') 1994 g = Generator(fp) 1995 for i in range(5): 1996 g.flatten(self._makemsg(i), unixfrom=1) 1997 fp.close() 1998 self._sb = Switchboard(mm_cfg.VIRGINQUEUE_DIR) 1999 2000 def tearDown(self): 2001 try: 2002 os.unlink(self._path) 2003 except OSError, e: 2004 if e.errno <> errno.ENOENT: raise 2005 for f in os.listdir(mm_cfg.VIRGINQUEUE_DIR): 2006 os.unlink(os.path.join(mm_cfg.VIRGINQUEUE_DIR, f)) 2007 TestBase.tearDown(self) 2008 2009 def test_short_circuit(self): 2010 eq = self.assertEqual 2011 mlist = self._mlist 2012 mlist.digestable = 0 2013 eq(ToDigest.process(mlist, None, {}), None) 2014 mlist.digestable = 1 2015 eq(ToDigest.process(mlist, None, {'isdigest': 1}), None) 2016 eq(self._sb.files(), []) 2017 2018 def test_undersized(self): 2019 msg = self._makemsg(99) 2020 size = os.path.getsize(self._path) + len(str(msg)) 2021 self._mlist.digest_size_threshhold = (size + 1) * 1024 2022 ToDigest.process(self._mlist, msg, {}) 2023 self.assertEqual(self._sb.files(), []) 2024 2025 def test_send_a_digest(self): 2026 eq = self.assertEqual 2027 mlist = self._mlist 2028 msg = self._makemsg(99) 2029 size = os.path.getsize(self._path) + len(str(msg)) 2030 # Set digest_size_threshhold to a very small value to force a digest. 2031 # Setting to zero no longer works. 2032 mlist.digest_size_threshhold = 0.001 2033 ToDigest.process(mlist, msg, {}) 2034 files = self._sb.files() 2035 # There should be two files in the queue, one for the MIME digest and 2036 # one for the RFC 1153 digest. 2037 eq(len(files), 2) 2038 # Now figure out which of the two files is the MIME digest and which 2039 # is the RFC 1153 digest. 2040 for filebase in files: 2041 qmsg, qdata = self._sb.dequeue(filebase) 2042 if qmsg.get_content_maintype() == 'multipart': 2043 mimemsg = qmsg 2044 mimedata = qdata 2045 else: 2046 rfc1153msg = qmsg 2047 rfc1153data = qdata 2048 eq(rfc1153msg.get_content_type(), 'text/plain') 2049 eq(mimemsg.get_content_type(), 'multipart/mixed') 2050 eq(mimemsg['from'], mlist.GetRequestEmail()) 2051 eq(mimemsg['subject'], 2052 '%(realname)s Digest, Vol %(volume)d, Issue %(issue)d' % { 2053 'realname': mlist.real_name, 2054 'volume' : mlist.volume, 2055 'issue' : mlist.next_digest_number - 1, 2056 }) 2057 eq(mimemsg['to'], mlist.GetListEmail()) 2058 # BAW: this test is incomplete... 2059 2060 2061 2062class TestToOutgoing(TestBase): 2063 def setUp(self): 2064 TestBase.setUp(self) 2065 # We're going to want to inspect this queue directory 2066 self._sb = Switchboard(mm_cfg.OUTQUEUE_DIR) 2067 2068 def tearDown(self): 2069 for f in os.listdir(mm_cfg.OUTQUEUE_DIR): 2070 os.unlink(os.path.join(mm_cfg.OUTQUEUE_DIR, f)) 2071 TestBase.tearDown(self) 2072 2073 def test_outgoing(self): 2074 eq = self.assertEqual 2075 msg = email.message_from_string("""\ 2076Subject: About Mailman 2077 2078It rocks! 2079""") 2080 msgdata = {'foo': 1, 'bar': 2} 2081 ToOutgoing.process(self._mlist, msg, msgdata) 2082 files = self._sb.files() 2083 eq(len(files), 1) 2084 msg2, data = self._sb.dequeue(files[0]) 2085 eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0)) 2086 self.failUnless(len(data) >= 6 and len(data) <= 7) 2087 eq(data['foo'], 1) 2088 eq(data['bar'], 2) 2089 eq(data['version'], 3) 2090 eq(data['listname'], '_xtest') 2091 eq(data['_parsemsg'], False) 2092 # Can't test verp. presence/value depend on mm_cfg.py 2093 #eq(data['verp'], 1) 2094 # Clock skew makes this unreliable 2095 #self.failUnless(data['received_time'] <= time.time()) 2096 2097 2098 2099class TestToUsenet(TestBase): 2100 def setUp(self): 2101 TestBase.setUp(self) 2102 # We're going to want to inspect this queue directory 2103 self._sb = Switchboard(mm_cfg.NEWSQUEUE_DIR) 2104 2105 def tearDown(self): 2106 for f in os.listdir(mm_cfg.NEWSQUEUE_DIR): 2107 os.unlink(os.path.join(mm_cfg.NEWSQUEUE_DIR, f)) 2108 TestBase.tearDown(self) 2109 2110 def test_short_circuit(self): 2111 eq = self.assertEqual 2112 mlist = self._mlist 2113 mlist.gateway_to_news = 0 2114 ToUsenet.process(mlist, None, {}) 2115 eq(len(self._sb.files()), 0) 2116 mlist.gateway_to_news = 1 2117 ToUsenet.process(mlist, None, {'isdigest': 1}) 2118 eq(len(self._sb.files()), 0) 2119 ToUsenet.process(mlist, None, {'fromusenet': 1}) 2120 eq(len(self._sb.files()), 0) 2121 2122 def test_to_usenet(self): 2123 # BAW: Should we, can we, test the error conditions that only log to a 2124 # file instead of raising an exception? 2125 eq = self.assertEqual 2126 mlist = self._mlist 2127 mlist.gateway_to_news = 1 2128 mlist.linked_newsgroup = 'foo' 2129 mlist.nntp_host = 'bar' 2130 msg = email.message_from_string("""\ 2131Subject: About Mailman 2132 2133Mailman rocks! 2134""") 2135 ToUsenet.process(mlist, msg, {}) 2136 files = self._sb.files() 2137 eq(len(files), 1) 2138 msg2, data = self._sb.dequeue(files[0]) 2139 eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0)) 2140 eq(data['version'], 3) 2141 eq(data['listname'], '_xtest') 2142 # Clock skew makes this unreliable 2143 #self.failUnless(data['received_time'] <= time.time()) 2144 2145 2146 2147def suite(): 2148 suite = unittest.TestSuite() 2149 suite.addTest(unittest.makeSuite(TestAcknowledge)) 2150 suite.addTest(unittest.makeSuite(TestAfterDelivery)) 2151 suite.addTest(unittest.makeSuite(TestApprove)) 2152 suite.addTest(unittest.makeSuite(TestCalcRecips)) 2153 suite.addTest(unittest.makeSuite(TestCleanse)) 2154 suite.addTest(unittest.makeSuite(TestCookHeaders)) 2155 suite.addTest(unittest.makeSuite(TestDecorate)) 2156 suite.addTest(unittest.makeSuite(TestFileRecips)) 2157 suite.addTest(unittest.makeSuite(TestHold)) 2158 suite.addTest(unittest.makeSuite(TestMimeDel)) 2159 suite.addTest(unittest.makeSuite(TestModerate)) 2160 suite.addTest(unittest.makeSuite(TestReplybot)) 2161 suite.addTest(unittest.makeSuite(TestSpamDetect)) 2162 suite.addTest(unittest.makeSuite(TestTagger)) 2163 suite.addTest(unittest.makeSuite(TestToArchive)) 2164 suite.addTest(unittest.makeSuite(TestToDigest)) 2165 suite.addTest(unittest.makeSuite(TestToOutgoing)) 2166 suite.addTest(unittest.makeSuite(TestToUsenet)) 2167 return suite 2168 2169 2170 2171if __name__ == '__main__': 2172 unittest.main(defaultTest='suite') 2173