1import unittest
2import textwrap
3from test import support, mock_socket
4from test.support import socket_helper
5from test.support import warnings_helper
6import socket
7import io
8
9import warnings
10with warnings.catch_warnings():
11    warnings.simplefilter('ignore', DeprecationWarning)
12    import smtpd
13    import asyncore
14
15
16class DummyServer(smtpd.SMTPServer):
17    def __init__(self, *args, **kwargs):
18        smtpd.SMTPServer.__init__(self, *args, **kwargs)
19        self.messages = []
20        if self._decode_data:
21            self.return_status = 'return status'
22        else:
23            self.return_status = b'return status'
24
25    def process_message(self, peer, mailfrom, rcpttos, data, **kw):
26        self.messages.append((peer, mailfrom, rcpttos, data))
27        if data == self.return_status:
28            return '250 Okish'
29        if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']:
30            return '250 SMTPUTF8 message okish'
31
32
33class DummyDispatcherBroken(Exception):
34    pass
35
36
37class BrokenDummyServer(DummyServer):
38    def listen(self, num):
39        raise DummyDispatcherBroken()
40
41
42class SMTPDServerTest(unittest.TestCase):
43    def setUp(self):
44        smtpd.socket = asyncore.socket = mock_socket
45
46    def test_process_message_unimplemented(self):
47        server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0),
48                                  decode_data=True)
49        conn, addr = server.accept()
50        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
51
52        def write_line(line):
53            channel.socket.queue_recv(line)
54            channel.handle_read()
55
56        write_line(b'HELO example')
57        write_line(b'MAIL From:eggs@example')
58        write_line(b'RCPT To:spam@example')
59        write_line(b'DATA')
60        self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')
61
62    def test_decode_data_and_enable_SMTPUTF8_raises(self):
63        self.assertRaises(
64            ValueError,
65            smtpd.SMTPServer,
66            (socket_helper.HOST, 0),
67            ('b', 0),
68            enable_SMTPUTF8=True,
69            decode_data=True)
70
71    def tearDown(self):
72        asyncore.close_all()
73        asyncore.socket = smtpd.socket = socket
74
75
76class DebuggingServerTest(unittest.TestCase):
77
78    def setUp(self):
79        smtpd.socket = asyncore.socket = mock_socket
80
81    def send_data(self, channel, data, enable_SMTPUTF8=False):
82        def write_line(line):
83            channel.socket.queue_recv(line)
84            channel.handle_read()
85        write_line(b'EHLO example')
86        if enable_SMTPUTF8:
87            write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8')
88        else:
89            write_line(b'MAIL From:eggs@example')
90        write_line(b'RCPT To:spam@example')
91        write_line(b'DATA')
92        write_line(data)
93        write_line(b'.')
94
95    def test_process_message_with_decode_data_true(self):
96        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
97                                       decode_data=True)
98        conn, addr = server.accept()
99        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
100        with support.captured_stdout() as s:
101            self.send_data(channel, b'From: test\n\nhello\n')
102        stdout = s.getvalue()
103        self.assertEqual(stdout, textwrap.dedent("""\
104             ---------- MESSAGE FOLLOWS ----------
105             From: test
106             X-Peer: peer-address
107
108             hello
109             ------------ END MESSAGE ------------
110             """))
111
112    def test_process_message_with_decode_data_false(self):
113        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0))
114        conn, addr = server.accept()
115        channel = smtpd.SMTPChannel(server, conn, addr)
116        with support.captured_stdout() as s:
117            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
118        stdout = s.getvalue()
119        self.assertEqual(stdout, textwrap.dedent("""\
120             ---------- MESSAGE FOLLOWS ----------
121             b'From: test'
122             b'X-Peer: peer-address'
123             b''
124             b'h\\xc3\\xa9llo\\xff'
125             ------------ END MESSAGE ------------
126             """))
127
128    def test_process_message_with_enable_SMTPUTF8_true(self):
129        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
130                                       enable_SMTPUTF8=True)
131        conn, addr = server.accept()
132        channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
133        with support.captured_stdout() as s:
134            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
135        stdout = s.getvalue()
136        self.assertEqual(stdout, textwrap.dedent("""\
137             ---------- MESSAGE FOLLOWS ----------
138             b'From: test'
139             b'X-Peer: peer-address'
140             b''
141             b'h\\xc3\\xa9llo\\xff'
142             ------------ END MESSAGE ------------
143             """))
144
145    def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
146        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
147                                       enable_SMTPUTF8=True)
148        conn, addr = server.accept()
149        channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
150        with support.captured_stdout() as s:
151            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n',
152                           enable_SMTPUTF8=True)
153        stdout = s.getvalue()
154        self.assertEqual(stdout, textwrap.dedent("""\
155             ---------- MESSAGE FOLLOWS ----------
156             mail options: ['BODY=8BITMIME', 'SMTPUTF8']
157             b'From: test'
158             b'X-Peer: peer-address'
159             b''
160             b'h\\xc3\\xa9llo\\xff'
161             ------------ END MESSAGE ------------
162             """))
163
164    def tearDown(self):
165        asyncore.close_all()
166        asyncore.socket = smtpd.socket = socket
167
168
169class TestFamilyDetection(unittest.TestCase):
170    def setUp(self):
171        smtpd.socket = asyncore.socket = mock_socket
172
173    def tearDown(self):
174        asyncore.close_all()
175        asyncore.socket = smtpd.socket = socket
176
177    @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
178    def test_socket_uses_IPv6(self):
179        server = smtpd.SMTPServer((socket_helper.HOSTv6, 0), (socket_helper.HOSTv4, 0))
180        self.assertEqual(server.socket.family, socket.AF_INET6)
181
182    def test_socket_uses_IPv4(self):
183        server = smtpd.SMTPServer((socket_helper.HOSTv4, 0), (socket_helper.HOSTv6, 0))
184        self.assertEqual(server.socket.family, socket.AF_INET)
185
186
187class TestRcptOptionParsing(unittest.TestCase):
188    error_response = (b'555 RCPT TO parameters not recognized or not '
189                      b'implemented\r\n')
190
191    def setUp(self):
192        smtpd.socket = asyncore.socket = mock_socket
193        self.old_debugstream = smtpd.DEBUGSTREAM
194        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
195
196    def tearDown(self):
197        asyncore.close_all()
198        asyncore.socket = smtpd.socket = socket
199        smtpd.DEBUGSTREAM = self.old_debugstream
200
201    def write_line(self, channel, line):
202        channel.socket.queue_recv(line)
203        channel.handle_read()
204
205    def test_params_rejected(self):
206        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
207        conn, addr = server.accept()
208        channel = smtpd.SMTPChannel(server, conn, addr)
209        self.write_line(channel, b'EHLO example')
210        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
211        self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar')
212        self.assertEqual(channel.socket.last, self.error_response)
213
214    def test_nothing_accepted(self):
215        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
216        conn, addr = server.accept()
217        channel = smtpd.SMTPChannel(server, conn, addr)
218        self.write_line(channel, b'EHLO example')
219        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
220        self.write_line(channel, b'RCPT to: <foo@example.com>')
221        self.assertEqual(channel.socket.last, b'250 OK\r\n')
222
223
224class TestMailOptionParsing(unittest.TestCase):
225    error_response = (b'555 MAIL FROM parameters not recognized or not '
226                      b'implemented\r\n')
227
228    def setUp(self):
229        smtpd.socket = asyncore.socket = mock_socket
230        self.old_debugstream = smtpd.DEBUGSTREAM
231        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
232
233    def tearDown(self):
234        asyncore.close_all()
235        asyncore.socket = smtpd.socket = socket
236        smtpd.DEBUGSTREAM = self.old_debugstream
237
238    def write_line(self, channel, line):
239        channel.socket.queue_recv(line)
240        channel.handle_read()
241
242    def test_with_decode_data_true(self):
243        server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True)
244        conn, addr = server.accept()
245        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
246        self.write_line(channel, b'EHLO example')
247        for line in [
248            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
249            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
250            b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN',
251            b'MAIL from: <foo@example.com> size=20 body=8bitmime',
252        ]:
253            self.write_line(channel, line)
254            self.assertEqual(channel.socket.last, self.error_response)
255        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
256        self.assertEqual(channel.socket.last, b'250 OK\r\n')
257
258    def test_with_decode_data_false(self):
259        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
260        conn, addr = server.accept()
261        channel = smtpd.SMTPChannel(server, conn, addr)
262        self.write_line(channel, b'EHLO example')
263        for line in [
264            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
265            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
266        ]:
267            self.write_line(channel, line)
268            self.assertEqual(channel.socket.last, self.error_response)
269        self.write_line(
270            channel,
271            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN')
272        self.assertEqual(
273            channel.socket.last,
274            b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')
275        self.write_line(
276            channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime')
277        self.assertEqual(channel.socket.last, b'250 OK\r\n')
278
279    def test_with_enable_smtputf8_true(self):
280        server = DummyServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True)
281        conn, addr = server.accept()
282        channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
283        self.write_line(channel, b'EHLO example')
284        self.write_line(
285            channel,
286            b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8')
287        self.assertEqual(channel.socket.last, b'250 OK\r\n')
288
289
290class SMTPDChannelTest(unittest.TestCase):
291    def setUp(self):
292        smtpd.socket = asyncore.socket = mock_socket
293        self.old_debugstream = smtpd.DEBUGSTREAM
294        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
295        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
296                                  decode_data=True)
297        conn, addr = self.server.accept()
298        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
299                                         decode_data=True)
300
301    def tearDown(self):
302        asyncore.close_all()
303        asyncore.socket = smtpd.socket = socket
304        smtpd.DEBUGSTREAM = self.old_debugstream
305
306    def write_line(self, line):
307        self.channel.socket.queue_recv(line)
308        self.channel.handle_read()
309
310    def test_broken_connect(self):
311        self.assertRaises(
312            DummyDispatcherBroken, BrokenDummyServer,
313            (socket_helper.HOST, 0), ('b', 0), decode_data=True)
314
315    def test_decode_data_and_enable_SMTPUTF8_raises(self):
316        self.assertRaises(
317            ValueError, smtpd.SMTPChannel,
318            self.server, self.channel.conn, self.channel.addr,
319            enable_SMTPUTF8=True, decode_data=True)
320
321    def test_server_accept(self):
322        self.server.handle_accept()
323
324    def test_missing_data(self):
325        self.write_line(b'')
326        self.assertEqual(self.channel.socket.last,
327                         b'500 Error: bad syntax\r\n')
328
329    def test_EHLO(self):
330        self.write_line(b'EHLO example')
331        self.assertEqual(self.channel.socket.last, b'250 HELP\r\n')
332
333    def test_EHLO_bad_syntax(self):
334        self.write_line(b'EHLO')
335        self.assertEqual(self.channel.socket.last,
336                         b'501 Syntax: EHLO hostname\r\n')
337
338    def test_EHLO_duplicate(self):
339        self.write_line(b'EHLO example')
340        self.write_line(b'EHLO example')
341        self.assertEqual(self.channel.socket.last,
342                         b'503 Duplicate HELO/EHLO\r\n')
343
344    def test_EHLO_HELO_duplicate(self):
345        self.write_line(b'EHLO example')
346        self.write_line(b'HELO example')
347        self.assertEqual(self.channel.socket.last,
348                         b'503 Duplicate HELO/EHLO\r\n')
349
350    def test_HELO(self):
351        name = smtpd.socket.getfqdn()
352        self.write_line(b'HELO example')
353        self.assertEqual(self.channel.socket.last,
354                         '250 {}\r\n'.format(name).encode('ascii'))
355
356    def test_HELO_EHLO_duplicate(self):
357        self.write_line(b'HELO example')
358        self.write_line(b'EHLO example')
359        self.assertEqual(self.channel.socket.last,
360                         b'503 Duplicate HELO/EHLO\r\n')
361
362    def test_HELP(self):
363        self.write_line(b'HELP')
364        self.assertEqual(self.channel.socket.last,
365                         b'250 Supported commands: EHLO HELO MAIL RCPT ' + \
366                         b'DATA RSET NOOP QUIT VRFY\r\n')
367
368    def test_HELP_command(self):
369        self.write_line(b'HELP MAIL')
370        self.assertEqual(self.channel.socket.last,
371                         b'250 Syntax: MAIL FROM: <address>\r\n')
372
373    def test_HELP_command_unknown(self):
374        self.write_line(b'HELP SPAM')
375        self.assertEqual(self.channel.socket.last,
376                         b'501 Supported commands: EHLO HELO MAIL RCPT ' + \
377                         b'DATA RSET NOOP QUIT VRFY\r\n')
378
379    def test_HELO_bad_syntax(self):
380        self.write_line(b'HELO')
381        self.assertEqual(self.channel.socket.last,
382                         b'501 Syntax: HELO hostname\r\n')
383
384    def test_HELO_duplicate(self):
385        self.write_line(b'HELO example')
386        self.write_line(b'HELO example')
387        self.assertEqual(self.channel.socket.last,
388                         b'503 Duplicate HELO/EHLO\r\n')
389
390    def test_HELO_parameter_rejected_when_extensions_not_enabled(self):
391        self.extended_smtp = False
392        self.write_line(b'HELO example')
393        self.write_line(b'MAIL from:<foo@example.com> SIZE=1234')
394        self.assertEqual(self.channel.socket.last,
395                         b'501 Syntax: MAIL FROM: <address>\r\n')
396
397    def test_MAIL_allows_space_after_colon(self):
398        self.write_line(b'HELO example')
399        self.write_line(b'MAIL from:   <foo@example.com>')
400        self.assertEqual(self.channel.socket.last,
401                         b'250 OK\r\n')
402
403    def test_extended_MAIL_allows_space_after_colon(self):
404        self.write_line(b'EHLO example')
405        self.write_line(b'MAIL from:   <foo@example.com> size=20')
406        self.assertEqual(self.channel.socket.last,
407                         b'250 OK\r\n')
408
409    def test_NOOP(self):
410        self.write_line(b'NOOP')
411        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
412
413    def test_HELO_NOOP(self):
414        self.write_line(b'HELO example')
415        self.write_line(b'NOOP')
416        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
417
418    def test_NOOP_bad_syntax(self):
419        self.write_line(b'NOOP hi')
420        self.assertEqual(self.channel.socket.last,
421                         b'501 Syntax: NOOP\r\n')
422
423    def test_QUIT(self):
424        self.write_line(b'QUIT')
425        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
426
427    def test_HELO_QUIT(self):
428        self.write_line(b'HELO example')
429        self.write_line(b'QUIT')
430        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
431
432    def test_QUIT_arg_ignored(self):
433        self.write_line(b'QUIT bye bye')
434        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
435
436    def test_bad_state(self):
437        self.channel.smtp_state = 'BAD STATE'
438        self.write_line(b'HELO example')
439        self.assertEqual(self.channel.socket.last,
440                         b'451 Internal confusion\r\n')
441
442    def test_command_too_long(self):
443        self.write_line(b'HELO example')
444        self.write_line(b'MAIL from: ' +
445                        b'a' * self.channel.command_size_limit +
446                        b'@example')
447        self.assertEqual(self.channel.socket.last,
448                         b'500 Error: line too long\r\n')
449
450    def test_MAIL_command_limit_extended_with_SIZE(self):
451        self.write_line(b'EHLO example')
452        fill_len = self.channel.command_size_limit - len('MAIL from:<@example>')
453        self.write_line(b'MAIL from:<' +
454                        b'a' * fill_len +
455                        b'@example> SIZE=1234')
456        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
457
458        self.write_line(b'MAIL from:<' +
459                        b'a' * (fill_len + 26) +
460                        b'@example> SIZE=1234')
461        self.assertEqual(self.channel.socket.last,
462                         b'500 Error: line too long\r\n')
463
464    def test_MAIL_command_rejects_SMTPUTF8_by_default(self):
465        self.write_line(b'EHLO example')
466        self.write_line(
467            b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8')
468        self.assertEqual(self.channel.socket.last[0:1], b'5')
469
470    def test_data_longer_than_default_data_size_limit(self):
471        # Hack the default so we don't have to generate so much data.
472        self.channel.data_size_limit = 1048
473        self.write_line(b'HELO example')
474        self.write_line(b'MAIL From:eggs@example')
475        self.write_line(b'RCPT To:spam@example')
476        self.write_line(b'DATA')
477        self.write_line(b'A' * self.channel.data_size_limit +
478                        b'A\r\n.')
479        self.assertEqual(self.channel.socket.last,
480                         b'552 Error: Too much mail data\r\n')
481
482    def test_MAIL_size_parameter(self):
483        self.write_line(b'EHLO example')
484        self.write_line(b'MAIL FROM:<eggs@example> SIZE=512')
485        self.assertEqual(self.channel.socket.last,
486                         b'250 OK\r\n')
487
488    def test_MAIL_invalid_size_parameter(self):
489        self.write_line(b'EHLO example')
490        self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid')
491        self.assertEqual(self.channel.socket.last,
492            b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')
493
494    def test_MAIL_RCPT_unknown_parameters(self):
495        self.write_line(b'EHLO example')
496        self.write_line(b'MAIL FROM:<eggs@example> ham=green')
497        self.assertEqual(self.channel.socket.last,
498            b'555 MAIL FROM parameters not recognized or not implemented\r\n')
499
500        self.write_line(b'MAIL FROM:<eggs@example>')
501        self.write_line(b'RCPT TO:<eggs@example> ham=green')
502        self.assertEqual(self.channel.socket.last,
503            b'555 RCPT TO parameters not recognized or not implemented\r\n')
504
505    def test_MAIL_size_parameter_larger_than_default_data_size_limit(self):
506        self.channel.data_size_limit = 1048
507        self.write_line(b'EHLO example')
508        self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096')
509        self.assertEqual(self.channel.socket.last,
510            b'552 Error: message size exceeds fixed maximum message size\r\n')
511
512    def test_need_MAIL(self):
513        self.write_line(b'HELO example')
514        self.write_line(b'RCPT to:spam@example')
515        self.assertEqual(self.channel.socket.last,
516            b'503 Error: need MAIL command\r\n')
517
518    def test_MAIL_syntax_HELO(self):
519        self.write_line(b'HELO example')
520        self.write_line(b'MAIL from eggs@example')
521        self.assertEqual(self.channel.socket.last,
522            b'501 Syntax: MAIL FROM: <address>\r\n')
523
524    def test_MAIL_syntax_EHLO(self):
525        self.write_line(b'EHLO example')
526        self.write_line(b'MAIL from eggs@example')
527        self.assertEqual(self.channel.socket.last,
528            b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')
529
530    def test_MAIL_missing_address(self):
531        self.write_line(b'HELO example')
532        self.write_line(b'MAIL from:')
533        self.assertEqual(self.channel.socket.last,
534            b'501 Syntax: MAIL FROM: <address>\r\n')
535
536    def test_MAIL_chevrons(self):
537        self.write_line(b'HELO example')
538        self.write_line(b'MAIL from:<eggs@example>')
539        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
540
541    def test_MAIL_empty_chevrons(self):
542        self.write_line(b'EHLO example')
543        self.write_line(b'MAIL from:<>')
544        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
545
546    def test_MAIL_quoted_localpart(self):
547        self.write_line(b'EHLO example')
548        self.write_line(b'MAIL from: <"Fred Blogs"@example.com>')
549        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
550        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
551
552    def test_MAIL_quoted_localpart_no_angles(self):
553        self.write_line(b'EHLO example')
554        self.write_line(b'MAIL from: "Fred Blogs"@example.com')
555        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
556        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
557
558    def test_MAIL_quoted_localpart_with_size(self):
559        self.write_line(b'EHLO example')
560        self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000')
561        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
562        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
563
564    def test_MAIL_quoted_localpart_with_size_no_angles(self):
565        self.write_line(b'EHLO example')
566        self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000')
567        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
568        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
569
570    def test_nested_MAIL(self):
571        self.write_line(b'HELO example')
572        self.write_line(b'MAIL from:eggs@example')
573        self.write_line(b'MAIL from:spam@example')
574        self.assertEqual(self.channel.socket.last,
575            b'503 Error: nested MAIL command\r\n')
576
577    def test_VRFY(self):
578        self.write_line(b'VRFY eggs@example')
579        self.assertEqual(self.channel.socket.last,
580            b'252 Cannot VRFY user, but will accept message and attempt ' + \
581            b'delivery\r\n')
582
583    def test_VRFY_syntax(self):
584        self.write_line(b'VRFY')
585        self.assertEqual(self.channel.socket.last,
586            b'501 Syntax: VRFY <address>\r\n')
587
588    def test_EXPN_not_implemented(self):
589        self.write_line(b'EXPN')
590        self.assertEqual(self.channel.socket.last,
591            b'502 EXPN not implemented\r\n')
592
593    def test_no_HELO_MAIL(self):
594        self.write_line(b'MAIL from:<foo@example.com>')
595        self.assertEqual(self.channel.socket.last,
596                         b'503 Error: send HELO first\r\n')
597
598    def test_need_RCPT(self):
599        self.write_line(b'HELO example')
600        self.write_line(b'MAIL From:eggs@example')
601        self.write_line(b'DATA')
602        self.assertEqual(self.channel.socket.last,
603            b'503 Error: need RCPT command\r\n')
604
605    def test_RCPT_syntax_HELO(self):
606        self.write_line(b'HELO example')
607        self.write_line(b'MAIL From: eggs@example')
608        self.write_line(b'RCPT to eggs@example')
609        self.assertEqual(self.channel.socket.last,
610            b'501 Syntax: RCPT TO: <address>\r\n')
611
612    def test_RCPT_syntax_EHLO(self):
613        self.write_line(b'EHLO example')
614        self.write_line(b'MAIL From: eggs@example')
615        self.write_line(b'RCPT to eggs@example')
616        self.assertEqual(self.channel.socket.last,
617            b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n')
618
619    def test_RCPT_lowercase_to_OK(self):
620        self.write_line(b'HELO example')
621        self.write_line(b'MAIL From: eggs@example')
622        self.write_line(b'RCPT to: <eggs@example>')
623        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
624
625    def test_no_HELO_RCPT(self):
626        self.write_line(b'RCPT to eggs@example')
627        self.assertEqual(self.channel.socket.last,
628                         b'503 Error: send HELO first\r\n')
629
630    def test_data_dialog(self):
631        self.write_line(b'HELO example')
632        self.write_line(b'MAIL From:eggs@example')
633        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
634        self.write_line(b'RCPT To:spam@example')
635        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
636
637        self.write_line(b'DATA')
638        self.assertEqual(self.channel.socket.last,
639            b'354 End data with <CR><LF>.<CR><LF>\r\n')
640        self.write_line(b'data\r\nmore\r\n.')
641        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
642        self.assertEqual(self.server.messages,
643            [(('peer-address', 'peer-port'),
644              'eggs@example',
645              ['spam@example'],
646              'data\nmore')])
647
648    def test_DATA_syntax(self):
649        self.write_line(b'HELO example')
650        self.write_line(b'MAIL From:eggs@example')
651        self.write_line(b'RCPT To:spam@example')
652        self.write_line(b'DATA spam')
653        self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n')
654
655    def test_no_HELO_DATA(self):
656        self.write_line(b'DATA spam')
657        self.assertEqual(self.channel.socket.last,
658                         b'503 Error: send HELO first\r\n')
659
660    def test_data_transparency_section_4_5_2(self):
661        self.write_line(b'HELO example')
662        self.write_line(b'MAIL From:eggs@example')
663        self.write_line(b'RCPT To:spam@example')
664        self.write_line(b'DATA')
665        self.write_line(b'..\r\n.\r\n')
666        self.assertEqual(self.channel.received_data, '.')
667
668    def test_multiple_RCPT(self):
669        self.write_line(b'HELO example')
670        self.write_line(b'MAIL From:eggs@example')
671        self.write_line(b'RCPT To:spam@example')
672        self.write_line(b'RCPT To:ham@example')
673        self.write_line(b'DATA')
674        self.write_line(b'data\r\n.')
675        self.assertEqual(self.server.messages,
676            [(('peer-address', 'peer-port'),
677              'eggs@example',
678              ['spam@example','ham@example'],
679              'data')])
680
681    def test_manual_status(self):
682        # checks that the Channel is able to return a custom status message
683        self.write_line(b'HELO example')
684        self.write_line(b'MAIL From:eggs@example')
685        self.write_line(b'RCPT To:spam@example')
686        self.write_line(b'DATA')
687        self.write_line(b'return status\r\n.')
688        self.assertEqual(self.channel.socket.last, b'250 Okish\r\n')
689
690    def test_RSET(self):
691        self.write_line(b'HELO example')
692        self.write_line(b'MAIL From:eggs@example')
693        self.write_line(b'RCPT To:spam@example')
694        self.write_line(b'RSET')
695        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
696        self.write_line(b'MAIL From:foo@example')
697        self.write_line(b'RCPT To:eggs@example')
698        self.write_line(b'DATA')
699        self.write_line(b'data\r\n.')
700        self.assertEqual(self.server.messages,
701            [(('peer-address', 'peer-port'),
702               'foo@example',
703               ['eggs@example'],
704               'data')])
705
706    def test_HELO_RSET(self):
707        self.write_line(b'HELO example')
708        self.write_line(b'RSET')
709        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
710
711    def test_RSET_syntax(self):
712        self.write_line(b'RSET hi')
713        self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n')
714
715    def test_unknown_command(self):
716        self.write_line(b'UNKNOWN_CMD')
717        self.assertEqual(self.channel.socket.last,
718                         b'500 Error: command "UNKNOWN_CMD" not ' + \
719                         b'recognized\r\n')
720
721    def test_attribute_deprecations(self):
722        with warnings_helper.check_warnings(('', DeprecationWarning)):
723            spam = self.channel._SMTPChannel__server
724        with warnings_helper.check_warnings(('', DeprecationWarning)):
725            self.channel._SMTPChannel__server = 'spam'
726        with warnings_helper.check_warnings(('', DeprecationWarning)):
727            spam = self.channel._SMTPChannel__line
728        with warnings_helper.check_warnings(('', DeprecationWarning)):
729            self.channel._SMTPChannel__line = 'spam'
730        with warnings_helper.check_warnings(('', DeprecationWarning)):
731            spam = self.channel._SMTPChannel__state
732        with warnings_helper.check_warnings(('', DeprecationWarning)):
733            self.channel._SMTPChannel__state = 'spam'
734        with warnings_helper.check_warnings(('', DeprecationWarning)):
735            spam = self.channel._SMTPChannel__greeting
736        with warnings_helper.check_warnings(('', DeprecationWarning)):
737            self.channel._SMTPChannel__greeting = 'spam'
738        with warnings_helper.check_warnings(('', DeprecationWarning)):
739            spam = self.channel._SMTPChannel__mailfrom
740        with warnings_helper.check_warnings(('', DeprecationWarning)):
741            self.channel._SMTPChannel__mailfrom = 'spam'
742        with warnings_helper.check_warnings(('', DeprecationWarning)):
743            spam = self.channel._SMTPChannel__rcpttos
744        with warnings_helper.check_warnings(('', DeprecationWarning)):
745            self.channel._SMTPChannel__rcpttos = 'spam'
746        with warnings_helper.check_warnings(('', DeprecationWarning)):
747            spam = self.channel._SMTPChannel__data
748        with warnings_helper.check_warnings(('', DeprecationWarning)):
749            self.channel._SMTPChannel__data = 'spam'
750        with warnings_helper.check_warnings(('', DeprecationWarning)):
751            spam = self.channel._SMTPChannel__fqdn
752        with warnings_helper.check_warnings(('', DeprecationWarning)):
753            self.channel._SMTPChannel__fqdn = 'spam'
754        with warnings_helper.check_warnings(('', DeprecationWarning)):
755            spam = self.channel._SMTPChannel__peer
756        with warnings_helper.check_warnings(('', DeprecationWarning)):
757            self.channel._SMTPChannel__peer = 'spam'
758        with warnings_helper.check_warnings(('', DeprecationWarning)):
759            spam = self.channel._SMTPChannel__conn
760        with warnings_helper.check_warnings(('', DeprecationWarning)):
761            self.channel._SMTPChannel__conn = 'spam'
762        with warnings_helper.check_warnings(('', DeprecationWarning)):
763            spam = self.channel._SMTPChannel__addr
764        with warnings_helper.check_warnings(('', DeprecationWarning)):
765            self.channel._SMTPChannel__addr = 'spam'
766
767@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
768class SMTPDChannelIPv6Test(SMTPDChannelTest):
769    def setUp(self):
770        smtpd.socket = asyncore.socket = mock_socket
771        self.old_debugstream = smtpd.DEBUGSTREAM
772        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
773        self.server = DummyServer((socket_helper.HOSTv6, 0), ('b', 0),
774                                  decode_data=True)
775        conn, addr = self.server.accept()
776        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
777                                         decode_data=True)
778
779class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
780
781    def setUp(self):
782        smtpd.socket = asyncore.socket = mock_socket
783        self.old_debugstream = smtpd.DEBUGSTREAM
784        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
785        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
786                                  decode_data=True)
787        conn, addr = self.server.accept()
788        # Set DATA size limit to 32 bytes for easy testing
789        self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32,
790                                         decode_data=True)
791
792    def tearDown(self):
793        asyncore.close_all()
794        asyncore.socket = smtpd.socket = socket
795        smtpd.DEBUGSTREAM = self.old_debugstream
796
797    def write_line(self, line):
798        self.channel.socket.queue_recv(line)
799        self.channel.handle_read()
800
801    def test_data_limit_dialog(self):
802        self.write_line(b'HELO example')
803        self.write_line(b'MAIL From:eggs@example')
804        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
805        self.write_line(b'RCPT To:spam@example')
806        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
807
808        self.write_line(b'DATA')
809        self.assertEqual(self.channel.socket.last,
810            b'354 End data with <CR><LF>.<CR><LF>\r\n')
811        self.write_line(b'data\r\nmore\r\n.')
812        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
813        self.assertEqual(self.server.messages,
814            [(('peer-address', 'peer-port'),
815              'eggs@example',
816              ['spam@example'],
817              'data\nmore')])
818
819    def test_data_limit_dialog_too_much_data(self):
820        self.write_line(b'HELO example')
821        self.write_line(b'MAIL From:eggs@example')
822        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
823        self.write_line(b'RCPT To:spam@example')
824        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
825
826        self.write_line(b'DATA')
827        self.assertEqual(self.channel.socket.last,
828            b'354 End data with <CR><LF>.<CR><LF>\r\n')
829        self.write_line(b'This message is longer than 32 bytes\r\n.')
830        self.assertEqual(self.channel.socket.last,
831                         b'552 Error: Too much mail data\r\n')
832
833
834class SMTPDChannelWithDecodeDataFalse(unittest.TestCase):
835
836    def setUp(self):
837        smtpd.socket = asyncore.socket = mock_socket
838        self.old_debugstream = smtpd.DEBUGSTREAM
839        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
840        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0))
841        conn, addr = self.server.accept()
842        self.channel = smtpd.SMTPChannel(self.server, conn, addr)
843
844    def tearDown(self):
845        asyncore.close_all()
846        asyncore.socket = smtpd.socket = socket
847        smtpd.DEBUGSTREAM = self.old_debugstream
848
849    def write_line(self, line):
850        self.channel.socket.queue_recv(line)
851        self.channel.handle_read()
852
853    def test_ascii_data(self):
854        self.write_line(b'HELO example')
855        self.write_line(b'MAIL From:eggs@example')
856        self.write_line(b'RCPT To:spam@example')
857        self.write_line(b'DATA')
858        self.write_line(b'plain ascii text')
859        self.write_line(b'.')
860        self.assertEqual(self.channel.received_data, b'plain ascii text')
861
862    def test_utf8_data(self):
863        self.write_line(b'HELO example')
864        self.write_line(b'MAIL From:eggs@example')
865        self.write_line(b'RCPT To:spam@example')
866        self.write_line(b'DATA')
867        self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
868        self.write_line(b'and some plain ascii')
869        self.write_line(b'.')
870        self.assertEqual(
871            self.channel.received_data,
872            b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n'
873                b'and some plain ascii')
874
875
876class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):
877
878    def setUp(self):
879        smtpd.socket = asyncore.socket = mock_socket
880        self.old_debugstream = smtpd.DEBUGSTREAM
881        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
882        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
883                                  decode_data=True)
884        conn, addr = self.server.accept()
885        # Set decode_data to True
886        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
887                decode_data=True)
888
889    def tearDown(self):
890        asyncore.close_all()
891        asyncore.socket = smtpd.socket = socket
892        smtpd.DEBUGSTREAM = self.old_debugstream
893
894    def write_line(self, line):
895        self.channel.socket.queue_recv(line)
896        self.channel.handle_read()
897
898    def test_ascii_data(self):
899        self.write_line(b'HELO example')
900        self.write_line(b'MAIL From:eggs@example')
901        self.write_line(b'RCPT To:spam@example')
902        self.write_line(b'DATA')
903        self.write_line(b'plain ascii text')
904        self.write_line(b'.')
905        self.assertEqual(self.channel.received_data, 'plain ascii text')
906
907    def test_utf8_data(self):
908        self.write_line(b'HELO example')
909        self.write_line(b'MAIL From:eggs@example')
910        self.write_line(b'RCPT To:spam@example')
911        self.write_line(b'DATA')
912        self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
913        self.write_line(b'and some plain ascii')
914        self.write_line(b'.')
915        self.assertEqual(
916            self.channel.received_data,
917            'utf8 enriched text: żźć\nand some plain ascii')
918
919
920class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase):
921    def setUp(self):
922        smtpd.socket = asyncore.socket = mock_socket
923        self.old_debugstream = smtpd.DEBUGSTREAM
924        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
925        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
926                                  enable_SMTPUTF8=True)
927        conn, addr = self.server.accept()
928        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
929                                         enable_SMTPUTF8=True)
930
931    def tearDown(self):
932        asyncore.close_all()
933        asyncore.socket = smtpd.socket = socket
934        smtpd.DEBUGSTREAM = self.old_debugstream
935
936    def write_line(self, line):
937        self.channel.socket.queue_recv(line)
938        self.channel.handle_read()
939
940    def test_MAIL_command_accepts_SMTPUTF8_when_announced(self):
941        self.write_line(b'EHLO example')
942        self.write_line(
943            'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode(
944                'utf-8')
945        )
946        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
947
948    def test_process_smtputf8_message(self):
949        self.write_line(b'EHLO example')
950        for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']:
951            self.write_line(b'MAIL from: <a@example> ' + mail_parameters)
952            self.assertEqual(self.channel.socket.last[0:3], b'250')
953            self.write_line(b'rcpt to:<b@example.com>')
954            self.assertEqual(self.channel.socket.last[0:3], b'250')
955            self.write_line(b'data')
956            self.assertEqual(self.channel.socket.last[0:3], b'354')
957            self.write_line(b'c\r\n.')
958            if mail_parameters == b'':
959                self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
960            else:
961                self.assertEqual(self.channel.socket.last,
962                                 b'250 SMTPUTF8 message okish\r\n')
963
964    def test_utf8_data(self):
965        self.write_line(b'EHLO example')
966        self.write_line(
967            'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8'))
968        self.assertEqual(self.channel.socket.last[0:3], b'250')
969        self.write_line('RCPT To:späm@examplé'.encode('utf-8'))
970        self.assertEqual(self.channel.socket.last[0:3], b'250')
971        self.write_line(b'DATA')
972        self.assertEqual(self.channel.socket.last[0:3], b'354')
973        self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
974        self.write_line(b'.')
975        self.assertEqual(
976            self.channel.received_data,
977            b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
978
979    def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self):
980        self.write_line(b'ehlo example')
981        fill_len = (512 + 26 + 10) - len('mail from:<@example>')
982        self.write_line(b'MAIL from:<' +
983                        b'a' * (fill_len + 1) +
984                        b'@example>')
985        self.assertEqual(self.channel.socket.last,
986                         b'500 Error: line too long\r\n')
987        self.write_line(b'MAIL from:<' +
988                        b'a' * fill_len +
989                        b'@example>')
990        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
991
992    def test_multiple_emails_with_extended_command_length(self):
993        self.write_line(b'ehlo example')
994        fill_len = (512 + 26 + 10) - len('mail from:<@example>')
995        for char in [b'a', b'b', b'c']:
996            self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>')
997            self.assertEqual(self.channel.socket.last[0:3], b'500')
998            self.write_line(b'MAIL from:<' + char * fill_len + b'@example>')
999            self.assertEqual(self.channel.socket.last[0:3], b'250')
1000            self.write_line(b'rcpt to:<hans@example.com>')
1001            self.assertEqual(self.channel.socket.last[0:3], b'250')
1002            self.write_line(b'data')
1003            self.assertEqual(self.channel.socket.last[0:3], b'354')
1004            self.write_line(b'test\r\n.')
1005            self.assertEqual(self.channel.socket.last[0:3], b'250')
1006
1007
1008class MiscTestCase(unittest.TestCase):
1009    def test__all__(self):
1010        not_exported = {
1011            "program", "Devnull", "DEBUGSTREAM", "NEWLINE", "COMMASPACE",
1012            "DATA_SIZE_DEFAULT", "usage", "Options", "parseargs",
1013        }
1014        support.check__all__(self, smtpd, not_exported=not_exported)
1015
1016
1017if __name__ == "__main__":
1018    unittest.main()
1019