1# Copyright (C) 2001-2018 by the Free Software Foundation, Inc.
2#
3# This program is free software; you can redistribute it and/or
4# modify it under the terms of the GNU General Public License
5# as published by the Free Software Foundation; either version 2
6# of the License, or (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program; if not, write to the Free Software
15# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
16# USA.
17
18"""Unit tests for the various Mailman/Handlers/*.py modules.
19"""
20
21import os
22import time
23import email
24import errno
25import cPickle
26import unittest
27from types import ListType
28from email.Generator import Generator
29try:
30    from Mailman import __init__
31except ImportError:
32    import paths
33
34from Mailman import mm_cfg
35from Mailman.MailList import MailList
36from Mailman import Message
37from Mailman import Errors
38from Mailman import Pending
39from Mailman.Queue.Switchboard import Switchboard
40
41from Mailman.Handlers import Acknowledge
42from Mailman.Handlers import AfterDelivery
43from Mailman.Handlers import Approve
44from Mailman.Handlers import CalcRecips
45from Mailman.Handlers import Cleanse
46from Mailman.Handlers import CookHeaders
47from Mailman.Handlers import Decorate
48from Mailman.Handlers import FileRecips
49from Mailman.Handlers import Hold
50from Mailman.Handlers import MimeDel
51from Mailman.Handlers import Moderate
52from Mailman.Handlers import Replybot
53# Don't test handlers such as SMTPDirect and Sendmail here
54from Mailman.Handlers import SpamDetect
55from Mailman.Handlers import Tagger
56from Mailman.Handlers import ToArchive
57from Mailman.Handlers import ToDigest
58from Mailman.Handlers import ToOutgoing
59from Mailman.Handlers import ToUsenet
60from Mailman.Utils import sha_new
61
62from TestBase import TestBase
63
64
65
66def password(plaintext):
67    return sha_new(plaintext).hexdigest()
68
69
70
71class TestAcknowledge(TestBase):
72    def setUp(self):
73        TestBase.setUp(self)
74        # We're going to want to inspect this queue directory
75        self._sb = Switchboard(mm_cfg.VIRGINQUEUE_DIR)
76        # Add a member
77        self._mlist.addNewMember('aperson@dom.ain')
78        self._mlist.personalize = False
79        self._mlist.dmarc_moderation_action = 0
80
81    def tearDown(self):
82        for f in os.listdir(mm_cfg.VIRGINQUEUE_DIR):
83            os.unlink(os.path.join(mm_cfg.VIRGINQUEUE_DIR, f))
84        TestBase.tearDown(self)
85
86    def test_no_ack_msgdata(self):
87        eq = self.assertEqual
88        # Make sure there are no files in the virgin queue already
89        eq(len(self._sb.files()), 0)
90        msg = email.message_from_string("""\
91From: aperson@dom.ain
92
93""", Message.Message)
94        Acknowledge.process(self._mlist, msg,
95                            {'original_sender': 'aperson@dom.ain'})
96        eq(len(self._sb.files()), 0)
97
98    def test_no_ack_not_a_member(self):
99        eq = self.assertEqual
100        # Make sure there are no files in the virgin queue already
101        eq(len(self._sb.files()), 0)
102        msg = email.message_from_string("""\
103From: bperson@dom.ain
104
105""", Message.Message)
106        Acknowledge.process(self._mlist, msg,
107                            {'original_sender': 'bperson@dom.ain'})
108        eq(len(self._sb.files()), 0)
109
110    def test_no_ack_sender(self):
111        eq = self.assertEqual
112        eq(len(self._sb.files()), 0)
113        msg = email.message_from_string("""\
114From: aperson@dom.ain
115
116""", Message.Message)
117        Acknowledge.process(self._mlist, msg, {})
118        eq(len(self._sb.files()), 0)
119
120    def test_ack_no_subject(self):
121        eq = self.assertEqual
122        self._mlist.setMemberOption(
123            'aperson@dom.ain', mm_cfg.AcknowledgePosts, 1)
124        eq(len(self._sb.files()), 0)
125        msg = email.message_from_string("""\
126From: aperson@dom.ain
127
128""", Message.Message)
129        Acknowledge.process(self._mlist, msg, {})
130        files = self._sb.files()
131        eq(len(files), 1)
132        qmsg, qdata = self._sb.dequeue(files[0])
133        # Check the .db file
134        eq(qdata.get('listname'), '_xtest')
135        eq(qdata.get('recips'), ['aperson@dom.ain'])
136        eq(qdata.get('version'), 3)
137        # Check the .pck
138        eq(str(str(qmsg['subject'])), '_xtest post acknowledgement')
139        eq(qmsg['to'], 'aperson@dom.ain')
140        eq(qmsg['from'], '_xtest-bounces@dom.ain')
141        eq(qmsg.get_content_type(), 'text/plain')
142        eq(qmsg.get_param('charset'), 'us-ascii')
143        msgid = qmsg['message-id']
144        self.failUnless(msgid.startswith('<mailman.'))
145        self.failUnless(msgid.endswith('._xtest@dom.ain>'))
146        eq(qmsg.get_payload(), """\
147Your message entitled
148
149    (no subject)
150
151was successfully received by the _xtest mailing list.
152
153List info page: http://www.dom.ain/mailman/listinfo/_xtest
154Your preferences: http://www.dom.ain/mailman/options/_xtest/aperson%40dom.ain
155""")
156        # Make sure we dequeued the only message
157        eq(len(self._sb.files()), 0)
158
159    def test_ack_with_subject(self):
160        eq = self.assertEqual
161        self._mlist.setMemberOption(
162            'aperson@dom.ain', mm_cfg.AcknowledgePosts, 1)
163        eq(len(self._sb.files()), 0)
164        msg = email.message_from_string("""\
165From: aperson@dom.ain
166Subject: Wish you were here
167
168""", Message.Message)
169        Acknowledge.process(self._mlist, msg, {})
170        files = self._sb.files()
171        eq(len(files), 1)
172        qmsg, qdata = self._sb.dequeue(files[0])
173        # Check the .db file
174        eq(qdata.get('listname'), '_xtest')
175        eq(qdata.get('recips'), ['aperson@dom.ain'])
176        eq(qdata.get('version'), 3)
177        # Check the .pck
178        eq(str(qmsg['subject']), '_xtest post acknowledgement')
179        eq(qmsg['to'], 'aperson@dom.ain')
180        eq(qmsg['from'], '_xtest-bounces@dom.ain')
181        eq(qmsg.get_content_type(), 'text/plain')
182        eq(qmsg.get_param('charset'), 'us-ascii')
183        msgid = qmsg['message-id']
184        self.failUnless(msgid.startswith('<mailman.'))
185        self.failUnless(msgid.endswith('._xtest@dom.ain>'))
186        eq(qmsg.get_payload(), """\
187Your message entitled
188
189    Wish you were here
190
191was successfully received by the _xtest mailing list.
192
193List info page: http://www.dom.ain/mailman/listinfo/_xtest
194Your preferences: http://www.dom.ain/mailman/options/_xtest/aperson%40dom.ain
195""")
196        # Make sure we dequeued the only message
197        eq(len(self._sb.files()), 0)
198
199
200
201class TestAfterDelivery(TestBase):
202    # Both msg and msgdata are ignored
203    def test_process(self):
204        mlist = self._mlist
205        last_post_time = mlist.last_post_time
206        post_id = mlist.post_id
207        AfterDelivery.process(mlist, None, None)
208        self.failUnless(mlist.last_post_time > last_post_time)
209        self.assertEqual(mlist.post_id, post_id + 1)
210
211
212
213class TestApprove(TestBase):
214    def test_short_circuit(self):
215        msgdata = {'approved': 1}
216        rtn = Approve.process(self._mlist, Message.Message(), msgdata)
217        # Not really a great test, but there's little else to assert
218        self.assertEqual(rtn, None)
219
220    def test_approved_moderator(self):
221        mlist = self._mlist
222        mlist.mod_password = password('wazoo')
223        msg = email.message_from_string("""\
224Approved: wazoo
225
226""")
227        msgdata = {}
228        Approve.process(mlist, msg, msgdata)
229        self.failUnless(msgdata.has_key('approved'))
230        self.assertEqual(msgdata['approved'], 1)
231
232    def test_approve_moderator(self):
233        mlist = self._mlist
234        mlist.mod_password = password('wazoo')
235        msg = email.message_from_string("""\
236Approve: wazoo
237
238""")
239        msgdata = {}
240        Approve.process(mlist, msg, msgdata)
241        self.failUnless(msgdata.has_key('approved'))
242        self.assertEqual(msgdata['approved'], 1)
243
244    def test_approved_admin(self):
245        mlist = self._mlist
246        mlist.password = password('wazoo')
247        msg = email.message_from_string("""\
248Approved: wazoo
249
250""")
251        msgdata = {}
252        Approve.process(mlist, msg, msgdata)
253        self.failUnless(msgdata.has_key('approved'))
254        self.assertEqual(msgdata['approved'], 1)
255
256    def test_approve_admin(self):
257        mlist = self._mlist
258        mlist.password = password('wazoo')
259        msg = email.message_from_string("""\
260Approve: wazoo
261
262""")
263        msgdata = {}
264        Approve.process(mlist, msg, msgdata)
265        self.failUnless(msgdata.has_key('approved'))
266        self.assertEqual(msgdata['approved'], 1)
267
268    def test_unapproved(self):
269        mlist = self._mlist
270        mlist.password = password('zoowa')
271        msg = email.message_from_string("""\
272Approve: wazoo
273
274""")
275        msgdata = {}
276        Approve.process(mlist, msg, msgdata)
277        self.assertEqual(msgdata.get('approved'), None)
278
279    def test_trip_beentheres(self):
280        mlist = self._mlist
281        msg = email.message_from_string("""\
282X-BeenThere: %s
283
284""" % mlist.GetListEmail())
285        self.assertRaises(Errors.LoopError, Approve.process, mlist, msg, {})
286
287
288
289class TestCalcRecips(TestBase):
290    def setUp(self):
291        TestBase.setUp(self)
292        # Add a bunch of regular members
293        mlist = self._mlist
294        mlist.addNewMember('aperson@dom.ain')
295        mlist.addNewMember('bperson@dom.ain')
296        mlist.addNewMember('cperson@dom.ain')
297        # And a bunch of digest members
298        mlist.addNewMember('dperson@dom.ain', digest=1)
299        mlist.addNewMember('eperson@dom.ain', digest=1)
300        mlist.addNewMember('fperson@dom.ain', digest=1)
301
302    def test_short_circuit(self):
303        msgdata = {'recips': 1}
304        rtn = CalcRecips.process(self._mlist, None, msgdata)
305        # Not really a great test, but there's little else to assert
306        self.assertEqual(rtn, None)
307
308    def test_simple_path(self):
309        msgdata = {}
310        msg = email.message_from_string("""\
311From: dperson@dom.ain
312
313""", Message.Message)
314        CalcRecips.process(self._mlist, msg, msgdata)
315        self.failUnless(msgdata.has_key('recips'))
316        recips = msgdata['recips']
317        recips.sort()
318        self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain',
319                                  'cperson@dom.ain'])
320
321    def test_exclude_sender(self):
322        msgdata = {}
323        msg = email.message_from_string("""\
324From: cperson@dom.ain
325
326""", Message.Message)
327        self._mlist.setMemberOption('cperson@dom.ain',
328                                    mm_cfg.DontReceiveOwnPosts, 1)
329        CalcRecips.process(self._mlist, msg, msgdata)
330        self.failUnless(msgdata.has_key('recips'))
331        recips = msgdata['recips']
332        recips.sort()
333        self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain'])
334
335    def test_urgent_moderator(self):
336        self._mlist.mod_password = password('xxXXxx')
337        msgdata = {}
338        msg = email.message_from_string("""\
339From: dperson@dom.ain
340Urgent: xxXXxx
341
342""", Message.Message)
343        CalcRecips.process(self._mlist, msg, msgdata)
344        self.failUnless(msgdata.has_key('recips'))
345        recips = msgdata['recips']
346        recips.sort()
347        self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain',
348                                  'cperson@dom.ain', 'dperson@dom.ain',
349                                  'eperson@dom.ain', 'fperson@dom.ain'])
350
351    def test_urgent_admin(self):
352        self._mlist.mod_password = password('yyYYyy')
353        self._mlist.password = password('xxXXxx')
354        msgdata = {}
355        msg = email.message_from_string("""\
356From: dperson@dom.ain
357Urgent: xxXXxx
358
359""", Message.Message)
360        CalcRecips.process(self._mlist, msg, msgdata)
361        self.failUnless(msgdata.has_key('recips'))
362        recips = msgdata['recips']
363        recips.sort()
364        self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain',
365                                  'cperson@dom.ain', 'dperson@dom.ain',
366                                  'eperson@dom.ain', 'fperson@dom.ain'])
367
368    def test_urgent_reject(self):
369        self._mlist.mod_password = password('yyYYyy')
370        self._mlist.password = password('xxXXxx')
371        msgdata = {}
372        msg = email.message_from_string("""\
373From: dperson@dom.ain
374Urgent: zzZZzz
375
376""", Message.Message)
377        self.assertRaises(Errors.RejectMessage,
378                          CalcRecips.process,
379                          self._mlist, msg, msgdata)
380
381    # BAW: must test the do_topic_filters() path...
382
383
384
385class TestCleanse(TestBase):
386    def setUp(self):
387        TestBase.setUp(self)
388
389    def test_simple_cleanse(self):
390        eq = self.assertEqual
391        msg = email.message_from_string("""\
392From: aperson@dom.ain
393Approved: yes
394Urgent: indeed
395Reply-To: bperson@dom.ain
396Sender: asystem@dom.ain
397Return-Receipt-To: another@dom.ain
398Disposition-Notification-To: athird@dom.ain
399X-Confirm-Reading-To: afourth@dom.ain
400X-PMRQC: afifth@dom.ain
401Subject: a message to you
402
403""", Message.Message)
404        Cleanse.process(self._mlist, msg, {})
405        eq(msg['approved'], None)
406        eq(msg['urgent'], None)
407        eq(msg['return-receipt-to'], None)
408        eq(msg['disposition-notification-to'], None)
409        eq(msg['x-confirm-reading-to'], None)
410        eq(msg['x-pmrqc'], None)
411        eq(msg['from'], 'aperson@dom.ain')
412        eq(msg['reply-to'], 'bperson@dom.ain')
413        eq(msg['sender'], 'asystem@dom.ain')
414        eq(msg['subject'], 'a message to you')
415
416    def test_anon_cleanse(self):
417        eq = self.assertEqual
418        msg = email.message_from_string("""\
419From: aperson@dom.ain
420Approved: yes
421Urgent: indeed
422Reply-To: bperson@dom.ain
423Sender: asystem@dom.ain
424Return-Receipt-To: another@dom.ain
425Disposition-Notification-To: athird@dom.ain
426X-Confirm-Reading-To: afourth@dom.ain
427X-PMRQC: afifth@dom.ain
428Subject: a message to you
429
430""", Message.Message)
431        self._mlist.anonymous_list = 1
432        Cleanse.process(self._mlist, msg, {})
433        eq(msg['approved'], None)
434        eq(msg['urgent'], None)
435        eq(msg['return-receipt-to'], None)
436        eq(msg['disposition-notification-to'], None)
437        eq(msg['x-confirm-reading-to'], None)
438        eq(msg['x-pmrqc'], None)
439        eq(len(msg.get_all('from')), 1)
440        eq(len(msg.get_all('reply-to')), 1)
441        eq(msg['from'], '_xtest@dom.ain')
442        eq(msg['reply-to'], '_xtest@dom.ain')
443        eq(msg['sender'], None)
444        eq(msg['subject'], 'a message to you')
445
446
447
448class TestCookHeaders(TestBase):
449    def test_transform_noack_to_xack(self):
450        eq = self.assertEqual
451        msg = email.message_from_string("""\
452X-Ack: yes
453
454""", Message.Message)
455        CookHeaders.process(self._mlist, msg, {'noack': 1})
456        eq(len(msg.get_all('x-ack')), 1)
457        eq(msg['x-ack'], 'no')
458
459    def test_original_sender(self):
460        msg = email.message_from_string("""\
461From: aperson@dom.ain
462
463""", Message.Message)
464        msgdata = {}
465        CookHeaders.process(self._mlist, msg, msgdata)
466        self.assertEqual(msgdata.get('original_sender'), 'aperson@dom.ain')
467
468    def test_no_original_sender(self):
469        msg = email.message_from_string("""\
470Subject: about this message
471
472""", Message.Message)
473        msgdata = {}
474        CookHeaders.process(self._mlist, msg, msgdata)
475        self.assertEqual(msgdata.get('original_sender'), '')
476
477    def test_xbeenthere(self):
478        msg = email.message_from_string("""\
479From: aperson@dom.ain
480
481""", Message.Message)
482        CookHeaders.process(self._mlist, msg, {})
483        self.assertEqual(msg['x-beenthere'], '_xtest@dom.ain')
484
485    def test_multiple_xbeentheres(self):
486        eq = self.assertEqual
487        msg = email.message_from_string("""\
488From: aperson@dom.ain
489X-BeenThere: alist@another.dom.ain
490
491""", Message.Message)
492        CookHeaders.process(self._mlist, msg, {})
493        eq(len(msg.get_all('x-beenthere')), 2)
494        beentheres = msg.get_all('x-beenthere')
495        beentheres.sort()
496        eq(beentheres, ['_xtest@dom.ain', 'alist@another.dom.ain'])
497
498    def test_nonexisting_mmversion(self):
499        eq = self.assertEqual
500        msg = email.message_from_string("""\
501From: aperson@dom.ain
502
503""", Message.Message)
504        CookHeaders.process(self._mlist, msg, {})
505        eq(msg['x-mailman-version'], mm_cfg.VERSION)
506
507    def test_existing_mmversion(self):
508        eq = self.assertEqual
509        msg = email.message_from_string("""\
510From: aperson@dom.ain
511X-Mailman-Version: 3000
512
513""", Message.Message)
514        CookHeaders.process(self._mlist, msg, {})
515        eq(len(msg.get_all('x-mailman-version')), 1)
516        eq(msg['x-mailman-version'], '3000')
517
518    def test_nonexisting_precedence(self):
519        eq = self.assertEqual
520        msg = email.message_from_string("""\
521From: aperson@dom.ain
522
523""", Message.Message)
524        CookHeaders.process(self._mlist, msg, {})
525        eq(msg['precedence'], 'list')
526
527    def test_existing_precedence(self):
528        eq = self.assertEqual
529        msg = email.message_from_string("""\
530From: aperson@dom.ain
531Precedence: junk
532
533""", Message.Message)
534        CookHeaders.process(self._mlist, msg, {})
535        eq(len(msg.get_all('precedence')), 1)
536        eq(msg['precedence'], 'junk')
537
538    def test_subject_munging_no_subject(self):
539        self._mlist.subject_prefix = '[XTEST] '
540        msg = email.message_from_string("""\
541From: aperson@dom.ain
542
543""", Message.Message)
544        msgdata = {}
545        CookHeaders.process(self._mlist, msg, msgdata)
546        self.assertEqual(msgdata.get('origsubj'), '')
547        self.assertEqual(str(msg['subject']), '[XTEST] (no subject)')
548
549    def test_subject_munging(self):
550        self._mlist.subject_prefix = '[XTEST] '
551        msg = email.message_from_string("""\
552From: aperson@dom.ain
553Subject: About Mailman...
554
555""", Message.Message)
556        CookHeaders.process(self._mlist, msg, {})
557        self.assertEqual(str(msg['subject']), '[XTEST] About Mailman...')
558
559    def test_no_subject_munging_for_digests(self):
560        self._mlist.subject_prefix = '[XTEST] '
561        msg = email.message_from_string("""\
562From: aperson@dom.ain
563Subject: About Mailman...
564
565""", Message.Message)
566        CookHeaders.process(self._mlist, msg, {'isdigest': 1})
567        self.assertEqual(msg['subject'], 'About Mailman...')
568
569    def test_no_subject_munging_for_fasttrack(self):
570        self._mlist.subject_prefix = '[XTEST] '
571        msg = email.message_from_string("""\
572From: aperson@dom.ain
573Subject: About Mailman...
574
575""", Message.Message)
576        CookHeaders.process(self._mlist, msg, {'_fasttrack': 1})
577        self.assertEqual(msg['subject'], 'About Mailman...')
578
579    def test_no_subject_munging_has_prefix(self):
580        self._mlist.subject_prefix = '[XTEST] '
581        msg = email.message_from_string("""\
582From: aperson@dom.ain
583Subject: Re: [XTEST] About Mailman...
584
585""", Message.Message)
586        CookHeaders.process(self._mlist, msg, {})
587        # prefixing depends on mm_cfg.py
588        self.failUnless(str(msg['subject']) == 'Re: [XTEST] About Mailman...' or
589                        str(msg['subject']) == '[XTEST] Re: About Mailman...')
590
591    def test_reply_to_list(self):
592        eq = self.assertEqual
593        mlist = self._mlist
594        mlist.reply_goes_to_list = 1
595        mlist.from_is_list = 0
596        msg = email.message_from_string("""\
597From: aperson@dom.ain
598
599""", Message.Message)
600        msgdata = {}
601        CookHeaders.process(mlist, msg, msgdata)
602        eq(msgdata['add_header']['Reply-To'], '_xtest@dom.ain')
603        eq(msg.get_all('reply-to'), None)
604
605    def test_reply_to_list_fil(self):
606        eq = self.assertEqual
607        mlist = self._mlist
608        mlist.reply_goes_to_list = 1
609        mlist.from_is_list = 1
610        msg = email.message_from_string("""\
611From: aperson@dom.ain
612
613""", Message.Message)
614        msgdata = {}
615        CookHeaders.process(mlist, msg, msgdata)
616        eq(msgdata['add_header']['Reply-To'],
617            '_xtest@dom.ain')
618        eq(msgdata['add_header']['Cc'],
619            'aperson@dom.ain')
620        eq(msg.get_all('reply-to'), None)
621        eq(msg.get_all('cc'), None)
622
623    def test_reply_to_list_with_strip(self):
624        eq = self.assertEqual
625        mlist = self._mlist
626        mlist.reply_goes_to_list = 1
627        mlist.first_strip_reply_to = 1
628        mlist.from_is_list = 0
629        msg = email.message_from_string("""\
630From: aperson@dom.ain
631Reply-To: bperson@dom.ain
632
633""", Message.Message)
634        msgdata = {}
635        CookHeaders.process(mlist, msg, msgdata)
636        eq(msgdata['add_header']['Reply-To'], '_xtest@dom.ain')
637        eq(msg.get_all('reply-to'), ['bperson@dom.ain'])
638
639    def test_reply_to_list_with_strip_fil(self):
640        eq = self.assertEqual
641        mlist = self._mlist
642        mlist.reply_goes_to_list = 1
643        mlist.first_strip_reply_to = 1
644        mlist.from_is_list = 1
645        msg = email.message_from_string("""\
646From: aperson@dom.ain
647Reply-To: bperson@dom.ain
648
649""", Message.Message)
650        msgdata = {}
651        CookHeaders.process(mlist, msg, msgdata)
652        eq(msgdata['add_header']['Reply-To'],
653            '_xtest@dom.ain')
654        eq(msgdata['add_header']['Cc'],
655            'aperson@dom.ain')
656        eq(msg.get_all('reply-to'), ['bperson@dom.ain'])
657        eq(msg.get_all('cc'), None)
658
659
660    def test_reply_to_explicit(self):
661        eq = self.assertEqual
662        mlist = self._mlist
663        mlist.reply_goes_to_list = 2
664        mlist.from_is_list = 0
665        mlist.reply_to_address = 'mlist@dom.ain'
666        msg = email.message_from_string("""\
667From: aperson@dom.ain
668
669""", Message.Message)
670        msgdata = {}
671        CookHeaders.process(mlist, msg, msgdata)
672        eq(msgdata['add_header']['Reply-To'], 'mlist@dom.ain')
673        eq(msg.get_all('reply-to'), None)
674
675    def test_reply_to_explicit_fil(self):
676        eq = self.assertEqual
677        mlist = self._mlist
678        mlist.reply_goes_to_list = 2
679        mlist.from_is_list = 1
680        mlist.reply_to_address = 'mlist@dom.ain'
681        msg = email.message_from_string("""\
682From: aperson@dom.ain
683
684""", Message.Message)
685        msgdata = {}
686        CookHeaders.process(mlist, msg, msgdata)
687        eq(msgdata['add_header']['Reply-To'],
688            'mlist@dom.ain')
689        eq(msgdata['add_header']['Cc'],
690            'aperson@dom.ain')
691        eq(msg.get_all('reply-to'), None)
692        eq(msg.get_all('cc'), None)
693
694    def test_reply_to_explicit_with_strip(self):
695        eq = self.assertEqual
696        mlist = self._mlist
697        mlist.reply_goes_to_list = 2
698        mlist.first_strip_reply_to = 1
699        mlist.from_is_list = 0
700        mlist.reply_to_address = 'mlist@dom.ain'
701        msg = email.message_from_string("""\
702From: aperson@dom.ain
703Reply-To: bperson@dom.ain
704
705""", Message.Message)
706        msgdata = {}
707
708        CookHeaders.process(self._mlist, msg, msgdata)
709        eq(msgdata['add_header']['Reply-To'], 'mlist@dom.ain')
710        eq(msg.get_all('reply-to'), ['bperson@dom.ain'])
711
712    def test_reply_to_explicit_with_strip_fil(self):
713        eq = self.assertEqual
714        mlist = self._mlist
715        mlist.reply_goes_to_list = 2
716        mlist.first_strip_reply_to = 1
717        mlist.from_is_list = 1
718        mlist.reply_to_address = 'mlist@dom.ain'
719        msg = email.message_from_string("""\
720From: aperson@dom.ain
721Reply-To: bperson@dom.ain
722
723""", Message.Message)
724        msgdata = {}
725
726        CookHeaders.process(self._mlist, msg, msgdata)
727        eq(msgdata['add_header']['Reply-To'],
728            'mlist@dom.ain')
729        eq(msgdata['add_header']['Cc'],
730            'aperson@dom.ain')
731        eq(msg.get_all('reply-to'), ['bperson@dom.ain'])
732        eq(msg.get_all('cc'), None)
733
734    def test_reply_to_extends_to_list(self):
735        eq = self.assertEqual
736        mlist = self._mlist
737        mlist.reply_goes_to_list = 1
738        mlist.first_strip_reply_to = 0
739        mlist.from_is_list = 0
740        msg = email.message_from_string("""\
741From: aperson@dom.ain
742Reply-To: bperson@dom.ain
743
744""", Message.Message)
745        msgdata = {}
746
747        CookHeaders.process(mlist, msg, msgdata)
748        eq(msgdata['add_header']['Reply-To'],
749            'bperson@dom.ain, _xtest@dom.ain')
750
751    def test_reply_to_extends_to_list_fil(self):
752        eq = self.assertEqual
753        mlist = self._mlist
754        mlist.reply_goes_to_list = 1
755        mlist.first_strip_reply_to = 0
756        mlist.from_is_list = 1
757        msg = email.message_from_string("""\
758From: aperson@dom.ain
759Reply-To: bperson@dom.ain
760
761""", Message.Message)
762        msgdata = {}
763
764        CookHeaders.process(mlist, msg, msgdata)
765        eq(msgdata['add_header']['Reply-To'],
766            'bperson@dom.ain, _xtest@dom.ain')
767        eq(msgdata['add_header']['Cc'],
768            'aperson@dom.ain')
769        eq(msg.get_all('reply-to'), ['bperson@dom.ain'])
770        eq(msg.get_all('cc'), None)
771
772    def test_reply_to_extends_to_explicit(self):
773        eq = self.assertEqual
774        mlist = self._mlist
775        mlist.reply_goes_to_list = 2
776        mlist.first_strip_reply_to = 0
777        mlist.from_is_list = 0
778        mlist.reply_to_address = 'mlist@dom.ain'
779        msg = email.message_from_string("""\
780From: aperson@dom.ain
781Reply-To: bperson@dom.ain
782
783""", Message.Message)
784        msgdata = {}
785        CookHeaders.process(mlist, msg, msgdata)
786        eq(msgdata['add_header']['Reply-To'],
787            'mlist@dom.ain, bperson@dom.ain')
788
789    def test_reply_to_extends_to_explicit_fil(self):
790        eq = self.assertEqual
791        mlist = self._mlist
792        mlist.reply_goes_to_list = 2
793        mlist.first_strip_reply_to = 0
794        mlist.from_is_list = 1
795        mlist.reply_to_address = 'mlist@dom.ain'
796        msg = email.message_from_string("""\
797From: aperson@dom.ain
798Reply-To: bperson@dom.ain
799
800""", Message.Message)
801        msgdata = {}
802        CookHeaders.process(mlist, msg, msgdata)
803        eq(msgdata['add_header']['Reply-To'],
804            'mlist@dom.ain, bperson@dom.ain')
805        eq(msgdata['add_header']['Cc'],
806            'aperson@dom.ain')
807        eq(msg.get_all('reply-to'), ['bperson@dom.ain'])
808        eq(msg.get_all('cc'), None)
809
810    def test_list_headers_nolist(self):
811        eq = self.assertEqual
812        msg = email.message_from_string("""\
813From: aperson@dom.ain
814
815""", Message.Message)
816        CookHeaders.process(self._mlist, msg, {'_nolist': 1})
817        eq(msg['list-id'], None)
818        eq(msg['list-help'], None)
819        eq(msg['list-unsubscribe'], None)
820        eq(msg['list-subscribe'], None)
821        eq(msg['list-post'], None)
822        eq(msg['list-archive'], None)
823
824    def test_list_headers(self):
825        eq = self.assertEqual
826        self._mlist.archive = 1
827        msg = email.message_from_string("""\
828From: aperson@dom.ain
829
830""", Message.Message)
831        oldval = mm_cfg.DEFAULT_URL_HOST
832        mm_cfg.DEFAULT_URL_HOST = 'www.dom.ain'
833        try:
834            CookHeaders.process(self._mlist, msg, {})
835        finally:
836            mm_cfg.DEFAULT_URL_HOST = oldval
837        eq(msg['list-id'], '<_xtest.dom.ain>')
838        eq(msg['list-help'], '<mailto:_xtest-request@dom.ain?subject=help>')
839        eq(msg['list-unsubscribe'],
840           '<http://www.dom.ain/mailman/options/_xtest>,'
841           '\n <mailto:_xtest-request@dom.ain?subject=unsubscribe>')
842        eq(msg['list-subscribe'],
843           '<http://www.dom.ain/mailman/listinfo/_xtest>,'
844           '\n <mailto:_xtest-request@dom.ain?subject=subscribe>')
845        eq(msg['list-post'], '<mailto:_xtest@dom.ain>')
846        eq(msg['list-archive'], '<http://www.dom.ain/pipermail/_xtest/>')
847
848    def test_list_headers_with_description(self):
849        eq = self.assertEqual
850        self._mlist.archive = 1
851        self._mlist.description = 'A Test List'
852        msg = email.message_from_string("""\
853From: aperson@dom.ain
854
855""", Message.Message)
856        CookHeaders.process(self._mlist, msg, {})
857        eq(unicode(msg['list-id']), 'A Test List <_xtest.dom.ain>')
858        eq(msg['list-help'], '<mailto:_xtest-request@dom.ain?subject=help>')
859        eq(msg['list-unsubscribe'],
860           '<http://www.dom.ain/mailman/options/_xtest>,'
861           '\n <mailto:_xtest-request@dom.ain?subject=unsubscribe>')
862        eq(msg['list-subscribe'],
863           '<http://www.dom.ain/mailman/listinfo/_xtest>,'
864           '\n <mailto:_xtest-request@dom.ain?subject=subscribe>')
865        eq(msg['list-post'], '<mailto:_xtest@dom.ain>')
866
867
868
869class TestDecorate(TestBase):
870    def test_short_circuit(self):
871        msgdata = {'isdigest': 1}
872        rtn = Decorate.process(self._mlist, None, msgdata)
873        # Not really a great test, but there's little else to assert
874        self.assertEqual(rtn, None)
875
876    def test_no_multipart(self):
877        mlist = self._mlist
878        mlist.msg_header = 'header\n'
879        mlist.msg_footer = 'footer'
880        msg = email.message_from_string("""\
881From: aperson@dom.ain
882
883Here is a message.
884""")
885        Decorate.process(self._mlist, msg, {})
886        self.assertEqual(msg.get_payload(), """\
887header
888Here is a message.
889footer
890""")
891
892    def test_no_multipart_template(self):
893        mlist = self._mlist
894        mlist.msg_header = '%(real_name)s header\n'
895        mlist.msg_footer = '%(real_name)s footer'
896        mlist.real_name = 'XTest'
897        msg = email.message_from_string("""\
898From: aperson@dom.ain
899
900Here is a message.
901""")
902        Decorate.process(self._mlist, msg, {})
903        self.assertEqual(msg.get_payload(), """\
904XTest header
905Here is a message.
906XTest footer
907""")
908
909    def test_no_multipart_type_error(self):
910        mlist = self._mlist
911        mlist.msg_header = '%(real_name) header\n'
912        mlist.msg_footer = '%(real_name) footer'
913        mlist.real_name = 'XTest'
914        msg = email.message_from_string("""\
915From: aperson@dom.ain
916
917Here is a message.
918""")
919        Decorate.process(self._mlist, msg, {})
920        self.assertEqual(msg.get_payload(), """\
921%(real_name) header
922Here is a message.
923%(real_name) footer
924""")
925
926    def test_no_multipart_value_error(self):
927        mlist = self._mlist
928        # These will generate warnings in logs/error
929        mlist.msg_header = '%(real_name)p header\n'
930        mlist.msg_footer = '%(real_name)p footer'
931        mlist.real_name = 'XTest'
932        msg = email.message_from_string("""\
933From: aperson@dom.ain
934
935Here is a message.
936""")
937        Decorate.process(self._mlist, msg, {})
938        self.assertEqual(msg.get_payload(), """\
939%(real_name)p header
940Here is a message.
941%(real_name)p footer
942""")
943
944    def test_no_multipart_missing_key(self):
945        mlist = self._mlist
946        mlist.msg_header = '%(spooge)s header\n'
947        mlist.msg_footer = '%(spooge)s footer'
948        msg = email.message_from_string("""\
949From: aperson@dom.ain
950
951Here is a message.
952""")
953        Decorate.process(self._mlist, msg, {})
954        self.assertEqual(msg.get_payload(), """\
955%(spooge)s header
956Here is a message.
957%(spooge)s footer
958""")
959
960    def test_multipart(self):
961        eq = self.ndiffAssertEqual
962        mlist = self._mlist
963        mlist.msg_header = 'header'
964        mlist.msg_footer = 'footer'
965        msg1 = email.message_from_string("""\
966From: aperson@dom.ain
967
968Here is the first message.
969""")
970        msg2 = email.message_from_string("""\
971From: bperson@dom.ain
972
973Here is the second message.
974""")
975        msg = Message.Message()
976        msg.set_type('multipart/mixed')
977        msg.set_boundary('BOUNDARY')
978        msg.attach(msg1)
979        msg.attach(msg2)
980        Decorate.process(self._mlist, msg, {})
981        eq(msg.as_string(unixfrom=0), """\
982MIME-Version: 1.0
983Content-Type: multipart/mixed; boundary="BOUNDARY"
984
985--BOUNDARY
986Content-Type: text/plain; charset="us-ascii"
987MIME-Version: 1.0
988Content-Transfer-Encoding: 7bit
989Content-Disposition: inline
990
991header
992
993--BOUNDARY
994From: aperson@dom.ain
995
996Here is the first message.
997
998--BOUNDARY
999From: bperson@dom.ain
1000
1001Here is the second message.
1002
1003--BOUNDARY
1004Content-Type: text/plain; charset="us-ascii"
1005MIME-Version: 1.0
1006Content-Transfer-Encoding: 7bit
1007Content-Disposition: inline
1008
1009footer
1010
1011--BOUNDARY--
1012""")
1013
1014    def test_image(self):
1015        eq = self.assertEqual
1016        mlist = self._mlist
1017        mlist.msg_header = 'header\n'
1018        mlist.msg_footer = 'footer'
1019        msg = email.message_from_string("""\
1020From: aperson@dom.ain
1021Content-type: image/x-spooge
1022
1023IMAGEDATAIMAGEDATAIMAGEDATA
1024""")
1025        Decorate.process(self._mlist, msg, {})
1026        eq(len(msg.get_payload()), 3)
1027        self.assertEqual(msg.get_payload(1).get_payload(), """\
1028IMAGEDATAIMAGEDATAIMAGEDATA
1029""")
1030
1031    def test_personalize_assert(self):
1032        raises = self.assertRaises
1033        raises(AssertionError, Decorate.process,
1034               self._mlist, None, {'personalize': 1})
1035        raises(AssertionError, Decorate.process,
1036               self._mlist, None, {'personalize': 1,
1037                                   'recips': [1, 2, 3]})
1038
1039
1040
1041class TestFileRecips(TestBase):
1042    def test_short_circuit(self):
1043        msgdata = {'recips': 1}
1044        rtn = FileRecips.process(self._mlist, None, msgdata)
1045        # Not really a great test, but there's little else to assert
1046        self.assertEqual(rtn, None)
1047
1048    def test_file_nonexistant(self):
1049        msgdata = {}
1050        FileRecips.process(self._mlist, None, msgdata)
1051        self.assertEqual(msgdata.get('recips'), [])
1052
1053    def test_file_exists_no_sender(self):
1054        msg = email.message_from_string("""\
1055To: yall@dom.ain
1056
1057""", Message.Message)
1058        msgdata = {}
1059        file = os.path.join(self._mlist.fullpath(), 'members.txt')
1060        addrs = ['aperson@dom.ain', 'bperson@dom.ain',
1061                 'cperson@dom.ain', 'dperson@dom.ain']
1062        fp = open(file, 'w')
1063        try:
1064            for addr in addrs:
1065                print >> fp, addr
1066            fp.close()
1067            FileRecips.process(self._mlist, msg, msgdata)
1068            self.assertEqual(msgdata.get('recips'), addrs)
1069        finally:
1070            try:
1071                os.unlink(file)
1072            except OSError, e:
1073                if e.errno <> e.ENOENT: raise
1074
1075    def test_file_exists_no_member(self):
1076        msg = email.message_from_string("""\
1077From: eperson@dom.ain
1078To: yall@dom.ain
1079
1080""", Message.Message)
1081        msgdata = {}
1082        file = os.path.join(self._mlist.fullpath(), 'members.txt')
1083        addrs = ['aperson@dom.ain', 'bperson@dom.ain',
1084                 'cperson@dom.ain', 'dperson@dom.ain']
1085        fp = open(file, 'w')
1086        try:
1087            for addr in addrs:
1088                print >> fp, addr
1089            fp.close()
1090            FileRecips.process(self._mlist, msg, msgdata)
1091            self.assertEqual(msgdata.get('recips'), addrs)
1092        finally:
1093            try:
1094                os.unlink(file)
1095            except OSError, e:
1096                if e.errno <> e.ENOENT: raise
1097
1098    def test_file_exists_is_member(self):
1099        msg = email.message_from_string("""\
1100From: aperson@dom.ain
1101To: yall@dom.ain
1102
1103""", Message.Message)
1104        msgdata = {}
1105        file = os.path.join(self._mlist.fullpath(), 'members.txt')
1106        addrs = ['aperson@dom.ain', 'bperson@dom.ain',
1107                 'cperson@dom.ain', 'dperson@dom.ain']
1108        fp = open(file, 'w')
1109        try:
1110            for addr in addrs:
1111                print >> fp, addr
1112                self._mlist.addNewMember(addr)
1113            fp.close()
1114            FileRecips.process(self._mlist, msg, msgdata)
1115            self.assertEqual(msgdata.get('recips'), addrs[1:])
1116        finally:
1117            try:
1118                os.unlink(file)
1119            except OSError, e:
1120                if e.errno <> e.ENOENT: raise
1121
1122
1123
1124class TestHold(TestBase):
1125    def setUp(self):
1126        TestBase.setUp(self)
1127        self._mlist.administrivia = 1
1128        self._mlist.respond_to_post_requests = 0
1129        self._mlist.admin_immed_notify = 0
1130        # We're going to want to inspect this queue directory
1131        self._sb = Switchboard(mm_cfg.VIRGINQUEUE_DIR)
1132
1133    def tearDown(self):
1134        for f in os.listdir(mm_cfg.VIRGINQUEUE_DIR):
1135            os.unlink(os.path.join(mm_cfg.VIRGINQUEUE_DIR, f))
1136        TestBase.tearDown(self)
1137        try:
1138            os.unlink(os.path.join(mm_cfg.DATA_DIR, 'pending.db'))
1139        except OSError, e:
1140            if e.errno <> errno.ENOENT: raise
1141        for f in [holdfile for holdfile in os.listdir(mm_cfg.DATA_DIR)
1142                  if holdfile.startswith('heldmsg-')]:
1143            os.unlink(os.path.join(mm_cfg.DATA_DIR, f))
1144
1145    def test_short_circuit(self):
1146        msgdata = {'approved': 1}
1147        rtn = Hold.process(self._mlist, None, msgdata)
1148        # Not really a great test, but there's little else to assert
1149        self.assertEqual(rtn, None)
1150
1151    def test_administrivia(self):
1152        msg = email.message_from_string("""\
1153From: aperson@dom.ain
1154Subject: unsubscribe
1155
1156""", Message.Message)
1157        self.assertRaises(Hold.Administrivia, Hold.process,
1158                          self._mlist, msg, {})
1159
1160    def test_max_recips(self):
1161        self._mlist.max_num_recipients = 5
1162        msg = email.message_from_string("""\
1163From: aperson@dom.ain
1164To: _xtest@dom.ain, bperson@dom.ain
1165Cc: cperson@dom.ain
1166Cc: dperson@dom.ain (Jimmy D. Person)
1167To: Billy E. Person <eperson@dom.ain>
1168
1169Hey folks!
1170""", Message.Message)
1171        self.assertRaises(Hold.TooManyRecipients, Hold.process,
1172                          self._mlist, msg, {})
1173
1174    def test_implicit_destination(self):
1175        self._mlist.require_explicit_destination = 1
1176        msg = email.message_from_string("""\
1177From: aperson@dom.ain
1178Subject: An implicit message
1179
1180""", Message.Message)
1181        self.assertRaises(Hold.ImplicitDestination, Hold.process,
1182                          self._mlist, msg, {})
1183
1184    def test_implicit_destination_fromusenet(self):
1185        self._mlist.require_explicit_destination = 1
1186        msg = email.message_from_string("""\
1187From: aperson@dom.ain
1188Subject: An implicit message
1189
1190""", Message.Message)
1191        rtn = Hold.process(self._mlist, msg, {'fromusenet': 1})
1192        self.assertEqual(rtn, None)
1193
1194    def test_suspicious_header(self):
1195        self._mlist.bounce_matching_headers = 'From: .*person@(blah.)?dom.ain'
1196        msg = email.message_from_string("""\
1197From: aperson@dom.ain
1198To: _xtest@dom.ain
1199Subject: An implicit message
1200
1201""", Message.Message)
1202        self.assertRaises(Hold.SuspiciousHeaders, Hold.process,
1203                          self._mlist, msg, {})
1204
1205    def test_suspicious_header_ok(self):
1206        self._mlist.bounce_matching_headers = 'From: .*person@blah.dom.ain'
1207        msg = email.message_from_string("""\
1208From: aperson@dom.ain
1209To: _xtest@dom.ain
1210Subject: An implicit message
1211
1212""", Message.Message)
1213        rtn = Hold.process(self._mlist, msg, {})
1214        self.assertEqual(rtn, None)
1215
1216    def test_max_message_size(self):
1217        self._mlist.max_message_size = 1
1218        msg = email.message_from_string("""\
1219From: aperson@dom.ain
1220To: _xtest@dom.ain
1221
1222xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1223xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1224xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1225xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1226xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1227xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1228xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1229xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1230xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1231xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1232xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1233xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1234xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1235xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
1236""", Message.Message)
1237        self.assertRaises(Hold.MessageTooBig, Hold.process,
1238                          self._mlist, msg, {})
1239
1240    def test_hold_notifications(self):
1241        eq = self.assertEqual
1242        self._mlist.respond_to_post_requests = 1
1243        self._mlist.admin_immed_notify = 1
1244        # Now cause an implicit destination hold
1245        msg = email.message_from_string("""\
1246From: aperson@dom.ain
1247
1248""", Message.Message)
1249        self.assertRaises(Hold.ImplicitDestination, Hold.process,
1250                          self._mlist, msg, {})
1251        # Now we have to make sure there are two messages in the virgin queue,
1252        # one to the sender and one to the list owners.
1253        qfiles = {}
1254        files = self._sb.files()
1255        eq(len(files), 2)
1256        for filebase in files:
1257            qmsg, qdata = self._sb.dequeue(filebase)
1258            to = qmsg['to']
1259            qfiles[to] = qmsg, qdata
1260        # BAW: We could be testing many other attributes of either the
1261        # messages or the metadata files...
1262        keys = qfiles.keys()
1263        keys.sort()
1264        eq(keys, ['_xtest-owner@dom.ain', 'aperson@dom.ain'])
1265        # Get the pending cookie from the message to the sender
1266        pmsg, pdata = qfiles['aperson@dom.ain']
1267        confirmlines = pmsg.get_payload().split('\n')
1268        cookie = confirmlines[-3].split('/')[-1]
1269        # We also need to make sure there's an entry in the Pending database
1270        # for the heold message.
1271        data = self._mlist.pend_confirm(cookie)
1272        eq(data, ('H', 1))
1273        heldmsg = os.path.join(mm_cfg.DATA_DIR, 'heldmsg-_xtest-1.pck')
1274        self.failUnless(os.path.exists(heldmsg))
1275        os.unlink(heldmsg)
1276        holdfiles = [f for f in os.listdir(mm_cfg.DATA_DIR)
1277                     if f.startswith('heldmsg-')]
1278        eq(len(holdfiles), 0)
1279
1280
1281
1282class TestMimeDel(TestBase):
1283    def setUp(self):
1284        TestBase.setUp(self)
1285        self._mlist.filter_content = 1
1286        self._mlist.filter_mime_types = ['image/jpeg']
1287        self._mlist.pass_mime_types = []
1288        self._mlist.convert_html_to_plaintext = 1
1289        self._mlist.collapse_alternatives = 1
1290
1291    def test_outer_matches(self):
1292        msg = email.message_from_string("""\
1293From: aperson@dom.ain
1294Content-Type: image/jpeg
1295MIME-Version: 1.0
1296
1297xxxxx
1298""")
1299        self.assertRaises(Errors.DiscardMessage, MimeDel.process,
1300                          self._mlist, msg, {})
1301
1302    def test_strain_multipart(self):
1303        eq = self.assertEqual
1304        msg = email.message_from_string("""\
1305From: aperson@dom.ain
1306Content-Type: multipart/mixed; boundary=BOUNDARY
1307MIME-Version: 1.0
1308
1309--BOUNDARY
1310Content-Type: image/jpeg
1311MIME-Version: 1.0
1312
1313xxx
1314
1315--BOUNDARY
1316Content-Type: image/gif
1317MIME-Version: 1.0
1318
1319yyy
1320--BOUNDARY--
1321""")
1322        MimeDel.process(self._mlist, msg, {})
1323        self.assertTrue(not msg.is_multipart())
1324        eq(msg.get_content_type(), 'image/gif')
1325        eq(msg.get_payload(), 'yyy')
1326
1327    def test_collapse_multipart_alternative(self):
1328        eq = self.assertEqual
1329        msg = email.message_from_string("""\
1330From: aperson@dom.ain
1331Content-Type: multipart/mixed; boundary=BOUNDARY
1332MIME-Version: 1.0
1333
1334--BOUNDARY
1335Content-Type: multipart/alternative; boundary=BOUND2
1336MIME-Version: 1.0
1337
1338--BOUND2
1339Content-Type: image/jpeg
1340MIME-Version: 1.0
1341
1342xxx
1343
1344--BOUND2
1345Content-Type: image/gif
1346MIME-Version: 1.0
1347
1348yyy
1349--BOUND2--
1350
1351--BOUNDARY--
1352""")
1353        MimeDel.process(self._mlist, msg, {})
1354        self.assertTrue(not msg.is_multipart())
1355        eq(msg.get_content_type(), 'image/gif')
1356        eq(msg.get_payload(), 'yyy')
1357
1358    def test_convert_to_plaintext(self):
1359        # BAW: This test is dependent on your particular lynx version
1360        eq = self.assertEqual
1361        msg = email.message_from_string("""\
1362From: aperson@dom.ain
1363Content-Type: text/html
1364MIME-Version: 1.0
1365
1366<html><head></head>
1367<body></body></html>
1368""")
1369        MimeDel.process(self._mlist, msg, {})
1370        eq(msg.get_content_type(), 'text/plain')
1371        #eq(msg.get_payload(), '\n\n\n')
1372        eq(msg.get_payload().strip(), '')
1373
1374    def test_deep_structure(self):
1375        eq = self.assertEqual
1376        self._mlist.filter_mime_types.append('text/html')
1377        msg = email.message_from_string("""\
1378From: aperson@dom.ain
1379Content-Type: multipart/mixed; boundary=AAA
1380
1381--AAA
1382Content-Type: multipart/mixed; boundary=BBB
1383
1384--BBB
1385Content-Type: image/jpeg
1386
1387xxx
1388--BBB
1389Content-Type: image/jpeg
1390
1391yyy
1392--BBB---
1393--AAA
1394Content-Type: multipart/alternative; boundary=CCC
1395
1396--CCC
1397Content-Type: text/html
1398
1399<h2>This is a header</h2>
1400
1401--CCC
1402Content-Type: text/plain
1403
1404A different message
1405--CCC--
1406--AAA
1407Content-Type: image/gif
1408
1409zzz
1410--AAA
1411Content-Type: image/gif
1412
1413aaa
1414--AAA--
1415""")
1416        MimeDel.process(self._mlist, msg, {})
1417        payload = msg.get_payload()
1418        eq(len(payload), 3)
1419        part1 = msg.get_payload(0)
1420        eq(part1.get_content_type(), 'text/plain')
1421        eq(part1.get_payload(), 'A different message')
1422        part2 = msg.get_payload(1)
1423        eq(part2.get_content_type(), 'image/gif')
1424        eq(part2.get_payload(), 'zzz')
1425        part3 = msg.get_payload(2)
1426        eq(part3.get_content_type(), 'image/gif')
1427        eq(part3.get_payload(), 'aaa')
1428
1429    def test_top_multipart_alternative(self):
1430        eq = self.assertEqual
1431        self._mlist.filter_mime_types.append('text/html')
1432        msg = email.message_from_string("""\
1433From: aperson@dom.ain
1434Content-Type: multipart/alternative; boundary=AAA
1435
1436--AAA
1437Content-Type: text/html
1438
1439<b>This is some html</b>
1440--AAA
1441Content-Type: text/plain
1442
1443This is plain text
1444--AAA--
1445""")
1446        MimeDel.process(self._mlist, msg, {})
1447        eq(msg.get_content_type(), 'text/plain')
1448        eq(msg.get_payload(), 'This is plain text')
1449
1450    def test_recast_multipart(self):
1451        eq = self.assertEqual
1452        self._mlist.filter_mime_types.append('application/pdf')
1453        msg = email.message_from_string("""\
1454From: aperson@dom.ain
1455MIME-Version: 1.0
1456Content-type: multipart/mixed;
1457 boundary="Boundary_0"
1458
1459--Boundary_0
1460Content-Type: multipart/mixed;
1461 boundary="Boundary_1"
1462
1463--Boundary_1
1464Content-type: multipart/mixed;
1465 boundary="Boundary_2"
1466
1467--Boundary_2
1468Content-type: multipart/alternative;
1469 boundary="Boundary_3"
1470
1471--Boundary_3
1472Content-type: text/plain; charset=us-ascii
1473Content-transfer-encoding: 7BIT
1474
1475Plain text part
1476--Boundary_3
1477Content-type: text/html; charset=us-ascii
1478Content-transfer-encoding: 7BIT
1479
1480HTML part
1481--Boundary_3--
1482
1483
1484--Boundary_2
1485Content-type: application/pdf
1486Content-transfer-encoding: 7BIT
1487
1488PDF part inner 2
1489--Boundary_2--
1490--Boundary_1
1491Content-type: text/plain; charset=us-ascii
1492Content-transfer-encoding: 7BIT
1493
1494second text
1495--Boundary_1--
1496
1497--Boundary_0
1498Content-Type: application/pdf
1499Content-transfer-encoding: 7BIT
1500
1501PDF part outer
1502--Boundary_0--
1503""")
1504        MimeDel.process(self._mlist, msg, {})
1505        payload = msg.get_payload()
1506        eq(len(payload), 2)
1507        part1 = msg.get_payload(0)
1508        eq(part1.get_content_type(), 'text/plain')
1509        eq(part1.get_payload(), 'Plain text part')
1510        part2 = msg.get_payload(1)
1511        eq(part2.get_content_type(), 'text/plain')
1512        eq(part2.get_payload(), 'second text')
1513
1514    def test_message_rfc822(self):
1515        eq = self.assertEqual
1516        msg = email.message_from_string("""\
1517Message-ID: <4D9E6AEA.1060802@example.net>
1518Date: Thu, 07 Apr 2011 18:54:50 -0700
1519From: User <user@example.com>
1520MIME-Version: 1.0
1521To: Someone <someone@example.net>
1522Subject: Message Subject
1523Content-Type: multipart/mixed;
1524 boundary="------------050603050603060608020908"
1525
1526This is a multi-part message in MIME format.
1527--------------050603050603060608020908
1528Content-Type: text/plain; charset=ISO-8859-1
1529Content-Transfer-Encoding: 7bit
1530
1531Plain body.
1532
1533--------------050603050603060608020908
1534Content-Type: message/rfc822
1535Content-Transfer-Encoding: 7bit
1536Content-Disposition: attachment
1537
1538Message-ID: <4D9E647F.4050308@example.net>
1539Date: Thu, 07 Apr 2011 18:27:27 -0700
1540From: User1 <user1@example.com>
1541MIME-Version: 1.0
1542To: Someone1 <someone1@example.net>
1543Content-Type: multipart/mixed; boundary="------------060107040402070208020705"
1544Subject: Attached Message 1 Subject
1545
1546This is a multi-part message in MIME format.
1547--------------060107040402070208020705
1548Content-Type: text/plain; charset=ISO-8859-1
1549Content-Transfer-Encoding: 7bit
1550
1551Attached Message 1 body.
1552
1553--------------060107040402070208020705
1554Content-Type: message/rfc822
1555Content-Transfer-Encoding: 7bit
1556Content-Disposition: attachment
1557
1558From: User2 <user2@example.com>
1559To: Someone2 <someone2@example.net>
1560Subject: Attached Message 2 Subject
1561Date: Thu, 7 Apr 2011 19:09:35 -0500
1562Message-ID: <DAE689E1FD1D493BACD15180145B4151@example.net>
1563MIME-Version: 1.0
1564Content-Type: multipart/mixed;
1565	boundary="----=_NextPart_000_0066_01CBF557.56C6F370"
1566
1567This is a multi-part message in MIME format.
1568
1569------=_NextPart_000_0066_01CBF557.56C6F370
1570Content-Type: text/plain;
1571	charset="us-ascii"
1572Content-Transfer-Encoding: 7bit
1573
1574Attached Message 2 body.
1575
1576------=_NextPart_000_0066_01CBF557.56C6F370
1577Content-Type: message/rfc822
1578Content-Transfer-Encoding: 7bit
1579Content-Disposition: attachment
1580
1581From: User3 <user3@example.com>
1582To: Someone3 <someone3@example.net>
1583Subject: Attached Message 3 Subject
1584Date: Thu, 7 Apr 2011 17:22:04 -0500
1585Message-ID: <BANLkTi=SzfNJo-V7cvrg3nE3uOi9uxXv3g@example.net>
1586MIME-Version: 1.0
1587Content-Type: multipart/alternative;
1588	boundary="----=_NextPart_000_0058_01CBF557.56C48270"
1589
1590This is a multi-part message in MIME format.
1591
1592------=_NextPart_000_0058_01CBF557.56C48270
1593Content-Type: text/plain;
1594	charset="iso-8859-1"
1595Content-Transfer-Encoding: 7bit
1596
1597Attached Message 3 plain body.
1598
1599------=_NextPart_000_0058_01CBF557.56C48270
1600Content-Type: text/html;
1601	charset="iso-8859-1"
1602Content-Transfer-Encoding: quoted-printable
1603
1604
1605Attached Message 3 html body.
1606
1607------=_NextPart_000_0058_01CBF557.56C48270--
1608
1609------=_NextPart_000_0066_01CBF557.56C6F370
1610Content-Type: message/rfc822
1611Content-Transfer-Encoding: 7bit
1612Content-Disposition: attachment
1613
1614From: User4 <user4@example.com>
1615To: Someone4 <someone4@example.net>
1616Subject: Attached Message 4 Subject
1617Date: Thu, 7 Apr 2011 17:24:26 -0500
1618Message-ID: <19CC3BDF28CF49AD988FF43B2DBC5F1D@example>
1619MIME-Version: 1.0
1620Content-Type: multipart/mixed;
1621	boundary="----=_NextPart_000_0060_01CBF557.56C6F370"
1622
1623This is a multi-part message in MIME format.
1624
1625------=_NextPart_000_0060_01CBF557.56C6F370
1626Content-Type: multipart/alternative;
1627	boundary="----=_NextPart_001_0061_01CBF557.56C6F370"
1628
1629------=_NextPart_001_0061_01CBF557.56C6F370
1630Content-Type: text/plain;
1631	charset="us-ascii"
1632Content-Transfer-Encoding: 7bit
1633
1634Attached Message 4 plain body.
1635
1636------=_NextPart_001_0061_01CBF557.56C6F370
1637Content-Type: text/html;
1638	charset="us-ascii"
1639Content-Transfer-Encoding: quoted-printable
1640
1641Attached Message 4 html body.
1642
1643------=_NextPart_001_0061_01CBF557.56C6F370--
1644
1645------=_NextPart_000_0060_01CBF557.56C6F370
1646Content-Type: message/rfc822
1647Content-Transfer-Encoding: 7bit
1648Content-Disposition: attachment
1649
1650From: User5 <user5@example.com>
1651To: Someone5 <someone5@example.net>
1652Subject: Attached Message 5 Subject
1653Date: Thu, 7 Apr 2011 16:24:26 -0500
1654Message-ID: <some_id@example>
1655Content-Type: multipart/alternative;
1656	boundary="----=_NextPart_000_005C_01CBF557.56C6F370"
1657
1658This is a multi-part message in MIME format.
1659
1660------=_NextPart_000_005C_01CBF557.56C6F370
1661Content-Type: text/plain;
1662	charset="iso-8859-1"
1663Content-Transfer-Encoding: 7bit
1664
1665Attached Message 5 plain body.
1666
1667------=_NextPart_000_005C_01CBF557.56C6F370
1668Content-Type: text/html;
1669	charset="iso-8859-1"
1670Content-Transfer-Encoding: quoted-printable
1671
1672Attached Message 5 html body.
1673
1674------=_NextPart_000_005C_01CBF557.56C6F370--
1675
1676------=_NextPart_000_0060_01CBF557.56C6F370
1677Content-Type: text/plain;
1678	name="ATT00055.txt"
1679Content-Transfer-Encoding: quoted-printable
1680Content-Disposition: attachment;
1681	filename="ATT00055.txt"
1682
1683Another plain part.
1684
1685------=_NextPart_000_0060_01CBF557.56C6F370--
1686
1687------=_NextPart_000_0066_01CBF557.56C6F370--
1688
1689--------------060107040402070208020705
1690Content-Type: text/plain; charset="us-ascii"
1691MIME-Version: 1.0
1692Content-Transfer-Encoding: 7bit
1693Content-Disposition: inline
1694
1695Final plain part.
1696
1697--------------060107040402070208020705--
1698
1699--------------050603050603060608020908--
1700""")
1701        MimeDel.process(self._mlist, msg, {})
1702        payload = msg.get_payload()
1703        eq(len(payload), 2)
1704        part1 = msg.get_payload(0)
1705        eq(part1.get_content_type(), 'text/plain')
1706        eq(part1.get_payload(), 'Plain body.\n')
1707        part2 = msg.get_payload(1)
1708        eq(part2.get_content_type(), 'message/rfc822')
1709        payload = part2.get_payload()
1710        eq(len(payload), 1)
1711        part1 = part2.get_payload(0)
1712        eq(part1['subject'], 'Attached Message 1 Subject')
1713        eq(part1.get_content_type(), 'multipart/mixed')
1714        payload = part1.get_payload()
1715        eq(len(payload), 3)
1716        part3 = part1.get_payload(2)
1717        eq(part3.get_content_type(), 'text/plain')
1718        eq(part3.get_payload(), 'Final plain part.\n')
1719        part2 = part1.get_payload(1)
1720        eq(part2.get_content_type(), 'message/rfc822')
1721        part1 = part1.get_payload(0)
1722        eq(part1.get_content_type(), 'text/plain')
1723        eq(part1.get_payload(), 'Attached Message 1 body.\n')
1724        payload = part2.get_payload()
1725        eq(len(payload), 1)
1726        part1 = part2.get_payload(0)
1727        eq(part1['subject'], 'Attached Message 2 Subject')
1728        eq(part1.get_content_type(), 'multipart/mixed')
1729        payload = part1.get_payload()
1730        eq(len(payload), 3)
1731        part3 = part1.get_payload(2)
1732        eq(part3.get_content_type(), 'message/rfc822')
1733        part2 = part1.get_payload(1)
1734        eq(part2.get_content_type(), 'message/rfc822')
1735        part1 = part1.get_payload(0)
1736        eq(part1.get_content_type(), 'text/plain')
1737        eq(part1.get_payload(), 'Attached Message 2 body.\n')
1738        payload = part2.get_payload()
1739        eq(len(payload), 1)
1740        part1 = part2.get_payload(0)
1741        eq(part1['subject'], 'Attached Message 3 Subject')
1742        eq(part1.get_content_type(), 'text/plain')
1743        eq(part1.get_payload(), 'Attached Message 3 plain body.\n')
1744        payload = part3.get_payload()
1745        eq(len(payload), 1)
1746        part1 = part3.get_payload(0)
1747        eq(part1['subject'], 'Attached Message 4 Subject')
1748        eq(part1.get_content_type(), 'multipart/mixed')
1749        payload = part1.get_payload()
1750        eq(len(payload), 3)
1751        part3 = part1.get_payload(2)
1752        eq(part3.get_content_type(), 'text/plain')
1753        eq(part3.get_filename(), 'ATT00055.txt')
1754        eq(part3.get_payload(), 'Another plain part.\n')
1755        part2 = part1.get_payload(1)
1756        eq(part2.get_content_type(), 'message/rfc822')
1757        part1 = part1.get_payload(0)
1758        eq(part1.get_content_type(), 'text/plain')
1759        eq(part1.get_payload(), 'Attached Message 4 plain body.\n')
1760        payload = part2.get_payload()
1761        eq(len(payload), 1)
1762        part1 = part2.get_payload(0)
1763        eq(part1['subject'], 'Attached Message 5 Subject')
1764        eq(part1.get_content_type(), 'text/plain')
1765        eq(part1.get_payload(), 'Attached Message 5 plain body.\n')
1766
1767
1768class TestModerate(TestBase):
1769    pass
1770
1771
1772
1773class TestReplybot(TestBase):
1774    pass
1775
1776
1777
1778class TestSpamDetect(TestBase):
1779    def test_short_circuit(self):
1780        msgdata = {'approved': 1}
1781        msg = email.message_from_string('', Message.Message)
1782        rtn = SpamDetect.process(self._mlist, msg, msgdata)
1783        # Not really a great test, but there's little else to assert
1784        self.assertEqual(rtn, None)
1785
1786    def test_spam_detect(self):
1787        msg1 = email.message_from_string("""\
1788From: aperson@dom.ain
1789
1790A message.
1791""", Message.Message)
1792        msg2 = email.message_from_string("""\
1793To: xlist@dom.ain
1794
1795A message.
1796""", Message.Message)
1797        spammers = mm_cfg.KNOWN_SPAMMERS[:]
1798        try:
1799            mm_cfg.KNOWN_SPAMMERS.append(('from', '.?person'))
1800            self.assertRaises(SpamDetect.SpamDetected,
1801                              SpamDetect.process, self._mlist, msg1, {})
1802            rtn = SpamDetect.process(self._mlist, msg2, {})
1803            self.assertEqual(rtn, None)
1804        finally:
1805            mm_cfg.KNOWN_SPAMMERS = spammers
1806
1807
1808
1809class TestTagger(TestBase):
1810    def setUp(self):
1811        TestBase.setUp(self)
1812        self._mlist.topics = [('bar fight', '.*bar.*', 'catch any bars', 1)]
1813        self._mlist.topics_enabled = 1
1814
1815    def test_short_circuit(self):
1816        self._mlist.topics_enabled = 0
1817        rtn = Tagger.process(self._mlist, None, {})
1818        # Not really a great test, but there's little else to assert
1819        self.assertEqual(rtn, None)
1820
1821    def test_simple(self):
1822        eq = self.assertEqual
1823        mlist = self._mlist
1824        mlist.topics_bodylines_limit = 0
1825        msg = email.message_from_string("""\
1826Subject: foobar
1827Keywords: barbaz
1828
1829""")
1830        msgdata = {}
1831        Tagger.process(mlist, msg, msgdata)
1832        eq(msg['x-topics'], 'bar fight')
1833        eq(msgdata.get('topichits'), ['bar fight'])
1834
1835    def test_all_body_lines_plain_text(self):
1836        eq = self.assertEqual
1837        mlist = self._mlist
1838        mlist.topics_bodylines_limit = -1
1839        msg = email.message_from_string("""\
1840Subject: Was
1841Keywords: Raw
1842
1843Subject: farbaw
1844Keywords: barbaz
1845""")
1846        msgdata = {}
1847        Tagger.process(mlist, msg, msgdata)
1848        eq(msg['x-topics'], 'bar fight')
1849        eq(msgdata.get('topichits'), ['bar fight'])
1850
1851    def test_no_body_lines(self):
1852        eq = self.assertEqual
1853        mlist = self._mlist
1854        mlist.topics_bodylines_limit = 0
1855        msg = email.message_from_string("""\
1856Subject: Was
1857Keywords: Raw
1858
1859Subject: farbaw
1860Keywords: barbaz
1861""")
1862        msgdata = {}
1863        Tagger.process(mlist, msg, msgdata)
1864        eq(msg['x-topics'], None)
1865        eq(msgdata.get('topichits'), None)
1866
1867    def test_body_lines_in_multipart(self):
1868        eq = self.assertEqual
1869        mlist = self._mlist
1870        mlist.topics_bodylines_limit = -1
1871        msg = email.message_from_string("""\
1872Subject: Was
1873Keywords: Raw
1874Content-Type: multipart/alternative; boundary="BOUNDARY"
1875
1876--BOUNDARY
1877From: sabo
1878To: obas
1879
1880Subject: farbaw
1881Keywords: barbaz
1882
1883--BOUNDARY--
1884""")
1885        msgdata = {}
1886        Tagger.process(mlist, msg, msgdata)
1887        eq(msg['x-topics'], 'bar fight')
1888        eq(msgdata.get('topichits'), ['bar fight'])
1889
1890    def test_body_lines_no_part(self):
1891        eq = self.assertEqual
1892        mlist = self._mlist
1893        mlist.topics_bodylines_limit = -1
1894        msg = email.message_from_string("""\
1895Subject: Was
1896Keywords: Raw
1897Content-Type: multipart/alternative; boundary=BOUNDARY
1898
1899--BOUNDARY
1900From: sabo
1901To: obas
1902Content-Type: message/rfc822
1903
1904Subject: farbaw
1905Keywords: barbaz
1906
1907--BOUNDARY
1908From: sabo
1909To: obas
1910Content-Type: message/rfc822
1911
1912Subject: farbaw
1913Keywords: barbaz
1914
1915--BOUNDARY--
1916""")
1917        msgdata = {}
1918        Tagger.process(mlist, msg, msgdata)
1919        eq(msg['x-topics'], None)
1920        eq(msgdata.get('topichits'), None)
1921
1922
1923
1924class TestToArchive(TestBase):
1925    def setUp(self):
1926        TestBase.setUp(self)
1927        # We're going to want to inspect this queue directory
1928        self._sb = Switchboard(mm_cfg.ARCHQUEUE_DIR)
1929
1930    def tearDown(self):
1931        for f in os.listdir(mm_cfg.ARCHQUEUE_DIR):
1932            os.unlink(os.path.join(mm_cfg.ARCHQUEUE_DIR, f))
1933        TestBase.tearDown(self)
1934
1935    def test_short_circuit(self):
1936        eq = self.assertEqual
1937        msgdata = {'isdigest': 1}
1938        ToArchive.process(self._mlist, None, msgdata)
1939        eq(len(self._sb.files()), 0)
1940        # Try the other half of the or...
1941        self._mlist.archive = 0
1942        ToArchive.process(self._mlist, None, msgdata)
1943        eq(len(self._sb.files()), 0)
1944        # Now try the various message header shortcuts
1945        msg = email.message_from_string("""\
1946X-No-Archive: YES
1947
1948""")
1949        self._mlist.archive = 1
1950        ToArchive.process(self._mlist, msg, {})
1951        eq(len(self._sb.files()), 0)
1952        # And for backwards compatibility
1953        msg = email.message_from_string("""\
1954X-Archive: NO
1955
1956""")
1957        ToArchive.process(self._mlist, msg, {})
1958        eq(len(self._sb.files()), 0)
1959
1960    def test_normal_archiving(self):
1961        eq = self.assertEqual
1962        msg = email.message_from_string("""\
1963Subject: About Mailman
1964
1965It rocks!
1966""")
1967        ToArchive.process(self._mlist, msg, {})
1968        files = self._sb.files()
1969        eq(len(files), 1)
1970        msg2, data = self._sb.dequeue(files[0])
1971        eq(len(data), 3)
1972        eq(data['_parsemsg'], False)
1973        eq(data['version'], 3)
1974        # Clock skew makes this unreliable
1975        #self.failUnless(data['received_time'] <= time.time())
1976        eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0))
1977
1978
1979
1980class TestToDigest(TestBase):
1981    def _makemsg(self, i=0):
1982        msg = email.message_from_string("""From: aperson@dom.ain
1983To: _xtest@dom.ain
1984Subject: message number %(i)d
1985
1986Here is message %(i)d
1987""" % {'i' : i})
1988        return msg
1989
1990    def setUp(self):
1991        TestBase.setUp(self)
1992        self._path = os.path.join(self._mlist.fullpath(), 'digest.mbox')
1993        fp = open(self._path, 'w')
1994        g = Generator(fp)
1995        for i in range(5):
1996            g.flatten(self._makemsg(i), unixfrom=1)
1997        fp.close()
1998        self._sb = Switchboard(mm_cfg.VIRGINQUEUE_DIR)
1999
2000    def tearDown(self):
2001        try:
2002            os.unlink(self._path)
2003        except OSError, e:
2004            if e.errno <> errno.ENOENT: raise
2005        for f in os.listdir(mm_cfg.VIRGINQUEUE_DIR):
2006            os.unlink(os.path.join(mm_cfg.VIRGINQUEUE_DIR, f))
2007        TestBase.tearDown(self)
2008
2009    def test_short_circuit(self):
2010        eq = self.assertEqual
2011        mlist = self._mlist
2012        mlist.digestable = 0
2013        eq(ToDigest.process(mlist, None, {}), None)
2014        mlist.digestable = 1
2015        eq(ToDigest.process(mlist, None, {'isdigest': 1}), None)
2016        eq(self._sb.files(), [])
2017
2018    def test_undersized(self):
2019        msg = self._makemsg(99)
2020        size = os.path.getsize(self._path) + len(str(msg))
2021        self._mlist.digest_size_threshhold = (size + 1) * 1024
2022        ToDigest.process(self._mlist, msg, {})
2023        self.assertEqual(self._sb.files(), [])
2024
2025    def test_send_a_digest(self):
2026        eq = self.assertEqual
2027        mlist = self._mlist
2028        msg = self._makemsg(99)
2029        size = os.path.getsize(self._path) + len(str(msg))
2030        # Set digest_size_threshhold to a very small value to force a digest.
2031        # Setting to zero no longer works.
2032        mlist.digest_size_threshhold = 0.001
2033        ToDigest.process(mlist, msg, {})
2034        files = self._sb.files()
2035        # There should be two files in the queue, one for the MIME digest and
2036        # one for the RFC 1153 digest.
2037        eq(len(files), 2)
2038        # Now figure out which of the two files is the MIME digest and which
2039        # is the RFC 1153 digest.
2040        for filebase in files:
2041            qmsg, qdata = self._sb.dequeue(filebase)
2042            if qmsg.get_content_maintype() == 'multipart':
2043                mimemsg = qmsg
2044                mimedata = qdata
2045            else:
2046                rfc1153msg = qmsg
2047                rfc1153data = qdata
2048        eq(rfc1153msg.get_content_type(), 'text/plain')
2049        eq(mimemsg.get_content_type(), 'multipart/mixed')
2050        eq(mimemsg['from'], mlist.GetRequestEmail())
2051        eq(mimemsg['subject'],
2052           '%(realname)s Digest, Vol %(volume)d, Issue %(issue)d' % {
2053            'realname': mlist.real_name,
2054            'volume'  : mlist.volume,
2055            'issue'   : mlist.next_digest_number - 1,
2056            })
2057        eq(mimemsg['to'], mlist.GetListEmail())
2058        # BAW: this test is incomplete...
2059
2060
2061
2062class TestToOutgoing(TestBase):
2063    def setUp(self):
2064        TestBase.setUp(self)
2065        # We're going to want to inspect this queue directory
2066        self._sb = Switchboard(mm_cfg.OUTQUEUE_DIR)
2067
2068    def tearDown(self):
2069        for f in os.listdir(mm_cfg.OUTQUEUE_DIR):
2070            os.unlink(os.path.join(mm_cfg.OUTQUEUE_DIR, f))
2071        TestBase.tearDown(self)
2072
2073    def test_outgoing(self):
2074        eq = self.assertEqual
2075        msg = email.message_from_string("""\
2076Subject: About Mailman
2077
2078It rocks!
2079""")
2080        msgdata = {'foo': 1, 'bar': 2}
2081        ToOutgoing.process(self._mlist, msg, msgdata)
2082        files = self._sb.files()
2083        eq(len(files), 1)
2084        msg2, data = self._sb.dequeue(files[0])
2085        eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0))
2086        self.failUnless(len(data) >= 6 and len(data) <= 7)
2087        eq(data['foo'], 1)
2088        eq(data['bar'], 2)
2089        eq(data['version'], 3)
2090        eq(data['listname'], '_xtest')
2091        eq(data['_parsemsg'], False)
2092        # Can't test verp. presence/value depend on mm_cfg.py
2093        #eq(data['verp'], 1)
2094        # Clock skew makes this unreliable
2095        #self.failUnless(data['received_time'] <= time.time())
2096
2097
2098
2099class TestToUsenet(TestBase):
2100    def setUp(self):
2101        TestBase.setUp(self)
2102        # We're going to want to inspect this queue directory
2103        self._sb = Switchboard(mm_cfg.NEWSQUEUE_DIR)
2104
2105    def tearDown(self):
2106        for f in os.listdir(mm_cfg.NEWSQUEUE_DIR):
2107            os.unlink(os.path.join(mm_cfg.NEWSQUEUE_DIR, f))
2108        TestBase.tearDown(self)
2109
2110    def test_short_circuit(self):
2111        eq = self.assertEqual
2112        mlist = self._mlist
2113        mlist.gateway_to_news = 0
2114        ToUsenet.process(mlist, None, {})
2115        eq(len(self._sb.files()), 0)
2116        mlist.gateway_to_news = 1
2117        ToUsenet.process(mlist, None, {'isdigest': 1})
2118        eq(len(self._sb.files()), 0)
2119        ToUsenet.process(mlist, None, {'fromusenet': 1})
2120        eq(len(self._sb.files()), 0)
2121
2122    def test_to_usenet(self):
2123        # BAW: Should we, can we, test the error conditions that only log to a
2124        # file instead of raising an exception?
2125        eq = self.assertEqual
2126        mlist = self._mlist
2127        mlist.gateway_to_news = 1
2128        mlist.linked_newsgroup = 'foo'
2129        mlist.nntp_host = 'bar'
2130        msg = email.message_from_string("""\
2131Subject: About Mailman
2132
2133Mailman rocks!
2134""")
2135        ToUsenet.process(mlist, msg, {})
2136        files = self._sb.files()
2137        eq(len(files), 1)
2138        msg2, data = self._sb.dequeue(files[0])
2139        eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0))
2140        eq(data['version'], 3)
2141        eq(data['listname'], '_xtest')
2142        # Clock skew makes this unreliable
2143        #self.failUnless(data['received_time'] <= time.time())
2144
2145
2146
2147def suite():
2148    suite = unittest.TestSuite()
2149    suite.addTest(unittest.makeSuite(TestAcknowledge))
2150    suite.addTest(unittest.makeSuite(TestAfterDelivery))
2151    suite.addTest(unittest.makeSuite(TestApprove))
2152    suite.addTest(unittest.makeSuite(TestCalcRecips))
2153    suite.addTest(unittest.makeSuite(TestCleanse))
2154    suite.addTest(unittest.makeSuite(TestCookHeaders))
2155    suite.addTest(unittest.makeSuite(TestDecorate))
2156    suite.addTest(unittest.makeSuite(TestFileRecips))
2157    suite.addTest(unittest.makeSuite(TestHold))
2158    suite.addTest(unittest.makeSuite(TestMimeDel))
2159    suite.addTest(unittest.makeSuite(TestModerate))
2160    suite.addTest(unittest.makeSuite(TestReplybot))
2161    suite.addTest(unittest.makeSuite(TestSpamDetect))
2162    suite.addTest(unittest.makeSuite(TestTagger))
2163    suite.addTest(unittest.makeSuite(TestToArchive))
2164    suite.addTest(unittest.makeSuite(TestToDigest))
2165    suite.addTest(unittest.makeSuite(TestToOutgoing))
2166    suite.addTest(unittest.makeSuite(TestToUsenet))
2167    return suite
2168
2169
2170
2171if __name__ == '__main__':
2172    unittest.main(defaultTest='suite')
2173