1# Copyright (C) 2016-2020 by the Free Software Foundation, Inc.
2#
3# This file is part of GNU Mailman.
4#
5# GNU Mailman is free software: you can redistribute it and/or modify it under
6# the terms of the GNU General Public License as published by the Free
7# Software Foundation, either version 3 of the License, or (at your option)
8# any later version.
9#
10# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
13# more details.
14#
15# You should have received a copy of the GNU General Public License along with
16# GNU Mailman.  If not, see <https://www.gnu.org/licenses/>.
17
18"""Tests and mocks for gatenews subcommand."""
19
20import nntplib
21
22from click.testing import CliRunner
23from collections import namedtuple
24from email.errors import MessageError
25from mailman.app.lifecycle import create_list
26from mailman.commands.cli_gatenews import gatenews
27from mailman.config import config
28from mailman.testing.helpers import LogFileMark, get_queue_messages
29from mailman.testing.layers import ConfigLayer
30from unittest import TestCase
31from unittest.mock import patch
32
33
34def get_nntplib_nntp(fail=0):
35    """Create a nntplib.NNTP mock.
36
37    This is used to return predictable responses to a nntplib.NNTP method
38    calls.  It can also be called with non-zero values of 'fail' to raise
39    specific exceptions.
40
41    It only implements those classes and attributes used by the gatenews
42    command.
43    """
44
45    info = namedtuple('info', ('number', 'message_id', 'lines'))
46
47    def make_header(art_num):
48        lines = [b'From: ann@example.com',
49                 b'Newsgroups: my.group',
50                 b'Subject: A Message',
51                 b'To: my.group@news.example.com',
52                 ]
53        if art_num == 1:
54            lines.extend([b'Message-ID: <msg1@example.com>',
55                          b'List-Id: This is my list on two lines',
56                          b' <mylist.example.com>'
57                          ])
58        elif art_num == 2:
59            lines.append(b'Message-ID: <msg2@example.com>')
60        elif art_num == 3:
61            lines.extend([b'Message-ID: <msg3@example.com>',
62                         b'List-Id: My list <mylist.example.com>'])
63        return lines
64
65    class NNTP:
66        # The NNTP connection class
67        def __init__(self, host, port=119, user=None, password=None,
68                     readermode=None):
69            if fail == 1:
70                raise nntplib.NNTPError('Bad call to NNTP')
71            self.host = host
72            self.port = port
73            self.user = user
74            self.password = password
75            self.readermode = readermode
76
77        def group(self, group_name):
78            if group_name == 'my.group':
79                return('', 3, 1, 3, group_name)
80            else:
81                raise nntplib.NNTPTemporaryError(
82                    'No such group: {}'.format(group_name))
83
84        def head(self, art_num):
85            if art_num not in (1, 2, 3):
86                raise nntplib.NNTPTemporaryError('Bad call to head')
87            lines = make_header(art_num)
88            info.number = art_num
89            info.message_id = '<msg{}@example.com'.format(art_num),
90            info.lines = lines
91            return ('', info)
92
93        def article(self, art_num):
94            if art_num not in (1, 2, 3):
95                raise nntplib.NNTPTemporaryError('Bad call to article')
96            if art_num == 2 and fail == 2:
97                raise nntplib.NNTPTemporaryError('Bad call to article')
98            lines = make_header(art_num)
99            lines.extend([b'', b'This is the message body'])
100            info.number = art_num
101            info.message_id = '<msg{}@example.com'.format(art_num),
102            info.lines = lines
103            return ('', info)
104
105        def quit(self):
106            pass
107
108    patcher = patch('nntplib.NNTP', NNTP)
109    return patcher
110
111
112def get_email_exception():
113    """Create a mock for email.parser.BytesParser to raise an exception."""
114
115    class BytesParser:
116        def __init__(self, factory, policy):
117            self.factory = factory
118            self.policy = policy
119
120        def parsebytes(self, msg_bytes):
121            raise MessageError('Bad message')
122
123    patcher = patch('email.parser.BytesParser', BytesParser)
124    return patcher
125
126
127class Test_gatenews(TestCase):
128    """Test gating messages from usenet."""
129
130    layer = ConfigLayer
131
132    def setUp(self):
133        self._command = CliRunner()
134        config.push('gatenews tests', """\
135        [nntp]
136        host: news.example.com
137        """)
138        self.addCleanup(config.pop, 'gatenews tests')
139        self.mlist = create_list('mylist@example.com')
140        self.mlist.linked_newsgroup = 'my.group'
141        self.mlist.usenet_watermark = 0
142        self.mlist.gateway_to_mail = True
143        # Create a second list without gateway for test coverage purposes.
144        create_list('otherlist@example.com')
145
146    def test_bad_nntp_connect(self):
147        mark = LogFileMark('mailman.fromusenet')
148        with get_nntplib_nntp(fail=1):
149            self._command.invoke(gatenews)
150        lines = mark.read().splitlines()
151        self.assertEqual(len(lines), 4)
152        self.assertTrue(lines[0].endswith('error opening connection '
153                                          'to nntp_host: news.example.com'))
154        self.assertEqual(lines[1], 'Bad call to NNTP')
155        self.assertTrue(lines[2].endswith('NNTP error for list '
156                                          'mylist@example.com:'))
157        self.assertEqual(lines[3], 'Bad call to NNTP')
158
159    def test_bad_group(self):
160        self.mlist.linked_newsgroup = 'other.group'
161        mark = LogFileMark('mailman.fromusenet')
162        with get_nntplib_nntp():
163            self._command.invoke(gatenews)
164        lines = mark.read().splitlines()
165        self.assertEqual(len(lines), 2)
166        self.assertTrue(lines[0].endswith('NNTP error for list '
167                                          'mylist@example.com:'))
168        self.assertEqual(lines[1], 'No such group: other.group')
169
170    def test_catchup_only(self):
171        self.mlist.usenet_watermark = None
172        mark = LogFileMark('mailman.fromusenet')
173        with get_nntplib_nntp():
174            self._command.invoke(gatenews)
175        lines = mark.read().splitlines()
176        self.assertEqual(self.mlist.usenet_watermark, 3)
177        self.assertEqual(len(lines), 3)
178        self.assertTrue(lines[0].endswith('mylist@example.com: [1..3]'))
179        self.assertTrue(lines[1].endswith('mylist@example.com '
180                                          'caught up to article 3'))
181        self.assertTrue(lines[2].endswith('mylist@example.com watermark: 3'))
182
183    def test_up_to_date(self):
184        self.mlist.usenet_watermark = 3
185        mark = LogFileMark('mailman.fromusenet')
186        with get_nntplib_nntp():
187            self._command.invoke(gatenews)
188        lines = mark.read().splitlines()
189        self.assertEqual(self.mlist.usenet_watermark, 3)
190        self.assertEqual(len(lines), 3)
191        self.assertTrue(lines[0].endswith('mylist@example.com: [1..3]'))
192        self.assertTrue(lines[1].endswith('nothing new for list '
193                                          'mylist@example.com'))
194        self.assertTrue(lines[2].endswith('mylist@example.com watermark: 3'))
195
196    def test_post_only_one_of_three(self):
197        mark = LogFileMark('mailman.fromusenet')
198        with get_nntplib_nntp():
199            self._command.invoke(gatenews)
200        lines = mark.read().splitlines()
201        self.assertEqual(self.mlist.usenet_watermark, 3)
202        self.assertEqual(len(lines), 4)
203        self.assertTrue(lines[0].endswith('mylist@example.com: [1..3]'))
204        self.assertTrue(lines[1].endswith('gating mylist@example.com '
205                                          'articles [1..3]'))
206        self.assertTrue(lines[2].endswith('posted to list mylist@example.com:'
207                                          '       2'))
208        self.assertTrue(lines[3].endswith('mylist@example.com watermark: 3'))
209        items = get_queue_messages('in', expected_count=1)
210        msg = items[0].msg
211        msgdata = items[0].msgdata
212        self.assertTrue(msgdata.get('fromusenet', False))
213        self.assertEqual(msg.get('message-id', ''), '<msg2@example.com>')
214
215    def test_article_exception(self):
216        mark = LogFileMark('mailman.fromusenet')
217        with get_nntplib_nntp(fail=2):
218            self._command.invoke(gatenews)
219        lines = mark.read().splitlines()
220        self.assertEqual(len(lines), 5)
221        self.assertTrue(lines[0].endswith('mylist@example.com: [1..3]'))
222        self.assertTrue(lines[1].endswith('gating mylist@example.com '
223                                          'articles [1..3]'))
224        self.assertTrue(lines[2].endswith('NNTP error for list '
225                                          'mylist@example.com:       2'))
226        self.assertEqual(lines[3], 'Bad call to article')
227        self.assertTrue(lines[4].endswith('mylist@example.com watermark: 3'))
228
229    def test_email_parser_exception(self):
230        mark = LogFileMark('mailman.fromusenet')
231        with get_email_exception():
232            with get_nntplib_nntp():
233                self._command.invoke(gatenews)
234        lines = mark.read().splitlines()
235        self.assertEqual(len(lines), 5)
236        self.assertTrue(lines[0].endswith('mylist@example.com: [1..3]'))
237        self.assertTrue(lines[1].endswith('gating mylist@example.com '
238                                          'articles [1..3]'))
239        self.assertTrue(lines[2].endswith('email package exception for '
240                                          'my.group:2'))
241        self.assertEqual(lines[3], 'Bad message')
242        self.assertTrue(lines[4].endswith('mylist@example.com watermark: 3'))
243