1# Copyright (C) 2011-2020 by the Free Software Foundation, Inc.
2#
3# This file is part of GNU Mailman.
4#
5# GNU Mailman is free software: you can redistribute it and/or modify it under
6# the terms of the GNU General Public License as published by the Free
7# Software Foundation, either version 3 of the License, or (at your option)
8# any later version.
9#
10# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
13# more details.
14#
15# You should have received a copy of the GNU General Public License along with
16# GNU Mailman.  If not, see <https://www.gnu.org/licenses/>.
17
18"""Additional tests for the hold chain."""
19
20import unittest
21
22from email import message_from_bytes as mfb
23from importlib_resources import read_binary
24from mailman.app.lifecycle import create_list
25from mailman.chains.builtin import BuiltInChain
26from mailman.chains.hold import HoldChain, autorespond_to_sender
27from mailman.core.chains import process as process_chain
28from mailman.interfaces.autorespond import IAutoResponseSet, Response
29from mailman.interfaces.member import MemberRole
30from mailman.interfaces.messages import IMessageStore
31from mailman.interfaces.requests import IListRequests, RequestType
32from mailman.interfaces.usermanager import IUserManager
33from mailman.testing.helpers import (
34    LogFileMark, configuration, get_queue_messages, set_preferred,
35    specialized_message_from_string as mfs)
36from mailman.testing.layers import ConfigLayer
37from zope.component import getUtility
38
39
# Separator used in this module to join a list of rule names into the single
# string that is compared against the `x-mailman-rule-misses` header value.
SEMISPACE = '; '
41
42
class TestAutorespond(unittest.TestCase):
    """Test autorespond_to_sender()"""

    layer = ConfigLayer
    # Show the complete diff when the expected message text does not match.
    maxDiff = None

    def setUp(self):
        self._mlist = create_list('test@example.com')

    @configuration('mta', max_autoresponses_per_day=1)
    def test_max_autoresponses_per_day(self):
        # The last one we sent was the last one we should send today.  Instead
        # of sending an automatic response, send them the "no more today"
        # message.  Start by simulating a response having been sent to an
        # address already.
        anne = getUtility(IUserManager).create_address('anne@example.com')
        response_set = IAutoResponseSet(self._mlist)
        response_set.response_sent(anne, Response.hold)
        # Trigger the sending of a "last response for today" using the default
        # language (i.e. the mailing list's preferred language).
        autorespond_to_sender(self._mlist, 'anne@example.com')
        # So first, there should be one more hold response sent to the user.
        self.assertEqual(response_set.todays_count(anne, Response.hold), 2)
        # And the virgin queue should have exactly that one message in it.
        messages = get_queue_messages('virgin', expected_count=1)
        message = messages[0].msg
        # Remove the variable headers, making sure each one was present.
        for header in ('message-id', 'date'):
            self.assertIn(header, message)
            del message[header]
        self.assertMultiLineEqual(message.as_string(), """\
MIME-Version: 1.0
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Subject: Last autoresponse notification for today
From: test-owner@example.com
To: anne@example.com
Precedence: bulk

We have received a message from your address <anne@example.com>
requesting an automated response from the test@example.com mailing
list.

The number we have seen today: 1.  In order to avoid problems such as
mail loops between email robots, we will not be sending you any
further responses today.  Please try again tomorrow.

If you believe this message is in error, or if you have any questions,
please contact the list owner at test-owner@example.com.""")
94
95
class TestHoldChain(unittest.TestCase):
    """Test the hold chain code."""

    layer = ConfigLayer

    def setUp(self):
        self._mlist = create_list('test@example.com')
        self._user_manager = getUtility(IUserManager)

    def _get_held_request_data(self):
        # Return the data dictionary of the one and only held message request
        # recorded for the test mailing list, failing the test if there is
        # not exactly one such request.
        requests = IListRequests(self._mlist)
        self.assertEqual(requests.count_of(RequestType.held_message), 1)
        request = requests.of_type(RequestType.held_message)[0]
        _key, data = requests.get_request(request.id)
        return data

    def test_hold_chain(self):
        # Every moderation reason, including one given as a tuple of a format
        # string plus its arguments, must show up in the owner notification,
        # in the notification to the sender, in the vette log, and in the
        # reason recorded with the held request.
        msg = mfs("""\
From: anne@example.com
To: test@example.com
Subject: A message
Message-ID: <ant>
MIME-Version: 1.0

A message body.
""")
        msgdata = dict(moderation_reasons=[
            'TEST-REASON-1',
            'TEST-REASON-2',
            ('TEST-{}-REASON-{}', 'FORMAT', 3),
            ])
        logfile = LogFileMark('mailman.vette')
        process_chain(self._mlist, msg, msgdata, start_chain='hold')
        messages = get_queue_messages('virgin', expected_count=2)
        payloads = {}
        for item in messages:
            if item.msg['to'] == 'test-owner@example.com':
                # The text for the owner is read from the first subpart of
                # the notification.
                part = item.msg.get_payload(0)
                payloads['owner'] = part.get_payload().splitlines()
            elif item.msg['to'] == 'anne@example.com':
                payloads['sender'] = item.msg.get_payload().splitlines()
            else:
                self.fail('Unexpected message: %s' % item.msg)
        # Both notifications must have been seen; checking this explicitly
        # gives a readable failure rather than a KeyError below.
        self.assertEqual(set(payloads), {'owner', 'sender'})
        reasons = ('TEST-REASON-1', 'TEST-REASON-2', 'TEST-FORMAT-REASON-3')
        for recipient in ('owner', 'sender'):
            for reason in reasons:
                self.assertIn('    ' + reason, payloads[recipient])
        logged = logfile.read()
        for reason in reasons:
            self.assertIn(reason, logged)
        # Check the reason passed to hold_message().
        data = self._get_held_request_data()
        self.assertEqual(
            data.get('_mod_reason'),
            'TEST-REASON-1; TEST-REASON-2; TEST-FORMAT-REASON-3')

    def test_hold_chain_no_reasons_given(self):
        msg = mfs("""\
From: anne@example.com
To: test@example.com
Subject: A message
Message-ID: <ant>
MIME-Version: 1.0

A message body.
""")
        process_chain(self._mlist, msg, {}, start_chain='hold')
        # No reason was given, so a default is used.
        data = self._get_held_request_data()
        self.assertEqual(data.get('_mod_reason'), 'n/a')

    def test_hold_chain_charset(self):
        # Issue #144 - UnicodeEncodeError in the hold chain.
        self._mlist.admin_immed_notify = True
        self._mlist.respond_to_post_requests = False
        bart = self._user_manager.create_user('bart@example.com', 'Bart User')
        address = set_preferred(bart)
        self._mlist.subscribe(address, MemberRole.moderator)
        msg = mfb(read_binary('mailman.chains.tests', 'issue144.eml'))
        msg.sender = 'anne@example.com'
        process_chain(self._mlist, msg, {}, start_chain='hold')
        # The postauth.txt message is now in the virgin queue awaiting
        # delivery to the moderators.
        items = get_queue_messages('virgin', expected_count=1)
        msgdata = items[0].msgdata
        # Should get sent to -owner address.
        self.assertEqual(msgdata['recipients'], {'test-owner@example.com'})
        # Ensure that the subject looks correct in the postauth.txt.
        notification = items[0].msg
        value = None
        for line in notification.get_payload(0).get_payload().splitlines():
            if line.strip().startswith('Subject:'):
                _header, _colon, value = line.partition(':')
                break
        # Fail cleanly, rather than with an AttributeError on None, when the
        # notification text contains no Subject: line at all.
        self.assertIsNotNone(value, 'No Subject: line in the notification')
        self.assertEqual(value.lstrip(), 'Vi?enamjenski pi?tolj za vodu 8/1')
        self.assertEqual(
            notification['Subject'],
            'test@example.com post from anne@example.com requires approval')

    def test_hold_chain_crosspost(self):
        mlist2 = create_list('test2@example.com')
        msg = mfs("""\
From: anne@example.com
To: test@example.com, test2@example.com
Subject: A message
Message-ID: <ant>
MIME-Version: 1.0

A message body.
""")
        process_chain(self._mlist, msg, {}, start_chain='hold')
        process_chain(mlist2, msg, {}, start_chain='hold')
        # There are four items in the virgin queue.  Two of them are for the
        # list owners who need to moderate the held message, and the other
        # two are for anne, telling her that her message was held for
        # approval on each list.
        items = get_queue_messages('virgin', expected_count=4)
        anne_froms = set()
        owner_tos = set()
        for item in items:
            if item.msg['to'] == 'anne@example.com':
                anne_froms.add(item.msg['from'])
            else:
                owner_tos.add(item.msg['to'])
        self.assertEqual(anne_froms, {'test-bounces@example.com',
                                      'test2-bounces@example.com'})
        self.assertEqual(owner_tos, {'test-owner@example.com',
                                     'test2-owner@example.com'})
        # And the message appears in the store.
        messages = list(getUtility(IMessageStore).messages)
        self.assertEqual(len(messages), 1)
        self.assertEqual(messages[0]['message-id'], '<ant>')

    def test_hold_with_long_rule_misses(self):
        # With many rule misses (the built-in chain's rules plus twenty
        # synthetic names), the held message attached to the owner
        # notification must still carry all of them, joined by SEMISPACE, in
        # its x-mailman-rule-misses header.
        msg = mfs("""\
From: anne@example.com
To: test@example.com
Subject: A message
Message-ID: <ant>
MIME-Version: 1.0

A message body.
""")
        rule_misses = [x[0] for x in BuiltInChain._link_descriptions
                       if x[0] not in ('truth', 'any')]
        rule_misses.extend(
            'header-match-test.example.com-{}'.format(i) for i in range(20))
        msgdata = dict(
            rule_misses=rule_misses,
            rule_hits=[],
            moderation_reasons=['something'],
            )
        # We can't use process_chain because it clears rule hits and misses.
        HoldChain()._process(self._mlist, msg, msgdata)
        messages = get_queue_messages('virgin', expected_count=2)
        held_message = None
        for item in messages:
            if item.msg['to'] == 'test-owner@example.com':
                held_message = item.msg.get_payload(1).get_payload(0)
            elif item.msg['to'] == 'anne@example.com':
                # The notification to the sender is not examined here.
                pass
            else:
                self.fail('Unexpected message: %s' % item.msg)
        # Fail cleanly, rather than with an unbound local, when no owner
        # notification was queued.
        self.assertIsNotNone(held_message, 'No owner notification found')
        self.assertEqual(held_message['x-mailman-rule-misses'],
                         SEMISPACE.join(rule_misses))
259