# Copyright (C) 2011-2020 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see <https://www.gnu.org/licenses/>.

"""Additional tests for the hold chain."""

import unittest

from email import message_from_bytes as mfb
from importlib_resources import read_binary
from mailman.app.lifecycle import create_list
from mailman.chains.builtin import BuiltInChain
from mailman.chains.hold import HoldChain, autorespond_to_sender
from mailman.core.chains import process as process_chain
from mailman.interfaces.autorespond import IAutoResponseSet, Response
from mailman.interfaces.member import MemberRole
from mailman.interfaces.messages import IMessageStore
from mailman.interfaces.requests import IListRequests, RequestType
from mailman.interfaces.usermanager import IUserManager
from mailman.testing.helpers import (
    LogFileMark, configuration, get_queue_messages, set_preferred,
    specialized_message_from_string as mfs)
from mailman.testing.layers import ConfigLayer
from zope.component import getUtility


# Separator used when several values are joined into a single header value.
SEMISPACE = '; '


class TestAutorespond(unittest.TestCase):
    """Test autorespond_to_sender()"""

    layer = ConfigLayer
    # Show complete diffs when the multi-line message comparison fails.
    maxDiff = None

    def setUp(self):
        # Every test works against a freshly created mailing list.
        self._mlist = create_list('test@example.com')

    @configuration('mta', max_autoresponses_per_day=1)
    def test_max_autoresponses_per_day(self):
        # The last one we sent was the last one we should send today.  Instead
        # of sending an automatic response, send them the "no more today"
        # message.  Start by simulating a response having been sent to an
        # address already.
        anne = getUtility(IUserManager).create_address('anne@example.com')
        response_set = IAutoResponseSet(self._mlist)
        response_set.response_sent(anne, Response.hold)
        # Trigger the sending of a "last response for today" using the default
        # language (i.e. the mailing list's preferred language).
        autorespond_to_sender(self._mlist, 'anne@example.com')
        # So first, there should be one more hold response sent to the user.
        self.assertEqual(response_set.todays_count(anne, Response.hold), 2)
        # And the virgin queue should have the message in it.
        messages = get_queue_messages('virgin')
        self.assertEqual(len(messages), 1)
        # Remove the variable headers so the remainder can be compared
        # verbatim.
        message = messages[0].msg
        self.assertIn('message-id', message)
        del message['message-id']
        self.assertIn('date', message)
        del message['date']
        # NOTE(review): runs of spaces inside the expected text (e.g. after
        # "today: 1.") could not be recovered from the collapsed listing --
        # confirm against the notification template.
        self.assertMultiLineEqual(message.as_string(), """\
MIME-Version: 1.0
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Subject: Last autoresponse notification for today
From: test-owner@example.com
To: anne@example.com
Precedence: bulk

We have received a message from your address <anne@example.com>
requesting an automated response from the test@example.com mailing
list.

The number we have seen today: 1. In order to avoid problems such as
mail loops between email robots, we will not be sending you any
further responses today. Please try again tomorrow.

If you believe this message is in error, or if you have any questions,
please contact the list owner at test-owner@example.com.""")


class TestHoldChain(unittest.TestCase):
    """Test the hold chain code."""

    layer = ConfigLayer

    def setUp(self):
        # Every test works against a freshly created mailing list.
        self._mlist = create_list('test@example.com')
        self._user_manager = getUtility(IUserManager)

    def test_hold_chain(self):
        # Every moderation reason, including the (format, *args) tuple form,
        # must show up in the owner notice, the sender notice, the vette log
        # and the stored hold request.
        msg = mfs("""\
From: anne@example.com
To: test@example.com
Subject: A message
Message-ID: <ant>
MIME-Version: 1.0

A message body.
""")
        msgdata = dict(moderation_reasons=[
            'TEST-REASON-1',
            'TEST-REASON-2',
            ('TEST-{}-REASON-{}', 'FORMAT', 3),
            ])
        logfile = LogFileMark('mailman.vette')
        process_chain(self._mlist, msg, msgdata, start_chain='hold')
        messages = get_queue_messages('virgin', expected_count=2)
        # Sort the two notices by recipient; header lookup is
        # case-insensitive, so a single spelling of 'to' is enough.
        payloads = {}
        for item in messages:
            recipient = item.msg['to']
            if recipient == 'test-owner@example.com':
                part = item.msg.get_payload(0)
                payloads['owner'] = part.get_payload().splitlines()
            elif recipient == 'anne@example.com':
                payloads['sender'] = item.msg.get_payload().splitlines()
            else:
                self.fail('Unexpected message: %s' % item.msg)
        # NOTE(review): the leading indentation of these expected lines could
        # not be recovered from the collapsed listing -- confirm it matches
        # the indentation used by the hold notice templates.
        self.assertIn(' TEST-REASON-1', payloads['owner'])
        self.assertIn(' TEST-REASON-2', payloads['owner'])
        self.assertIn(' TEST-FORMAT-REASON-3', payloads['owner'])
        self.assertIn(' TEST-REASON-1', payloads['sender'])
        self.assertIn(' TEST-REASON-2', payloads['sender'])
        self.assertIn(' TEST-FORMAT-REASON-3', payloads['sender'])
        logged = logfile.read()
        self.assertIn('TEST-REASON-1', logged)
        self.assertIn('TEST-REASON-2', logged)
        self.assertIn('TEST-FORMAT-REASON-3', logged)
        # Check the reason passed to hold_message().
        requests = IListRequests(self._mlist)
        self.assertEqual(requests.count_of(RequestType.held_message), 1)
        request = requests.of_type(RequestType.held_message)[0]
        _, data = requests.get_request(request.id)
        self.assertEqual(
            data.get('_mod_reason'),
            'TEST-REASON-1; TEST-REASON-2; TEST-FORMAT-REASON-3')

    def test_hold_chain_no_reasons_given(self):
        msg = mfs("""\
From: anne@example.com
To: test@example.com
Subject: A message
Message-ID: <ant>
MIME-Version: 1.0

A message body.
""")
        process_chain(self._mlist, msg, {}, start_chain='hold')
        # No reason was given, so a default is used.
        requests = IListRequests(self._mlist)
        self.assertEqual(requests.count_of(RequestType.held_message), 1)
        request = requests.of_type(RequestType.held_message)[0]
        _, data = requests.get_request(request.id)
        self.assertEqual(data.get('_mod_reason'), 'n/a')

    def test_hold_chain_charset(self):
        # Issue #144 - UnicodeEncodeError in the hold chain.
        self._mlist.admin_immed_notify = True
        self._mlist.respond_to_post_requests = False
        bart = self._user_manager.create_user('bart@example.com', 'Bart User')
        address = set_preferred(bart)
        self._mlist.subscribe(address, MemberRole.moderator)
        msg = mfb(read_binary('mailman.chains.tests', 'issue144.eml'))
        msg.sender = 'anne@example.com'
        process_chain(self._mlist, msg, {}, start_chain='hold')
        # The postauth.txt message is now in the virgin queue awaiting
        # delivery to the moderators.
        items = get_queue_messages('virgin', expected_count=1)
        msgdata = items[0].msgdata
        # Should get sent to -owner address.
        self.assertEqual(msgdata['recipients'], {'test-owner@example.com'})
        # Ensure that the subject looks correct in the postauth.txt.
        msg = items[0].msg
        value = None
        for line in msg.get_payload(0).get_payload().splitlines():
            if line.strip().startswith('Subject:'):
                _, _, value = line.partition(':')
                break
        # Fail cleanly, rather than with an AttributeError on None, when the
        # notice carries no Subject line at all.
        self.assertIsNotNone(value, 'No Subject: line found in the notice')
        self.assertEqual(value.lstrip(), 'Vi?enamjenski pi?tolj za vodu 8/1')
        self.assertEqual(
            msg['Subject'],
            'test@example.com post from anne@example.com requires approval')

    def test_hold_chain_crosspost(self):
        # The same message held on two lists produces notices for both lists
        # but is only stored once.
        mlist2 = create_list('test2@example.com')
        msg = mfs("""\
From: anne@example.com
To: test@example.com, test2@example.com
Subject: A message
Message-ID: <ant>
MIME-Version: 1.0

A message body.
""")
        process_chain(self._mlist, msg, {}, start_chain='hold')
        process_chain(mlist2, msg, {}, start_chain='hold')
        # There are four items in the virgin queue.  Two of them are for the
        # list owners who need to moderate the held message, and the other
        # two are for anne telling her that her message was held for approval.
        items = get_queue_messages('virgin', expected_count=4)
        anne_froms = set()
        owner_tos = set()
        for item in items:
            if item.msg['to'] == 'anne@example.com':
                anne_froms.add(item.msg['from'])
            else:
                owner_tos.add(item.msg['to'])
        self.assertEqual(anne_froms, {'test-bounces@example.com',
                                      'test2-bounces@example.com'})
        self.assertEqual(owner_tos, {'test-owner@example.com',
                                     'test2-owner@example.com'})
        # And the message appears in the store.
        messages = list(getUtility(IMessageStore).messages)
        self.assertEqual(len(messages), 1)
        self.assertEqual(messages[0]['message-id'], '<ant>')

    def test_hold_with_long_rule_misses(self):
        # A long list of rule misses must appear, joined with SEMISPACE, in
        # the X-Mailman-Rule-Misses header of the held message attached to
        # the owner notice.
        msg = mfs("""\
From: anne@example.com
To: test@example.com
Subject: A message
Message-ID: <ant>
MIME-Version: 1.0

A message body.
""")
        rule_misses = [x[0] for x in BuiltInChain._link_descriptions
                       if x[0] not in ('truth', 'any')]
        rule_misses.extend(
            'header-match-test.example.com-{}'.format(i) for i in range(20))
        msgdata = dict(rule_misses=rule_misses)
        msgdata['rule_hits'] = []
        msgdata['moderation_reasons'] = ['something']
        # We can't use process_chain because it clears rule hits and misses.
        HoldChain()._process(self._mlist, msg, msgdata)
        messages = get_queue_messages('virgin', expected_count=2)
        held_message = None
        for item in messages:
            recipient = item.msg['to']
            if recipient == 'test-owner@example.com':
                held_message = item.msg.get_payload(1).get_payload(0)
            elif recipient != 'anne@example.com':
                self.fail('Unexpected message: %s' % item.msg)
        # Fail cleanly, rather than with a NameError, when no owner notice
        # was queued.
        self.assertIsNotNone(held_message, 'No notice sent to the list owner')
        self.assertEqual(held_message['x-mailman-rule-misses'],
                         SEMISPACE.join(rule_misses))