1# Copyright (C) 2010-2020 by the Free Software Foundation, Inc.
2#
3# This file is part of GNU Mailman.
4#
5# GNU Mailman is free software: you can redistribute it and/or modify it under
6# the terms of the GNU General Public License as published by the Free
7# Software Foundation, either version 3 of the License, or (at your option)
8# any later version.
9#
10# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
13# more details.
14#
15# You should have received a copy of the GNU General Public License along with
16# GNU Mailman.  If not, see <https://www.gnu.org/licenses/>.
17
18"""Tests for config.pck imports."""
19
20import os
21import unittest
22
23from datetime import timedelta, datetime
24from enum import Enum
25from importlib_resources import open_binary
26from mailman.app.lifecycle import create_list
27from mailman.config import config
28from mailman.handlers.decorate import decorate
29from mailman.interfaces.action import Action, FilterAction
30from mailman.interfaces.address import InvalidEmailAddressError
31from mailman.interfaces.archiver import ArchivePolicy
32from mailman.interfaces.autorespond import ResponseAction
33from mailman.interfaces.bans import IBanManager
34from mailman.interfaces.bounce import UnrecognizedBounceDisposition
35from mailman.interfaces.domain import IDomainManager
36from mailman.interfaces.languages import ILanguageManager
37from mailman.interfaces.mailinglist import (
38    DMARCMitigateAction, IAcceptableAliasSet, SubscriptionPolicy)
39from mailman.interfaces.member import DeliveryMode, DeliveryStatus
40from mailman.interfaces.nntp import NewsgroupModeration
41from mailman.interfaces.template import ITemplateLoader, ITemplateManager
42from mailman.interfaces.usermanager import IUserManager
43from mailman.model.roster import RosterVisibility
44from mailman.testing.helpers import LogFileMark
45from mailman.testing.layers import ConfigLayer
46from mailman.utilities.filesystem import makedirs
47from mailman.utilities.importer import (
48    Import21Error, check_language_code, import_config_pck)
49from pickle import load
50from unittest import mock
51from urllib.error import URLError
52from zope.component import getUtility
53
54
# Separator used by list_to_string() to join values into one
# newline-delimited text blob.
NL = '\n'
56
57
class DummyEnum(Enum):
    """Placeholder enum used only by the tests.

    Tests assign its single member to list attributes (or pickle values)
    before running an import, so that a later assertion can tell whether
    the import replaced the placeholder.
    """

    val = 42
61
62
def list_to_string(data):
    """Join the strings in `data` with newlines and encode them as UTF-8."""
    joined = NL.join(data)
    return joined.encode('utf-8')
65
66
class TestBasicImport(unittest.TestCase):
    """Import a Mailman 2.1 ``config.pck`` into a freshly created list.

    Every test starts from a blank list and the sample pickle shipped in
    ``mailman.testing``.  A test may tweak the pickle dictionary or the list
    before calling `_import()`, and then inspects the list attributes.
    """

    layer = ConfigLayer
    # Always show complete diffs when a comparison fails.
    maxDiff = None

    def setUp(self):
        self._mlist = create_list('blank@example.com')
        # Close the resource explicitly; leaving that to the garbage
        # collector leaks the file handle and emits a ResourceWarning.
        with open_binary('mailman.testing', 'config.pck') as fp:
            self._pckdict = load(fp)

    def _import(self):
        # Run the import of the (possibly modified) pickle dictionary into
        # the test list.
        import_config_pck(self._mlist, self._pckdict)

    def test_display_name(self):
        # The mlist.display_name gets set from the old list's real_name.
        self.assertEqual(self._mlist.display_name, 'Blank')
        self._import()
        self.assertEqual(self._mlist.display_name, 'Test')

    def test_mail_host_invariant(self):
        # The mlist.mail_host must not be updated when importing (it will
        # change the list_id property, which is supposed to be read-only).
        self.assertEqual(self._mlist.mail_host, 'example.com')
        self._import()
        self.assertEqual(self._mlist.mail_host, 'example.com')

    def test_rfc2369_headers(self):
        # Both flags are switched off first, so the assertions prove that
        # the import turned them back on.
        self._mlist.allow_list_posts = False
        self._mlist.include_rfc2369_headers = False
        self._import()
        self.assertTrue(self._mlist.allow_list_posts)
        self.assertTrue(self._mlist.include_rfc2369_headers)

    def test_no_overwrite_rosters(self):
        # The mlist.members and mlist.digest_members rosters must not be
        # overwritten.
        for rname in ('members', 'digest_members'):
            self.assertNotIsInstance(getattr(self._mlist, rname), dict)
            # Suppress warning messages in test output.
            with mock.patch('sys.stderr'):
                self._import()
            # Look the roster up again: checking the object fetched before
            # the import could never notice that the attribute was replaced.
            self.assertNotIsInstance(
                getattr(self._mlist, rname), dict,
                'The %s roster has been overwritten by the import' % rname)

    def test_last_post_time(self):
        # last_post_time -> last_post_at
        self._pckdict['last_post_time'] = 1270420800.274485
        self.assertIsNone(self._mlist.last_post_at)
        self._import()
        # 1270420800.274485 seconds since the epoch, expressed as a
        # datetime (the wall-clock value below is the UTC reading).
        expected = datetime(2010, 4, 4, 22, 40, 0, 274485)
        self.assertEqual(self._mlist.last_post_at, expected)

    def test_autoresponse_grace_period(self):
        # autoresponse_graceperiod -> autoresponse_grace_period
        # must be a timedelta, not an int
        self._mlist.autoresponse_grace_period = timedelta(days=42)
        self._import()
        self.assertIsInstance(
            self._mlist.autoresponse_grace_period, timedelta)
        self.assertEqual(self._mlist.autoresponse_grace_period,
                         timedelta(days=90))

    def test_autoresponse_admin_to_owner(self):
        # admin -> owner
        self._mlist.autorespond_owner = DummyEnum.val
        self._mlist.autoresponse_owner_text = 'DUMMY'
        self._import()
        self.assertEqual(self._mlist.autorespond_owner, ResponseAction.none)
        self.assertEqual(self._mlist.autoresponse_owner_text, '')

    def test_autoresponse_owner_yes(self):
        # Yes -> ResponseAction.respond_and_continue
        self._mlist.autorespond_owner = DummyEnum.val
        self._mlist.autoresponse_owner_text = 'DUMMY'
        self._pckdict['autorespond_admin'] = 1
        self._pckdict['autoresponse_admin_text'] = 'Autoresponse'
        self._import()
        self.assertEqual(self._mlist.autorespond_owner,
                         ResponseAction.respond_and_continue)
        self.assertEqual(self._mlist.autoresponse_owner_text, 'Autoresponse')

    def test_autoresponse_post_yes(self):
        # Yes -> ResponseAction.respond_and_continue
        self._mlist.autorespond_postings = DummyEnum.val
        self._mlist.autoresponse_postings_text = 'DUMMY'
        self._pckdict['autorespond_postings'] = 1
        self._pckdict['autoresponse_postings_text'] = 'Autoresponse'
        self._import()
        self.assertEqual(self._mlist.autorespond_postings,
                         ResponseAction.respond_and_continue)
        self.assertEqual(self._mlist.autoresponse_postings_text,
                         'Autoresponse')

    def test_autoresponse_post_no(self):
        # No -> ResponseAction.none
        self._mlist.autorespond_postings = DummyEnum.val
        self._mlist.autoresponse_postings_text = 'DUMMY'
        self._pckdict['autorespond_postings'] = 0
        self._import()
        self.assertEqual(self._mlist.autorespond_postings,
                         ResponseAction.none)
        self.assertEqual(self._mlist.autoresponse_postings_text, '')

    def test_autoresponse_request_continue(self):
        # Yes, w/forward -> ResponseAction.respond_and_continue
        self._mlist.autorespond_requests = DummyEnum.val
        self._mlist.autoresponse_request_text = 'DUMMY'
        self._pckdict['autorespond_requests'] = 2
        self._pckdict['autoresponse_request_text'] = 'Autoresponse'
        self._import()
        self.assertEqual(self._mlist.autorespond_requests,
                         ResponseAction.respond_and_continue)
        self.assertEqual(self._mlist.autoresponse_request_text,
                         'Autoresponse')

    def test_autoresponse_request_discard(self):
        # Yes, w/discard -> ResponseAction.respond_and_discard
        self._mlist.autorespond_requests = DummyEnum.val
        self._mlist.autoresponse_request_text = 'DUMMY'
        self._pckdict['autorespond_requests'] = 1
        self._pckdict['autoresponse_request_text'] = 'Autoresponse'
        self._import()
        self.assertEqual(self._mlist.autorespond_requests,
                         ResponseAction.respond_and_discard)
        self.assertEqual(self._mlist.autoresponse_request_text,
                         'Autoresponse')

    def test_autoresponse_request_no(self):
        # No -> ResponseAction.none
        self._mlist.autorespond_requests = DummyEnum.val
        self._mlist.autoresponse_request_text = 'DUMMY'
        self._pckdict['autorespond_requests'] = 0
        self._import()
        self.assertEqual(self._mlist.autorespond_requests,
                         ResponseAction.none)
        self.assertEqual(self._mlist.autoresponse_request_text, '')

    def test_administrativia(self):
        # Cleared beforehand so a truthy value can only come from the import.
        self._mlist.administrivia = None
        self._import()
        self.assertTrue(self._mlist.administrivia)

    def test_filter_pass_renames(self):
        # mime_types -> types
        # filename_extensions -> extensions
        self._mlist.filter_types = ['dummy']
        self._mlist.pass_types = ['dummy']
        self._mlist.filter_extensions = ['dummy']
        self._mlist.pass_extensions = ['dummy']
        self._import()
        self.assertEqual(list(self._mlist.filter_types), [])
        self.assertEqual(list(self._mlist.filter_extensions),
                         ['exe', 'bat', 'cmd', 'com', 'pif',
                          'scr', 'vbs', 'cpl'])
        self.assertEqual(
            list(self._mlist.pass_types),
            ['multipart/mixed', 'multipart/alternative', 'text/plain'])
        self.assertEqual(list(self._mlist.pass_extensions), [])

    def test_process_bounces(self):
        # bounce_processing -> process_bounces
        self._mlist.process_bounces = None
        self._import()
        self.assertTrue(self._mlist.process_bounces)

    def test_forward_unrecognized_bounces_to(self):
        # bounce_unrecognized_goes_to_list_owner
        #   -> forward_unrecognized_bounces_to
        self._mlist.forward_unrecognized_bounces_to = DummyEnum.val
        self._import()
        self.assertEqual(self._mlist.forward_unrecognized_bounces_to,
                         UnrecognizedBounceDisposition.administrators)

    def test_moderator_password(self):
        # mod_password -> moderator_password
        self._mlist.moderator_password = b'TESTDATA'
        self._import()
        self.assertIsNone(self._mlist.moderator_password)

    def test_moderator_password_str(self):
        # moderator_password must not be unicode
        self._pckdict['mod_password'] = b'TESTVALUE'
        self._import()
        self.assertNotIsInstance(self._mlist.moderator_password, str)
        self.assertEqual(self._mlist.moderator_password, b'TESTVALUE')

    def test_newsgroup_moderation(self):
        # news_moderation -> newsgroup_moderation
        # news_prefix_subject_too -> nntp_prefix_subject_too
        self._mlist.newsgroup_moderation = DummyEnum.val
        self._mlist.nntp_prefix_subject_too = None
        self._import()
        self.assertEqual(self._mlist.newsgroup_moderation,
                         NewsgroupModeration.none)
        self.assertTrue(self._mlist.nntp_prefix_subject_too)

    def test_msg_to_message(self):
        # send_welcome_msg -> send_welcome_message
        # send_goodbye_msg -> send_goodbye_message
        self._mlist.send_welcome_message = None
        self._mlist.send_goodbye_message = None
        self._import()
        self.assertTrue(self._mlist.send_welcome_message)
        self.assertTrue(self._mlist.send_goodbye_message)

    def test_dmarc_zero_from_is_list(self):
        # from_is_list == 0, dmarc_moderation_action == 1
        #   -> munge_from, not unconditional.
        self._mlist.dmarc_mitigate_action = DummyEnum.val
        self._mlist.dmarc_mitigate_unconditionally = True
        self._pckdict['from_is_list'] = 0
        self._pckdict['dmarc_moderation_action'] = 1
        self._import()
        self.assertFalse(self._mlist.dmarc_mitigate_unconditionally)
        self.assertEqual(self._mlist.dmarc_mitigate_action,
                         DMARCMitigateAction.munge_from)

    def test_dmarc_zero_dmarc_moderation_action(self):
        # from_is_list == 1, dmarc_moderation_action == 0
        #   -> munge_from, unconditional.
        self._mlist.dmarc_mitigate_action = DummyEnum.val
        self._mlist.dmarc_mitigate_unconditionally = False
        self._pckdict['from_is_list'] = 1
        self._pckdict['dmarc_moderation_action'] = 0
        self._import()
        self.assertTrue(self._mlist.dmarc_mitigate_unconditionally)
        self.assertEqual(self._mlist.dmarc_mitigate_action,
                         DMARCMitigateAction.munge_from)

    def test_dmarc_nonzero_actions_fil(self):
        # from_is_list == 2, dmarc_moderation_action == 1
        #   -> wrap_message, unconditional.
        self._mlist.dmarc_mitigate_action = DummyEnum.val
        self._mlist.dmarc_mitigate_unconditionally = False
        self._pckdict['from_is_list'] = 2
        self._pckdict['dmarc_moderation_action'] = 1
        self._import()
        self.assertTrue(self._mlist.dmarc_mitigate_unconditionally)
        self.assertEqual(self._mlist.dmarc_mitigate_action,
                         DMARCMitigateAction.wrap_message)

    def test_dmarc_nonzero_actions_dma(self):
        # from_is_list == 1, dmarc_moderation_action == 2
        #   -> wrap_message, not unconditional.
        self._mlist.dmarc_mitigate_action = DummyEnum.val
        self._mlist.dmarc_mitigate_unconditionally = True
        self._pckdict['from_is_list'] = 1
        self._pckdict['dmarc_moderation_action'] = 2
        self._import()
        self.assertFalse(self._mlist.dmarc_mitigate_unconditionally)
        self.assertEqual(self._mlist.dmarc_mitigate_action,
                         DMARCMitigateAction.wrap_message)

    def test_dmarc_messages(self):
        # The byte strings from the pickle must come out as text.
        self._pckdict['dmarc_moderation_notice'] = b'This is a notice.\n'
        self._pckdict['dmarc_wrapped_message_text'] = b'This is text.\n'
        self._import()
        self.assertEqual('This is a notice.\n',
                         self._mlist.dmarc_moderation_notice)
        self.assertEqual('This is text.\n',
                         self._mlist.dmarc_wrapped_message_text)

    def test_ban_list(self):
        # Each entry pairs the pattern stored in the pickle with an address
        # that is expected to be banned once the pattern is imported.
        banned = [
            ('anne@example.net', 'anne@example.net'),
            ('^.*@example.edu', 'bob@example.edu'),
            ('non-ascii-\xe8@example.com', 'non-ascii-\ufffd@example.com'),
            ]
        self._pckdict['ban_list'] = [
            pattern.encode('iso-8859-1') for pattern, _addr in banned]
        self._import()
        ban_manager = IBanManager(self._mlist)
        for _pattern, addr in banned:
            self.assertTrue(ban_manager.is_banned(addr))

    def test_acceptable_aliases(self):
        # This used to be a plain-text field (values are newline-separated)
        # but values were interpreted as regexps even without '^' so we need
        # to add the '^'.
        aliases = ['alias1@example.com',
                   'alias2@exemple.com',
                   'non-ascii-\xe8@example.com',
                   ]
        self._pckdict['acceptable_aliases'] = list_to_string(aliases)
        self._import()
        alias_set = IAcceptableAliasSet(self._mlist)
        # Sort both sides so the comparison does not rely on the order in
        # which the fixture happens to be written.
        self.assertEqual(sorted(alias_set.aliases),
                         sorted(('^' + alias) for alias in aliases))

    def test_acceptable_aliases_invalid(self):
        # Values without an '@' sign used to be matched against the local
        # part, now we need to add the '^' sign to indicate it's a regexp.
        aliases = ['invalid-value']
        self._pckdict['acceptable_aliases'] = list_to_string(aliases)
        self._import()
        alias_set = IAcceptableAliasSet(self._mlist)
        self.assertEqual(sorted(alias_set.aliases),
                         sorted(('^' + alias) for alias in aliases))

    def test_acceptable_aliases_as_list(self):
        # In some versions of the pickle, this can be a list, not a string
        # (seen in the wild).  We still need to add the '^'.
        aliases = [b'alias1@example.com', b'alias2@exemple.com']
        self._pckdict['acceptable_aliases'] = aliases
        self._import()
        alias_set = IAcceptableAliasSet(self._mlist)
        self.assertEqual(sorted(alias_set.aliases),
                         sorted(('^' + a.decode('utf-8')) for a in aliases))

    def test_dont_add_caret_if_present(self):
        # The 2.1 alias could have had a leading '^' even though not required.
        aliases = ['^alias1@example.com',
                   '^alias2@.*',
                   ]
        self._pckdict['acceptable_aliases'] = list_to_string(aliases)
        self._import()
        alias_set = IAcceptableAliasSet(self._mlist)
        self.assertEqual(sorted(alias_set.aliases), sorted(aliases))

    def test_info_non_ascii(self):
        # info can contain non-ascii characters.
        info = 'O idioma aceito \xe9 somente Portugu\xeas do Brasil'
        self._pckdict['info'] = info.encode('utf-8')
        self._import()
        self.assertEqual(self._mlist.info, info,
                         'Encoding to UTF-8 is not handled')
        # Test fallback to ascii with replace.
        self._pckdict['info'] = info.encode('iso-8859-1')
        # Suppress warning messages in test output.
        with mock.patch('sys.stderr'):
            self._import()
        self.assertEqual(
            self._mlist.info,
            self._pckdict['info'].decode('ascii', 'replace'),
            "We don't fall back to replacing non-ascii chars")

    def test_preferred_language(self):
        # The language code from the pickle replaces the list's default.
        self._pckdict['preferred_language'] = b'ja'
        language_manager = getUtility(ILanguageManager)
        english = language_manager.get('en')
        japanese = language_manager.get('ja')
        self.assertEqual(self._mlist.preferred_language, english)
        self._import()
        self.assertEqual(self._mlist.preferred_language, japanese)

    def test_preferred_language_unknown_previous(self):
        # When the previous language is unknown, it should not fail.
        self._mlist._preferred_language = 'xx'
        self._import()
        english = getUtility(ILanguageManager).get('en')
        self.assertEqual(self._mlist.preferred_language, english)

    def test_new_language(self):
        # A language code that is not configured must abort the import.
        self._pckdict['preferred_language'] = b'xx_XX'
        with self.assertRaises(Import21Error) as cm:
            self._import()
        # Check the message.
        self.assertIn('[language.xx_XX]', str(cm.exception))

    def test_encode_ascii_prefixes(self):
        # The value 2 in the pickle is imported as True.
        self._pckdict['encode_ascii_prefixes'] = 2
        self.assertEqual(self._mlist.encode_ascii_prefixes, False)
        self._import()
        self.assertEqual(self._mlist.encode_ascii_prefixes, True)

    def test_subscription_policy_open(self):
        # subscribe_policy 0 -> SubscriptionPolicy.open
        self._mlist.subscription_policy = SubscriptionPolicy.confirm
        self._pckdict['subscribe_policy'] = 0
        self._import()
        self.assertEqual(self._mlist.subscription_policy,
                         SubscriptionPolicy.open)

    def test_subscription_policy_confirm(self):
        # subscribe_policy 1 -> SubscriptionPolicy.confirm
        self._mlist.subscription_policy = SubscriptionPolicy.open
        self._pckdict['subscribe_policy'] = 1
        self._import()
        self.assertEqual(self._mlist.subscription_policy,
                         SubscriptionPolicy.confirm)

    def test_subscription_policy_moderate(self):
        # subscribe_policy 2 -> SubscriptionPolicy.moderate
        self._mlist.subscription_policy = SubscriptionPolicy.open
        self._pckdict['subscribe_policy'] = 2
        self._import()
        self.assertEqual(self._mlist.subscription_policy,
                         SubscriptionPolicy.moderate)

    def test_subscription_policy_confirm_then_moderate(self):
        # subscribe_policy 3 -> SubscriptionPolicy.confirm_then_moderate
        self._mlist.subscription_policy = SubscriptionPolicy.open
        self._pckdict['subscribe_policy'] = 3
        self._import()
        self.assertEqual(self._mlist.subscription_policy,
                         SubscriptionPolicy.confirm_then_moderate)

    def test_header_matches(self):
        # This test contains real cases of header_filter_rules.
        self._pckdict['header_filter_rules'] = [
            ('X\\-Spam\\-Status\\: Yes.*', 3, False),
            ('^X-Spam-Status: Yes\r\n\r\n', 2, False),
            ('^X-Spam-Level: \\*\\*\\*.*$', 3, False),
            ('^X-Spam-Level:.\\*\\*\r\n^X-Spam:.Yes', 3, False),
            ('Subject: \\[SPAM\\].*', 3, False),
            ('^Subject: .*loan.*', 3, False),
            ('Original-Received: from *linkedin.com*\r\n', 3, False),
            ('X-Git-Module: rhq.*git', 6, False),
            ('Approved: verysecretpassword', 6, False),
            ('^Subject: dev-\r\n^Subject: staging-', 3, False),
            ('from: .*info@aolanchem.com\r\nfrom: .*@jw-express.com',
             2, False),
            ('^Subject:.*\\Wwas:\\W', 3, False),
            ('^Received: from smtp-.*\\.fedoraproject\\.org\r\n'
             '^Received: from mx.*\\.redhat.com\r\n'
             '^Resent-date:\r\n'
             '^Resent-from:\r\n'
             '^Resent-Message-ID:\r\n'
             '^Resent-to:\r\n'
             '^Subject: [^mtv]\r\n',
             7, False),
            ('^Received: from fedorahosted\\.org.*by fedorahosted\\.org\r\n'
             '^Received: from hosted.*\\.fedoraproject.org.*by '
             'hosted.*\\.fedoraproject\\.org\r\n'
             '^Received: from hosted.*\\.fedoraproject.org.*by '
                'fedoraproject\\.org\r\n'
             '^Received: from hosted.*\\.fedoraproject.org.*by '
                'fedorahosted\\.org',
             6, False),
            ]
        error_log = LogFileMark('mailman.error')
        self._import()
        self.assertListEqual(
            [(hm.header, hm.pattern, hm.chain)
             for hm in self._mlist.header_matches], [
                ('x-spam-status', 'Yes.*', 'discard'),
                ('x-spam-status', 'Yes', 'reject'),
                ('x-spam-level', '\\*\\*\\*.*$', 'discard'),
                ('x-spam-level', '\\*\\*', 'discard'),
                ('x-spam', 'Yes', 'discard'),
                ('subject', '\\[SPAM\\].*', 'discard'),
                ('subject', '.*loan.*', 'discard'),
                ('original-received', 'from *linkedin.com*', 'discard'),
                ('x-git-module', 'rhq.*git', 'accept'),
                ('approved', 'verysecretpassword', 'accept'),
                ('subject', 'dev-', 'discard'),
                ('subject', 'staging-', 'discard'),
                ('from', '.*info@aolanchem.com', 'reject'),
                ('from', '.*@jw-express.com', 'reject'),
                ('subject', '\\Wwas:\\W', 'discard'),
                ('received', 'from smtp-.*\\.fedoraproject\\.org', 'hold'),
                ('received', 'from mx.*\\.redhat.com', 'hold'),
                ('resent-date', '.*', 'hold'),
                ('resent-from', '.*', 'hold'),
                ('resent-message-id', '.*', 'hold'),
                ('resent-to', '.*', 'hold'),
                ('subject', '[^mtv]', 'hold'),
                ('received', 'from fedorahosted\\.org.*by fedorahosted\\.org',
                 'accept'),
                ('received',
                 'from hosted.*\\.fedoraproject.org.*by '
                    'hosted.*\\.fedoraproject\\.org', 'accept'),
                ('received',
                 'from hosted.*\\.fedoraproject.org.*by '
                    'fedoraproject\\.org', 'accept'),
                ('received',
                 'from hosted.*\\.fedoraproject.org.*by '
                    'fedorahosted\\.org', 'accept'),
                ])
        # None of these rules may have been reported to the error log.
        loglines = error_log.read().strip()
        self.assertEqual(len(loglines), 0)

    def test_header_matches_header_only(self):
        # Check that an empty pattern is skipped.
        self._pckdict['header_filter_rules'] = [
            ('SomeHeaderName', 3, False),
            ]
        error_log = LogFileMark('mailman.error')
        self._import()
        self.assertListEqual(self._mlist.header_matches, [])
        self.assertIn('Unsupported header_filter_rules pattern',
                      error_log.readline())

    def test_header_matches_anything(self):
        # Check that a wild card header pattern is skipped.
        self._pckdict['header_filter_rules'] = [
            ('.*', 7, False),
            ]
        error_log = LogFileMark('mailman.error')
        self._import()
        self.assertListEqual(self._mlist.header_matches, [])
        self.assertIn('Unsupported header_filter_rules pattern',
                      error_log.readline())

    def test_header_matches_invalid_re(self):
        # Check that an invalid regular expression pattern is skipped.
        self._pckdict['header_filter_rules'] = [
            ('SomeHeaderName: *invalid-re', 3, False),
            ]
        error_log = LogFileMark('mailman.error')
        self._import()
        self.assertListEqual(self._mlist.header_matches, [])
        self.assertIn('Skipping header_filter rule because of an invalid '
                      'regular expression', error_log.readline())

    def test_header_matches_defer(self):
        # Check that a defer action is properly converted.
        self._pckdict['header_filter_rules'] = [
            ('^X-Spam-Status: Yes', 0, False),
            ]
        self._import()
        self.assertListEqual(
            [(hm.header, hm.pattern, hm.chain)
             for hm in self._mlist.header_matches],
            [('x-spam-status', 'Yes', None)]
            )

    def test_header_matches_unsupported_action(self):
        # Check that unsupported actions are skipped.
        for action_num in (1, 4, 5):
            self._pckdict['header_filter_rules'] = [
                ('HeaderName: test-re', action_num, False),
                ]
            error_log = LogFileMark('mailman.error')
            self._import()
            self.assertListEqual(self._mlist.header_matches, [])
            self.assertIn('Unsupported header_filter_rules action',
                          error_log.readline())
            # Avoid a useless warning.
            for member in self._mlist.members.members:
                member.unsubscribe()
            for member in self._mlist.owners.members:
                member.unsubscribe()

    def test_header_matches_duplicate(self):
        # Check that duplicate patterns don't cause tracebacks.
        self._pckdict['header_filter_rules'] = [
            ('SomeHeaderName: test-pattern', 3, False),
            ('SomeHeaderName: test-pattern', 2, False),
            ]
        error_log = LogFileMark('mailman.error')
        self._import()
        # Only the first of the two identical patterns is kept.
        self.assertListEqual(
            [(hm.header, hm.pattern, hm.chain)
             for hm in self._mlist.header_matches],
            [('someheadername', 'test-pattern', 'discard')]
            )
        self.assertIn('Skipping duplicate header_filter rule',
                      error_log.readline())
605
606
class TestArchiveImport(unittest.TestCase):
    """Check how the old archiving flags collapse into `archive_policy`.

    Mailman 2.1 kept two separate settings, `archive` and
    `archive_private`; the imported list only has the single
    `archive_policy` enum.
    """

    layer = ConfigLayer

    def setUp(self):
        mlist = create_list('blank@example.com')
        # Begin with a placeholder so every test shows the policy was
        # really written by the import.
        mlist.archive_policy = DummyEnum.val
        self._mlist = mlist

    def _do_test(self, pckdict, expected):
        # Import `pckdict` and compare the resulting policy with `expected`.
        import_config_pck(self._mlist, pckdict)
        policy = self._mlist.archive_policy
        self.assertEqual(policy, expected)

    def test_public(self):
        # Archiving on, not private.
        old_config = {'archive': True, 'archive_private': False}
        self._do_test(old_config, ArchivePolicy.public)

    def test_private(self):
        # Archiving on and private.
        old_config = {'archive': True, 'archive_private': True}
        self._do_test(old_config, ArchivePolicy.private)

    def test_no_archive(self):
        # Archiving off.
        old_config = {'archive': False, 'archive_private': False}
        self._do_test(old_config, ArchivePolicy.never)

    def test_bad_state(self):
        # `archive` False together with `archive_private` True is an
        # invalid combination in the old list, but it is harmless: it still
        # maps onto the same enum value as "no archiving".
        old_config = {'archive': False, 'archive_private': True}
        self._do_test(old_config, ArchivePolicy.never)

    def test_missing_archive_key(self):
        # An old list without any `archive` key is handled as if it did no
        # archiving at all.
        old_config = {'archive_private': False}
        self._do_test(old_config, ArchivePolicy.never)

    def test_missing_archive_key_archive_public(self):
        # No `archive` key and an odd `archive_private` value: still
        # handled as if no archiving is done.
        old_config = {'archive_private': True}
        self._do_test(old_config, ArchivePolicy.never)

    def test_missing_archive_private_key(self):
        # Without an `archive_private` key the safest reading is that the
        # archive is private.
        old_config = {'archive': True}
        self._do_test(old_config, ArchivePolicy.private)
657
658
class TestFilterActionImport(unittest.TestCase):
    # The numeric values behind mlist.filter_action are not the same as in
    # Mailman 2.1, where the choices were ordered 'Discard', 'Reject',
    # 'Forward to List Owner', 'Preserve'.

    layer = ConfigLayer

    def setUp(self):
        mlist = create_list('blank@example.com')
        # Placeholder value, so each test shows the import set the action.
        mlist.filter_action = DummyEnum.val
        self._mlist = mlist

    def _do_test(self, original, expected):
        # Import a pickle holding only the old numeric filter_action and
        # compare the list's resulting action with `expected`.
        old_config = {'filter_action': original}
        import_config_pck(self._mlist, old_config)
        self.assertEqual(self._mlist.filter_action, expected)

    def test_discard(self):
        self._do_test(original=0, expected=FilterAction.discard)

    def test_reject(self):
        self._do_test(original=1, expected=FilterAction.reject)

    def test_forward(self):
        self._do_test(original=2, expected=FilterAction.forward)

    def test_preserve(self):
        self._do_test(original=3, expected=FilterAction.preserve)
684
685
686class TestMemberActionImport(unittest.TestCase):
687    # The mlist.default_member_action and mlist.default_nonmember_action enum
688    # values are different in Mailman 2.1; they have been merged into a
689    # single enum in Mailman 3.
690    #
691    # For default_member_action, which used to be called
692    # member_moderation_action, the values were:
    # 0==Hold, 1==Reject, 2==Discard
694    #
695    # For default_nonmember_action, which used to be called
696    # generic_nonmember_action, the values were:
697    # 0==Accept, 1==Hold, 2==Reject, 3==Discard
698
699    layer = ConfigLayer
700
701    def setUp(self):
702        self._mlist = create_list('blank@example.com')
703        self._mlist.default_member_action = DummyEnum.val
704        self._mlist.default_nonmember_action = DummyEnum.val
705        self._pckdict = dict(
706            member_moderation_action=DummyEnum.val,
707            generic_nonmember_action=DummyEnum.val,
708            )
709
710    def _do_test(self, expected):
711        # Suppress warning messages in the test output.
712        with mock.patch('sys.stderr'):
713            import_config_pck(self._mlist, self._pckdict)
714        for key, value in expected.items():
715            self.assertEqual(getattr(self._mlist, key), value)
716
717    def test_member_defer(self):
718        # If default_member_moderation is not set, the member_moderation_action
719        # value is meaningless.
720        self._pckdict['default_member_moderation'] = 0
721        for mmaval in range(3):
722            self._pckdict['member_moderation_action'] = mmaval
723            self._do_test(dict(default_member_action=Action.defer))
724
725    def test_member_hold(self):
726        self._pckdict['default_member_moderation'] = 1
727        self._pckdict['member_moderation_action'] = 0
728        self._do_test(dict(default_member_action=Action.hold))
729
730    def test_member_reject(self):
731        self._pckdict['default_member_moderation'] = 1
732        self._pckdict['member_moderation_action'] = 1
733        self._do_test(dict(default_member_action=Action.reject))
734
735    def test_member_discard(self):
736        self._pckdict['default_member_moderation'] = 1
737        self._pckdict['member_moderation_action'] = 2
738        self._do_test(dict(default_member_action=Action.discard))
739
740    def test_nonmember_accept(self):
741        self._pckdict['generic_nonmember_action'] = 0
742        self._do_test(dict(default_nonmember_action=Action.defer))
743
744    def test_nonmember_hold(self):
745        self._pckdict['generic_nonmember_action'] = 1
746        self._do_test(dict(default_nonmember_action=Action.hold))
747
748    def test_nonmember_reject(self):
749        self._pckdict['generic_nonmember_action'] = 2
750        self._do_test(dict(default_nonmember_action=Action.reject))
751
752    def test_nonmember_discard(self):
753        self._pckdict['generic_nonmember_action'] = 3
754        self._do_test(dict(default_nonmember_action=Action.discard))
755
756
class TestConvertToURI(unittest.TestCase):
    # These values were plain text in Mailman 2.1 and are URIs in Mailman 3:
    # - welcome_message
    # - goodbye_message
    # - msg_header
    # - msg_footer
    # - digest_header
    # - digest_footer
    #
    # welcome_message is intentionally left out because it doesn't map well
    # from MM 2.1
    #
    # Variables contained in the templates must be replaced:
    # - %(real_name)s -> %(display_name)s
    # - %(real_name)s@%(host_name)s -> %(fqdn_listname)s
    # - %(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s
    #       -> %(listinfo_uri)s

    layer = ConfigLayer
    maxDiff = None

    def setUp(self):
        self._mlist = create_list('blank@example.com')
        # Maps each Mailman 2.1 pickle key to the Mailman 3 template name the
        # tests look it up under.
        self._conf_mapping = {
            'goodbye_msg': 'list:user:notice:goodbye',
            'msg_header': 'list:member:regular:header',
            'msg_footer': 'list:member:regular:footer',
            'digest_header': 'list:member:digest:header',
            'digest_footer': 'list:member:digest:footer',
            }
        self._pckdict = {}

    def test_text_to_uri(self):
        failure_template = 'Old variable %s was not properly imported to %s'
        for oldvar, newvar in self._conf_mapping.items():
            # The pickle dict accumulates keys, so every pass re-imports the
            # variables set so far in addition to the current one.
            self._pckdict[oldvar] = b'TEST VALUE'
            import_config_pck(self._mlist, self._pckdict)
            decorated = decorate(newvar, self._mlist)
            self.assertEqual(
                decorated, 'TEST VALUE',
                failure_template % (oldvar, newvar))

    def test_substitutions(self):
        old_style_text = ('UNIT TESTING %(real_name)s mailing list\n'
                          '%(real_name)s@%(host_name)s')
        converted_text = ('UNIT TESTING $display_name mailing list '
                          '-- $listname\n'
                          'To unsubscribe send an email to '
                          '${short_listname}-leave@${domain}')
        for oldvar, newvar in self._conf_mapping.items():
            self._pckdict[oldvar] = old_style_text
            import_config_pck(self._mlist, self._pckdict)
            loader = getUtility(ITemplateLoader)
            loaded = loader.get(newvar, self._mlist)
            self.assertEqual(
                loaded, converted_text,
                'Old variables were not converted for %s' % newvar)

    def test_keep_default(self):
        # A value that still equals MM2.1's default must not be imported.
        default_msg_footer = (
            '_______________________________________________\n'
            '%(real_name)s mailing list\n'
            '%(real_name)s@%(host_name)s\n'
            '%(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s\n'
            )
        loader = getUtility(ITemplateLoader)

        def current_template(name):
            # A template that fails to load with URLError counts as None.
            try:
                return loader.get(name, self._mlist)
            except URLError:
                return None

        for oldvar in ('msg_footer', 'digest_footer'):
            newvar = self._conf_mapping[oldvar]
            self._pckdict[oldvar] = default_msg_footer
            before = current_template(newvar)
            import_config_pck(self._mlist, self._pckdict)
            after = current_template(newvar)
            self.assertEqual(
                before, after,
                '{} changed unexpectedly: {} != {}'.format(
                    newvar, before, after))

    def test_keep_default_if_fqdn_changed(self):
        # Use case: importing the old a@ex.com into b@ex.com.  There is no
        # way to check whether the value changed from the default, so it is
        # not imported.  Importing may do more harm than good and the value
        # is easy to change afterwards if needed.
        test_value = b'TEST-VALUE'
        # This mail_host needs an IDomain.
        getUtility(IDomainManager).add('test.example.com')
        manager = getUtility(ITemplateManager)

        def registered(name, context):
            # A lookup that fails with URLError counts as None.
            try:
                return manager.get(name, context)
            except URLError:
                return None

        for oldvar, newvar in self._conf_mapping.items():
            self._mlist.mail_host = 'example.com'
            self._pckdict['mail_host'] = b'test.example.com'
            self._pckdict[oldvar] = test_value
            before = registered(newvar, 'blank.example.com')
            # Suppress warning messages in the test output.
            with mock.patch('sys.stderr'):
                import_config_pck(self._mlist, self._pckdict)
            after = registered(newvar, 'test.example.com')
            self.assertEqual(
                before, after,
                '{} changed unexpectedly: {} != {}'.format(
                    newvar, before, after))

    def test_unicode(self):
        # Non-ascii templates: the byte 0xe1 is expected to come back as the
        # U+FFFD replacement character.
        self._pckdict.update(dict.fromkeys(self._conf_mapping, b'Ol\xe1!'))
        import_config_pck(self._mlist, self._pckdict)
        expected = 'Ol\ufffd!'
        for oldvar, newvar in self._conf_mapping.items():
            decorated = decorate(newvar, self._mlist)
            self.assertEqual(
                decorated, expected,
                '{} -> {} did not get converted'.format(oldvar, newvar))

    def test_unicode_in_default(self):
        # What if the default template is already in UTF-8?   For example, if
        # you import it twice.
        footer_dir = os.path.join(
            config.VAR_DIR, 'templates', 'lists', 'blank@example.com', 'en')
        makedirs(footer_dir)
        with open(os.path.join(footer_dir, 'footer.txt'), 'wb') as fp:
            fp.write(b'\xe4\xb8\xad $listinfo_uri')
        self._pckdict['msg_footer'] = b'NEW-VALUE'
        import_config_pck(self._mlist, self._pckdict)
        decorated = decorate('list:member:regular:footer', self._mlist)
        self.assertEqual(decorated, 'NEW-VALUE')
894
895
class TestRosterImport(unittest.TestCase):
    """Check that the rosters of an imported list come out correctly."""

    layer = ConfigLayer

    def setUp(self):
        self._mlist = create_list('blank@example.com')
        self._pckdict = {
            'members': {
                'anne@example.com': 0,
                'bob@example.com': b'bob@ExampLe.Com',
                },
            'digest_members': {
                'cindy@example.com': 0,
                'dave@example.com': b'dave@ExampLe.Com',
                },
            'passwords': {
                'anne@example.com': b'annepass',
                'bob@example.com': b'bobpass',
                'cindy@example.com': b'cindypass',
                'dave@example.com': b'davepass',
                },
            'language': {
                'anne@example.com': b'fr',
                'bob@example.com': b'de',
                'cindy@example.com': b'es',
                'dave@example.com': b'it',
                },
            # Usernames are unicode strings in the pickle
            'usernames': {
                'anne@example.com': 'Anne',
                'bob@example.com': 'Bob',
                'cindy@example.com': 'Cindy',
                'dave@example.com': 'Dave',
                },
            'owner': [
                'anne@example.com',
                'emily@example.com',
                ],
            'moderator': [
                'bob@example.com',
                'fred@example.com',
                ],
            'accept_these_nonmembers': [
                'gene@example.com',
                '^gene-.*@example.com',
                ],
            'hold_these_nonmembers': [
                'homer@example.com',
                '^homer-.*@example.com',
                ],
            'reject_these_nonmembers': [
                'iris@example.com',
                '^iris-.*@example.com',
                ],
            'discard_these_nonmembers': [
                'kenny@example.com',
                '^kenny-.*@example.com',
                ],
            }
        self._usermanager = getUtility(IUserManager)
        # Register every language code used above that is not known yet.
        language_manager = getUtility(ILanguageManager)
        for raw_code in self._pckdict['language'].values():
            code = (raw_code.decode('utf-8')
                    if isinstance(raw_code, bytes) else raw_code)
            if code in language_manager.codes:
                continue
            language_manager.add(code, 'utf-8', code)

    @staticmethod
    def _emails(roster):
        # The email of every address on the given roster, as a list.
        return [address.email for address in roster.addresses]

    def test_member(self):
        import_config_pck(self._mlist, self._pckdict)
        for name in ('anne', 'bob', 'cindy', 'dave'):
            addr = name + '@example.com'
            self.assertIn(addr, self._emails(self._mlist.members),
                          'Address %s was not imported' % addr)
        # Regular and digest members must end up on the matching roster.
        for addr in ('anne@example.com', 'bob@example.com'):
            self.assertIn(addr, self._emails(self._mlist.regular_members))
        for addr in ('cindy@example.com', 'dave@example.com'):
            self.assertIn(addr, self._emails(self._mlist.digest_members))

    def test_original_email(self):
        import_config_pck(self._mlist, self._pckdict)
        case_preserved = (
            ('bob@example.com', 'bob@ExampLe.Com'),
            ('dave@example.com', 'dave@ExampLe.Com'),
            )
        for email, original in case_preserved:
            address = self._usermanager.get_address(email)
            self.assertEqual(address.original_email, original)

    def test_language(self):
        import_config_pck(self._mlist, self._pckdict)
        for name in ('anne', 'bob', 'cindy', 'dave'):
            addr = name + '@example.com'
            member = self._mlist.members.get_member(addr)
            self.assertIsNotNone(member, 'Address %s was not imported' % addr)
            expected_code = self._pckdict['language'][addr]
            if isinstance(expected_code, bytes):
                expected_code = expected_code.decode('utf-8')
            self.assertEqual(member.preferred_language.code, expected_code)

    def test_new_language(self):
        # A language code that was not registered in setUp must make the
        # import raise Import21Error naming that language.
        self._pckdict['language']['anne@example.com'] = b'xx_XX'
        raised = None
        try:
            import_config_pck(self._mlist, self._pckdict)
        except Import21Error as error:
            raised = error
        if raised is None:
            self.fail('Import21Error was not raised')
        self.assertIn('[language.xx_XX]', str(raised))

    def test_username(self):
        import_config_pck(self._mlist, self._pckdict)
        for name in ('anne', 'bob', 'cindy', 'dave'):
            addr = name + '@example.com'
            user = self._usermanager.get_user(addr)
            address = self._usermanager.get_address(addr)
            self.assertIsNotNone(user, 'User %s was not imported' % addr)
            self.assertIsNotNone(address, 'Address %s was not imported' % addr)
            expected_name = self._pckdict['usernames'][addr]
            self.assertEqual(
                user.display_name, expected_name,
                'The display name was not set for User %s' % addr)
            self.assertEqual(
                address.display_name, expected_name,
                'The display name was not set for Address %s' % addr)

    def test_owner(self):
        import_config_pck(self._mlist, self._pckdict)
        for addr in ('anne@example.com', 'emily@example.com'):
            self.assertIn(addr, self._emails(self._mlist.owners),
                          'Address %s was not imported as owner' % addr)
        # emily is an owner only; she must not become a member.
        self.assertNotIn(
            'emily@example.com', self._emails(self._mlist.members),
            'Address emily@ was wrongly added to the members list')

    def test_moderator(self):
        import_config_pck(self._mlist, self._pckdict)
        for addr in ('bob@example.com', 'fred@example.com'):
            self.assertIn(addr, self._emails(self._mlist.moderators),
                          'Address %s was not imported as moderator' % addr)
        # fred is a moderator only; he must not become a member.
        self.assertNotIn(
            'fred@example.com', self._emails(self._mlist.members),
            'Address fred@ was wrongly added to the members list')

    # Commented out because password importing has been disabled.
    # def test_password(self):
    #     # self.anne.password = config.password_context.encrypt('abc123')
    #     import_config_pck(self._mlist, self._pckdict)
    #     for name in ('anne', 'bob', 'cindy', 'dave'):
    #         addr = '%s@example.com' % name
    #         user = self._usermanager.get_user(addr)
    #         self.assertIsNotNone(user, 'Address %s was not imported' % addr)
    #         self.assertEqual(
    #             user.password, '{plaintext}%spass' % name,
    #             'Password for %s was not imported' % addr)

    def test_same_user(self):
        # Importing an address that an existing User already owns must not
        # create another user.
        existing = self._usermanager.create_user('anne@example.com', 'Anne')
        # bob@ becomes a secondary email of the same user.
        existing.register('bob@example.com')
        import_config_pck(self._mlist, self._pckdict)
        bob_member = self._mlist.members.get_member('bob@example.com')
        self.assertEqual(bob_member.user, existing)

    def test_owner_and_moderator_not_lowercase(self):
        # The owner and moderator lists in the v2.1 pickled dict are not
        # necessarily lowercased already.
        mixed_case = b'Anne@example.com'
        self._pckdict['owner'] = [mixed_case]
        self._pckdict['moderator'] = [mixed_case]
        import_config_pck(self._mlist, self._pckdict)
        self.assertIn('anne@example.com', self._emails(self._mlist.owners))
        self.assertIn('anne@example.com',
                      self._emails(self._mlist.moderators))

    def test_address_already_exists_but_no_user(self):
        # The address exists beforehand, but it is neither linked to a user
        # nor subscribed.
        email = 'anne@example.com'
        anne_addr = self._usermanager.create_address(email, 'Anne')
        import_config_pck(self._mlist, self._pckdict)
        anne = self._usermanager.get_user(email)
        self.assertTrue(anne.controls(email))
        self.assertIn(anne_addr, self._mlist.regular_members.addresses)

    def test_address_already_subscribed_but_no_user(self):
        # The address is subscribed beforehand, but not linked to a user.
        email = 'anne@example.com'
        anne_addr = self._usermanager.create_address(email, 'Anne')
        self._mlist.subscribe(anne_addr)
        # Suppress warning messages in test output.
        with mock.patch('sys.stderr'):
            import_config_pck(self._mlist, self._pckdict)
        anne = self._usermanager.get_user(email)
        self.assertTrue(anne.controls(email))

    def test_invalid_original_email(self):
        # When the member has an original email address (i.e. the
        # case-preserved version) that is invalid, their new address record's
        # original_email attribute will only be the case insensitive version.
        email = 'anne@example.com'
        self._pckdict['members'][email] = b'invalid email address'
        try:
            import_config_pck(self._mlist, self._pckdict)
        except InvalidEmailAddressError as error:
            self.fail(error)
        self.assertIn(email, self._emails(self._mlist.members))
        anne = self._usermanager.get_address(email)
        self.assertEqual(anne.original_email, email)

    def test_invalid_email(self):
        # A member whose email address is invalid is skipped during the
        # import.
        self._pckdict.update(
            members={
                'anne@example.com': 0,
                'invalid email address': b'invalid email address'
                },
            digest_members={},
            )
        try:
            import_config_pck(self._mlist, self._pckdict)
        except InvalidEmailAddressError as error:
            self.fail(error)
        self.assertEqual(['anne@example.com'],
                         self._emails(self._mlist.members))

    def test_no_email_sent(self):
        # Newly imported members do not get a welcome message.
        self.assertTrue(self._mlist.send_welcome_message)
        import_config_pck(self._mlist, self._pckdict)
        self.assertIn('anne@example.com', self._emails(self._mlist.members))
        # Every queue must be empty.
        for queue, switchboard in config.switchboards.items():
            file_count = len(switchboard.files)
            self.assertEqual(file_count, 0,
                             "Unexpected queue '{}' file count: {}".format(
                                 queue, file_count))
        # The list setting itself must be unchanged after the import.
        self.assertTrue(self._mlist.send_welcome_message)

    def test_nonmembers(self):
        import_config_pck(self._mlist, self._pckdict)
        expected = {
            'gene': Action.defer,
            'homer': Action.hold,
            'iris': Action.reject,
            'kenny': Action.discard,
            }
        for name, action in expected.items():
            email = '{}@example.com'.format(name)
            self.assertIn(email, self._emails(self._mlist.nonmembers),
                          'Address {} was not imported'.format(name))
            member = self._mlist.nonmembers.get_member(email)
            self.assertEqual(member.moderation_action, action)
            # Action.defer maps from accept; map it back to get the name.
            prefix = (Action.accept if action == Action.defer else action).name
            patterns = getattr(
                self._mlist, '{}_these_nonmembers'.format(prefix))
            # Only regexps should remain in the list property.
            self.assertEqual(len(patterns), 1)
            self.assertTrue(all(p.startswith('^') for p in patterns))

    def test_nonmember_following_member(self):
        # linda is listed both as a held nonmember and as a member.
        linda = 'linda@example.com'
        homer = 'homer@example.com'
        self._pckdict['hold_these_nonmembers'] = [linda, homer]
        self._pckdict['members'][linda] = 0
        self._pckdict['user_options'] = {linda: 1}
        import_config_pck(self._mlist, self._pckdict)
        for email, action in ((linda, Action.defer), (homer, Action.hold)):
            nonmember = self._mlist.nonmembers.get_member(email)
            self.assertEqual(nonmember.moderation_action, action)

    def test_no_import_banned_address(self):
        # Banned addresses should not be imported with any role.
        self._pckdict['ban_list'] = [b'^.*example.com']
        import_config_pck(self._mlist, self._pckdict)
        for role in ('owners', 'moderators', 'members', 'nonmembers'):
            roster = getattr(self._mlist, role)
            self.assertEqual([], list(roster.addresses))
1187
1188
class TestRosterVisibilityImport(unittest.TestCase):
    """Test the import of private_roster into member_roster_visibility.

    A Mailman 2.1 list controls roster visibility through its private_roster
    attribute, with values 0==public, 1==members, 2==admins.  These
    correspond to the Mailman 3 member_roster_visibility values
    RosterVisibility.public, RosterVisibility.members and
    RosterVisibility.moderators.
    """
    layer = ConfigLayer

    def setUp(self):
        mlist = create_list('blank@example.com')
        mlist.member_roster_visibility = DummyEnum.val
        self._mlist = mlist

    def _do_test(self, original, expected):
        # Import a pickle dict holding only private_roster, then compare the
        # list's member_roster_visibility with `expected`.
        pckdict = {'private_roster': original}
        import_config_pck(self._mlist, pckdict)
        self.assertEqual(self._mlist.member_roster_visibility, expected)

    def test_roster_visibility_public(self):
        self._do_test(original=0, expected=RosterVisibility.public)

    def test_roster_visibility_members(self):
        self._do_test(original=1, expected=RosterVisibility.members)

    def test_roster_visibility_moderators(self):
        self._do_test(original=2, expected=RosterVisibility.moderators)

    def test_roster_visibility_bad(self):
        # 3 is not one of the 2.1 values, so the placeholder assigned in
        # setUp is expected to survive the import.
        self._do_test(original=3, expected=DummyEnum.val)
1219
1220
class TestPreferencesImport(unittest.TestCase):
    """Member preferences are carried over by the import as well."""

    layer = ConfigLayer

    def setUp(self):
        self._mlist = create_list('blank@example.com')
        self._pckdict = {
            'members': {'anne@example.com': 0},
            'user_options': {},
            'delivery_status': {},
            }
        self._usermanager = getUtility(IUserManager)

    def _move_members_to_digest(self):
        # Turn every regular member of the pickle dict into a digest member.
        self._pckdict['digest_members'] = dict(self._pckdict['members'])
        self._pckdict['members'] = {}

    def _do_test(self, oldvalue, expected):
        # Import anne with the 2.1 user_options value `oldvalue`, then check
        # every preference named in `expected`.
        email = 'anne@example.com'
        self._pckdict['user_options'][email] = oldvalue
        import_config_pck(self._mlist, self._pckdict)
        self.assertIsNotNone(
            self._usermanager.get_user(email), 'User was not imported')
        member = self._mlist.members.get_member(email)
        self.assertIsNotNone(member, 'Address was not subscribed')
        for preference, wanted in expected.items():
            try:
                actual = getattr(member, preference)
            except AttributeError:
                # hide_address has no direct getter
                actual = getattr(member.preferences, preference)
            self.assertEqual(
                actual, wanted,
                'Preference %s was not imported' % preference)
        # XXX: should I check that other params are still equal to
        # mailman.core.constants.system_preferences?

    def test_acknowledge_posts(self):
        # AcknowledgePosts
        self._do_test(4, {'acknowledge_posts': True})

    def test_hide_address(self):
        # ConcealSubscription
        self._do_test(16, {'hide_address': True})

    def test_receive_own_postings(self):
        # DontReceiveOwnPosts
        self._do_test(2, {'receive_own_postings': False})

    def test_receive_list_copy(self):
        # DontReceiveDuplicates
        self._do_test(256, {'receive_list_copy': False})

    def test_digest_plain(self):
        # Digests & DisableMime
        self._move_members_to_digest()
        self._do_test(8, {'delivery_mode': DeliveryMode.plaintext_digests})

    def test_digest_mime(self):
        # Digests & not DisableMime
        self._move_members_to_digest()
        self._do_test(0, {'delivery_mode': DeliveryMode.mime_digests})

    def test_delivery_status(self):
        # The pckdict['delivery_status'] entries look like (status, time),
        # where status is one of:
        # ENABLED  = 0 # enabled
        # UNKNOWN  = 1 # legacy disabled
        # BYUSER   = 2 # disabled by user choice
        # BYADMIN  = 3 # disabled by admin choice
        # BYBOUNCE = 4 # disabled by bounces
        expected_statuses = (
            DeliveryStatus.enabled,
            DeliveryStatus.unknown,
            DeliveryStatus.by_user,
            DeliveryStatus.by_moderator,
            DeliveryStatus.by_bounces,
            )
        email = 'anne@example.com'
        for oldval, expected in enumerate(expected_statuses):
            self._pckdict['delivery_status'][email] = (oldval, 0)
            import_config_pck(self._mlist, self._pckdict)
            member = self._mlist.members.get_member(email)
            self.assertIsNotNone(member, 'Address was not subscribed')
            self.assertEqual(member.delivery_status, expected)
            # Presumably so that the next pass starts without an existing
            # subscription for anne.
            member.unsubscribe()

    def test_moderate_hold(self):
        # Option flag Moderate is translated to the action set in
        # member_moderation_action.
        self._pckdict['member_moderation_action'] = 0
        self._do_test(128, {'moderation_action': Action.hold})

    def test_moderate_reject(self):
        # Option flag Moderate is translated to the action set in
        # member_moderation_action.
        self._pckdict['member_moderation_action'] = 1
        self._do_test(128, {'moderation_action': Action.reject})

    def test_moderate_hold_discard(self):
        # Option flag Moderate is translated to the action set in
        # member_moderation_action.
        self._pckdict['member_moderation_action'] = 2
        self._do_test(128, {'moderation_action': Action.discard})

    def test_no_moderate(self):
        # If the option flag Moderate is not set, the action is defer.
        # See: https://gitlab.com/mailman/mailman/merge_requests/100
        self._pckdict['member_moderation_action'] = 1          # reject
        self._do_test(0, {'moderation_action': Action.defer})

    def test_multiple_options(self):
        # DontReceiveDuplicates & DisableMime & SuppressPasswordReminder
        # Keys might be Python 2 str/bytes or unicode.
        self._move_members_to_digest()
        self._do_test(296, {
            'receive_list_copy': False,
            'delivery_mode': DeliveryMode.plaintext_digests,
            })

    def test_language_code_none(self):
        # A missing (None) language code is expected to stay None.
        result = check_language_code(None)
        self.assertIsNone(result)
1338