# Copyright (C) 2016-2020 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see <https://www.gnu.org/licenses/>.

"""Tests and mocks for gatenews subcommand."""

import nntplib

from click.testing import CliRunner
from collections import namedtuple
from email.errors import MessageError
from mailman.app.lifecycle import create_list
from mailman.commands.cli_gatenews import gatenews
from mailman.config import config
from mailman.testing.helpers import LogFileMark, get_queue_messages
from mailman.testing.layers import ConfigLayer
from unittest import TestCase
from unittest.mock import patch


def get_nntplib_nntp(fail=0):
    """Create a patcher that replaces nntplib.NNTP with a mock.

    The mock returns predictable responses for the nntplib.NNTP methods it
    implements and serves a single group, 'my.group', holding articles 1, 2
    and 3.  It only implements the methods and attributes defined below.

    :param fail: Selects an injected failure.  With 1, constructing the
        connection raises nntplib.NNTPError.  With 2, article(2) raises
        nntplib.NNTPTemporaryError.  Any other value injects no failure.
    :return: An unstarted unittest.mock.patch object for 'nntplib.NNTP';
        use it as a context manager.
    """

    # Same field layout as the second element of the (response, info) pairs
    # returned by head() and article() below.
    info = namedtuple('info', ('number', 'message_id', 'lines'))

    def make_header(art_num):
        # Build the header lines, as bytes, for the given article number.
        lines = [b'From: ann@example.com',
                 b'Newsgroups: my.group',
                 b'Subject: A Message',
                 b'To: my.group@news.example.com',
                 ]
        if art_num == 1:
            # Article 1 has a List-Id header folded over two lines.
            lines.extend([b'Message-ID: <msg1@example.com>',
                          b'List-Id: This is my list on two lines',
                          b' <mylist.example.com>'
                          ])
        elif art_num == 2:
            # Article 2 is the only one without a List-Id header.
            lines.append(b'Message-ID: <msg2@example.com>')
        elif art_num == 3:
            lines.extend([b'Message-ID: <msg3@example.com>',
                          b'List-Id: My list <mylist.example.com>'])
        return lines

    def make_info(art_num, lines):
        # Return a fresh instance for every call.  Assigning the values onto
        # the namedtuple class itself would clobber its field descriptors and
        # leak state between calls.
        return info(art_num, '<msg{}@example.com>'.format(art_num), lines)

    class NNTP:
        # The NNTP connection class
        def __init__(self, host, port=119, user=None, password=None,
                     readermode=None):
            if fail == 1:
                raise nntplib.NNTPError('Bad call to NNTP')
            self.host = host
            self.port = port
            self.user = user
            self.password = password
            self.readermode = readermode

        def group(self, group_name):
            # Returns (response, count, first, last, name) for the one known
            # group and raises for anything else.
            if group_name == 'my.group':
                return ('', 3, 1, 3, group_name)
            raise nntplib.NNTPTemporaryError(
                'No such group: {}'.format(group_name))

        def head(self, art_num):
            # Returns (response, info) carrying only the header lines.
            if art_num not in (1, 2, 3):
                raise nntplib.NNTPTemporaryError('Bad call to head')
            return ('', make_info(art_num, make_header(art_num)))

        def article(self, art_num):
            # Returns (response, info) carrying the headers, a blank
            # separator line and a one line body.
            if art_num not in (1, 2, 3):
                raise nntplib.NNTPTemporaryError('Bad call to article')
            if art_num == 2 and fail == 2:
                # Injected failure for an otherwise valid article.
                raise nntplib.NNTPTemporaryError('Bad call to article')
            lines = make_header(art_num)
            lines.extend([b'', b'This is the message body'])
            return ('', make_info(art_num, lines))

        def quit(self):
            pass

    patcher = patch('nntplib.NNTP', NNTP)
    return patcher


def get_email_exception():
    """Create a mock for email.parser.BytesParser to raise an exception.

    :return: An unstarted unittest.mock.patch object for
        'email.parser.BytesParser' whose replacement raises MessageError
        from parsebytes(); use it as a context manager.
    """

    class BytesParser:
        def __init__(self, factory, policy):
            self.factory = factory
            self.policy = policy

        def parsebytes(self, msg_bytes):
            # Fail unconditionally, whatever bytes are passed in.
            raise MessageError('Bad message')

    patcher = patch('email.parser.BytesParser', BytesParser)
    return patcher


class Test_gatenews(TestCase):
    """Test gating messages from usenet."""

    layer = ConfigLayer

    def setUp(self):
        self._command = CliRunner()
        config.push('gatenews tests', """\
            [nntp]
            host: news.example.com
            """)
        self.addCleanup(config.pop, 'gatenews tests')
        self.mlist = create_list('mylist@example.com')
        self.mlist.linked_newsgroup = 'my.group'
        self.mlist.usenet_watermark = 0
        self.mlist.gateway_to_mail = True
        # Create a second list without gateway for test coverage purposes.
        create_list('otherlist@example.com')

    def test_bad_nntp_connect(self):
        # fail=1 makes the mock NNTP constructor raise NNTPError.
        mark = LogFileMark('mailman.fromusenet')
        with get_nntplib_nntp(fail=1):
            self._command.invoke(gatenews)
        lines = mark.read().splitlines()
        self.assertEqual(len(lines), 4)
        self.assertTrue(lines[0].endswith('error opening connection '
                                          'to nntp_host: news.example.com'))
        self.assertEqual(lines[1], 'Bad call to NNTP')
        self.assertTrue(lines[2].endswith('NNTP error for list '
                                          'mylist@example.com:'))
        self.assertEqual(lines[3], 'Bad call to NNTP')

    def test_bad_group(self):
        # The mock only knows 'my.group'; any other group raises.
        self.mlist.linked_newsgroup = 'other.group'
        mark = LogFileMark('mailman.fromusenet')
        with get_nntplib_nntp():
            self._command.invoke(gatenews)
        lines = mark.read().splitlines()
        self.assertEqual(len(lines), 2)
        self.assertTrue(lines[0].endswith('NNTP error for list '
                                          'mylist@example.com:'))
        self.assertEqual(lines[1], 'No such group: other.group')

    def test_catchup_only(self):
        # With no watermark set, the list is expected to be caught up to the
        # last article of the group.
        self.mlist.usenet_watermark = None
        mark = LogFileMark('mailman.fromusenet')
        with get_nntplib_nntp():
            self._command.invoke(gatenews)
        lines = mark.read().splitlines()
        self.assertEqual(self.mlist.usenet_watermark, 3)
        self.assertEqual(len(lines), 3)
        self.assertTrue(lines[0].endswith('mylist@example.com: [1..3]'))
        self.assertTrue(lines[1].endswith('mylist@example.com '
                                          'caught up to article 3'))
        self.assertTrue(lines[2].endswith('mylist@example.com watermark: 3'))

    def test_up_to_date(self):
        # The watermark already equals the last article of the group.
        self.mlist.usenet_watermark = 3
        mark = LogFileMark('mailman.fromusenet')
        with get_nntplib_nntp():
            self._command.invoke(gatenews)
        lines = mark.read().splitlines()
        self.assertEqual(self.mlist.usenet_watermark, 3)
        self.assertEqual(len(lines), 3)
        self.assertTrue(lines[0].endswith('mylist@example.com: [1..3]'))
        self.assertTrue(lines[1].endswith('nothing new for list '
                                          'mylist@example.com'))
        self.assertTrue(lines[2].endswith('mylist@example.com watermark: 3'))

    def test_post_only_one_of_three(self):
        # Articles 1 and 3 of the mock carry a List-Id header and article 2
        # does not; only article 2 is expected to reach the 'in' queue.
        mark = LogFileMark('mailman.fromusenet')
        with get_nntplib_nntp():
            self._command.invoke(gatenews)
        lines = mark.read().splitlines()
        self.assertEqual(self.mlist.usenet_watermark, 3)
        self.assertEqual(len(lines), 4)
        self.assertTrue(lines[0].endswith('mylist@example.com: [1..3]'))
        self.assertTrue(lines[1].endswith('gating mylist@example.com '
                                          'articles [1..3]'))
        self.assertTrue(lines[2].endswith('posted to list mylist@example.com:'
                                          ' 2'))
        self.assertTrue(lines[3].endswith('mylist@example.com watermark: 3'))
        items = get_queue_messages('in', expected_count=1)
        msg = items[0].msg
        msgdata = items[0].msgdata
        self.assertTrue(msgdata.get('fromusenet', False))
        self.assertEqual(msg.get('message-id', ''), '<msg2@example.com>')

    def test_article_exception(self):
        # fail=2 makes the mock raise when article 2 is retrieved.
        mark = LogFileMark('mailman.fromusenet')
        with get_nntplib_nntp(fail=2):
            self._command.invoke(gatenews)
        lines = mark.read().splitlines()
        self.assertEqual(len(lines), 5)
        self.assertTrue(lines[0].endswith('mylist@example.com: [1..3]'))
        self.assertTrue(lines[1].endswith('gating mylist@example.com '
                                          'articles [1..3]'))
        self.assertTrue(lines[2].endswith('NNTP error for list '
                                          'mylist@example.com: 2'))
        self.assertEqual(lines[3], 'Bad call to article')
        self.assertTrue(lines[4].endswith('mylist@example.com watermark: 3'))

    def test_email_parser_exception(self):
        # The patched BytesParser raises MessageError from parsebytes().
        mark = LogFileMark('mailman.fromusenet')
        with get_email_exception(), get_nntplib_nntp():
            self._command.invoke(gatenews)
        lines = mark.read().splitlines()
        self.assertEqual(len(lines), 5)
        self.assertTrue(lines[0].endswith('mylist@example.com: [1..3]'))
        self.assertTrue(lines[1].endswith('gating mylist@example.com '
                                          'articles [1..3]'))
        self.assertTrue(lines[2].endswith('email package exception for '
                                          'my.group:2'))
        self.assertEqual(lines[3], 'Bad message')
        self.assertTrue(lines[4].endswith('mylist@example.com watermark: 3'))